spice/server/red-channel-client.cpp
Frediano Ziglio 176970f3f1 red-channel-client: Remove GObject type
Make all RedChannelClient hierarchy a C++ class.
This allows to use virtual methods.
Added a normal constructor instead of properties and g_object_new.

As we remove GObject conversion macros I added a macro XXX_CAST
to create a function to replace the old macro.
They will be removed when more type safety is introduced.

There's a new SPICE_CXX_GLIB_ALLOCATOR macro in red-common.h.
This macro, when added to a class, defines the class allocator, allowing
GLib to be used for allocation in this case. This avoids a C++ library
dependency and initializes the whole structure to 0 (not all fields
are manually initialized; this will be improved with more encapsulation).

Currently the methods are mainly public, access will be modified
when more encapsulation (all functions in method) are done.

Some classes are now defined in the header, C++ uses access to
limit accessibility but for efficiency and type safety/inline and
other features require types to be defined in the headers.

Some fields were moved from XxxPrivate structure to class, C++
has accessibility.

Many destructors are defined as protected to forbid the use of
stack, this as these objects uses internal reference counting
to have normal pointers. Maybe in the future pointers like
std::shared_ptr could be used instead.

Reference counting is now implemented very easily using atomic
operations.

Signed-off-by: Frediano Ziglio <fziglio@redhat.com>
2020-05-01 06:58:09 +01:00

1711 lines
54 KiB
C++

/*
Copyright (C) 2009-2016 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include <glib.h>
#include <stdio.h>
#include <stdint.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#ifndef _WIN32
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/ioctl.h>
#endif
#ifdef HAVE_LINUX_SOCKIOS_H
#include <linux/sockios.h> /* SIOCOUTQ */
#endif
#include <common/generated_server_marshallers.h>
#include "red-channel-client.h"
#include "red-client.h"
/* Initial value of ack_data.client_window, i.e. the window advertised to the
 * client in SPICE_MSG_SET_ACK. */
#define CLIENT_ACK_WINDOW 20
/* Size of IncomingMessageBuffer::header_buf; sized for the full
 * (non-mini) header. */
#define MAX_HEADER_SIZE sizeof(SpiceDataHeader)
/* Fallback used to size the iovec array when the platform headers do not
 * provide IOV_MAX. */
#ifndef IOV_MAX
#define IOV_MAX 1024
#endif
/*
 * Format-independent view of a message header.
 *
 * Two header layouts are handled in this file (SpiceDataHeader and
 * SpiceMiniDataHeader). Code that reads or writes a header goes through the
 * function pointers stored here instead of touching the layout directly.
 */
typedef struct SpiceDataHeaderOpaque SpiceDataHeaderOpaque;
/* Accessor signatures; each one receives the opaque header it belongs to. */
typedef uint16_t (*get_msg_type_proc)(SpiceDataHeaderOpaque *header);
typedef uint32_t (*get_msg_size_proc)(SpiceDataHeaderOpaque *header);
typedef void (*set_msg_type_proc)(SpiceDataHeaderOpaque *header, uint16_t type);
typedef void (*set_msg_size_proc)(SpiceDataHeaderOpaque *header, uint32_t size);
typedef void (*set_msg_serial_proc)(SpiceDataHeaderOpaque *header, uint64_t serial);
typedef void (*set_msg_sub_list_proc)(SpiceDataHeaderOpaque *header, uint32_t sub_list);
/* NOTE: member order matters, full_header_wrapper and mini_header_wrapper
 * are initialized positionally. */
struct SpiceDataHeaderOpaque {
/* Raw header bytes the accessors cast to the concrete header type. */
uint8_t *data;
/* Size in bytes of the concrete header type. */
uint16_t header_size;
set_msg_type_proc set_msg_type;
set_msg_size_proc set_msg_size;
set_msg_serial_proc set_msg_serial;
set_msg_sub_list_proc set_msg_sub_list;
get_msg_type_proc get_msg_type;
get_msg_size_proc get_msg_size;
};
/* State of the latency (ping) monitor of a channel client. */
typedef enum {
/* No ping timer armed and no ping in flight. */
PING_STATE_NONE,
/* The ping timer has been started and has not fired yet. */
PING_STATE_TIMER,
/* Set when the pair of PING items is queued by red_channel_client_push_ping(). */
PING_STATE_WARMUP,
/* Only read in this part of the file; presumably entered once the warm-up
 * pong arrives -- TODO confirm against the pong handler. */
PING_STATE_LATENCY,
} QosPingState;
/* Bookkeeping for the periodic ping used to measure round-trip latency. */
typedef struct RedChannelClientLatencyMonitor {
QosPingState state;
/* Monotonic time in nanoseconds; compared against spice_get_monotonic_time_ns(). */
uint64_t last_pong_time;
/* NULL when latency monitoring is not active. */
SpiceTimer *timer;
/* Interval between pings, in milliseconds. */
uint32_t timeout;
/* Random id placed in the outgoing ping messages. */
uint32_t id;
/* TCP_NODELAY setting observed on the stream before the latency test
 * (true when it could not be queried). */
bool tcp_nodelay;
/* Whether the first (warm-up) ping of the current cycle has been sent. */
bool warmup_was_sent;
/* Last measured round trip in nanoseconds; negative while unknown. */
int64_t roundtrip;
} RedChannelClientLatencyMonitor;
/* State tracked by the connectivity timer between two consecutive runs. */
typedef enum {
/* Neither blocked nor waiting for a pong at the last check. */
CONNECTIVITY_STATE_CONNECTED,
/* Sending was blocked or waiting for an ack at the last check. */
CONNECTIVITY_STATE_BLOCKED,
/* A ping was outstanding at the last check. */
CONNECTIVITY_STATE_WAIT_PONG,
/* The client was declared unresponsive and disconnect() was called. */
CONNECTIVITY_STATE_DISCONNECTED,
} ConnectivityState;
/* Bookkeeping used to detect a client that silently stopped responding. */
typedef struct RedChannelClientConnectivityMonitor {
ConnectivityState state;
/* Set when bytes were written since the last timer run (only while the
 * timer exists). */
bool sent_bytes;
/* Set when bytes were read since the last timer run (only while the
 * timer exists). */
bool received_bytes;
/* Period of the connectivity timer, in milliseconds. */
uint32_t timeout;
/* NULL when connectivity monitoring is not active. */
SpiceTimer *timer;
} RedChannelClientConnectivityMonitor;
/* Progress of the message currently being written to the stream. */
typedef struct OutgoingMessageBuffer {
/* Number of bytes of the current message already written. */
int pos;
/* Total size of the current message; 0 when no message is in progress. */
int size;
} OutgoingMessageBuffer;
/* Progress of the message currently being read from the stream. */
typedef struct IncomingMessageBuffer {
/* Storage for the raw header; header.data points here. */
uint8_t header_buf[MAX_HEADER_SIZE];
SpiceDataHeaderOpaque header;
/* Number of header bytes received so far. */
uint32_t header_pos;
/* Body of the message following the header; obtained from alloc_recv_buf()
 * and returned with release_recv_buf(). NULL when no body buffer is held. */
uint8_t *msg;
/* Number of body bytes received so far. */
uint32_t msg_pos;
} IncomingMessageBuffer;
/* Private state of a RedChannelClient; owned through RedChannelClient::priv. */
struct RedChannelClientPrivate
{
SPICE_CXX_GLIB_ALLOCATOR
/* Referenced with g_object_ref() in the constructor and released in the
 * destructor. */
RedChannel *channel;
RedClient *client;
/* Freed with red_stream_free() in the destructor. */
RedStream *stream;
/* Selects PING_TEST_TIMEOUT_MS over PING_TEST_LONG_TIMEOUT_MS in init(). */
gboolean monitor_latency;
struct {
/* Incremented each time a SET_ACK message is sent. */
uint32_t generation;
uint32_t client_generation;
/* Compared against client_window * 2 to decide whether sending must wait
 * for an ack; reset to 0 when SET_ACK is sent. */
uint32_t messages_window;
/* Window advertised to the client in SET_ACK. */
uint32_t client_window;
} ack_data;
struct {
/* this can be either main.marshaller or urgent.marshaller */
SpiceMarshaller *marshaller;
SpiceDataHeaderOpaque header;
/* Size of the message being sent; 0 means nothing to send. */
uint32_t size;
/* Set when a write returned EAGAIN. */
bool blocked;
uint64_t last_sent_serial;
struct {
SpiceMarshaller *marshaller;
/* Header location restored when switching back from the urgent marshaller. */
uint8_t *header_data;
} main;
struct {
SpiceMarshaller *marshaller;
} urgent;
} send_data;
/* When set, READ events are masked out of the stream watch. */
bool block_read;
/* Re-entrancy guard for push(). */
bool during_send;
/* Queue of pending RedPipeItem; items are popped from the tail. */
GQueue pipe;
RedChannelCapabilities remote_caps;
/* TRUE when the remote advertised SPICE_COMMON_CAP_MINI_HEADER. */
bool is_mini_header;
bool wait_migrate_data;
/* Set when a MIGRATE message carrying SPICE_MIGRATE_NEED_FLUSH is sent. */
bool wait_migrate_flush_mark;
RedChannelClientLatencyMonitor latency_monitor;
RedChannelClientConnectivityMonitor connectivity_monitor;
IncomingMessageBuffer incoming;
OutgoingMessageBuffer outgoing;
RedStatCounter out_messages;
RedStatCounter out_bytes;
};
/* Accessors for the full header layout (SpiceDataHeader). */
static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type);
static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size);
static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial);
static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list);
static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header);
static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header);
/* Template copied into a client's header views when the mini header is not
 * in use; `data` is filled in later by the code that owns the buffer. */
static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
full_header_set_msg_type,
full_header_set_msg_size,
full_header_set_msg_serial,
full_header_set_msg_sub_list,
full_header_get_msg_type,
full_header_get_msg_size};
/* Accessors for the mini header layout (SpiceMiniDataHeader). */
static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type);
static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size);
static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial);
static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list);
static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header);
static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header);
/* Template copied into a client's header views when the remote supports
 * SPICE_COMMON_CAP_MINI_HEADER. */
static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
mini_header_set_msg_type,
mini_header_set_msg_size,
mini_header_set_msg_serial,
mini_header_set_msg_sub_list,
mini_header_get_msg_type,
mini_header_get_msg_size};
/* Defined later in the file. */
static void red_channel_client_clear_sent_item(RedChannelClient *rcc);
static void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
/*
 * When an error occurs over a channel, we treat it as a warning
 * for spice-server and shutdown the channel.
 *
 * `rcc` is parenthesized so that any pointer-valued expression can be passed
 * safely. It is still expanded twice, so it must not have side effects.
 */
#define spice_channel_client_error(rcc, format, ...)                          \
    do {                                                                      \
        red_channel_warning((rcc)->priv->channel, format, ## __VA_ARGS__);    \
        (rcc)->shutdown();                                                    \
    } while (0)
/* Ping interval (15 * MSEC_PER_SEC) used when latency monitoring is requested
 * or connectivity monitoring is started. */
#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15)
/* Ping interval (5 * 60 * MSEC_PER_SEC) used by init() when latency
 * monitoring is not requested. */
#define PING_TEST_LONG_TIMEOUT_MS (MSEC_PER_SEC * 60 * 5)
/* Short delay (MSEC_PER_SEC / 10) used to (re)arm the ping timer while
 * waiting for the network to become idle. */
#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10)
/* Pipe item for a message that consists of a header only. */
typedef struct RedEmptyMsgPipeItem {
RedPipeItem base;
/* Message type passed to init_send_data() when the item is sent. */
int msg;
} RedEmptyMsgPipeItem;
/* Pipe item that carries no message; it only records that it was reached. */
typedef struct MarkerPipeItem {
RedPipeItem base;
/* Set to true when the item is processed by red_channel_client_send_item(). */
bool item_sent;
} MarkerPipeItem;
/*
 * Arm the latency timer to fire after `timeout` milliseconds.
 *
 * Does nothing when no latency timer exists or when the monitor is not in
 * PING_STATE_NONE (a timer or a ping is already pending).
 */
static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout)
{
    RedChannelClientLatencyMonitor *const monitor = &rcc->priv->latency_monitor;

    if (monitor->timer == NULL || monitor->state != PING_STATE_NONE) {
        return;
    }

    monitor->state = PING_STATE_TIMER;
    red_timer_start(monitor->timer, timeout);
}
/*
 * Cancel a pending latency timer and return the monitor to PING_STATE_NONE.
 *
 * Only acts when a timer exists and the monitor is in PING_STATE_TIMER.
 */
static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc)
{
    RedChannelClientLatencyMonitor *const monitor = &rcc->priv->latency_monitor;

    if (monitor->timer == NULL || monitor->state != PING_STATE_TIMER) {
        return;
    }

    red_timer_cancel(monitor->timer);
    monitor->state = PING_STATE_NONE;
}
/*
 * Re-arm the latency timer after a message has been fully sent.
 *
 * The delay is PING_TEST_IDLE_NET_TIMEOUT_MS plus whatever remains of the
 * configured ping interval since the last pong.
 */
static void red_channel_client_restart_ping_timer(RedChannelClient *rcc)
{
    RedChannelClientLatencyMonitor *const monitor = &rcc->priv->latency_monitor;

    if (monitor->timer == NULL) {
        return;
    }

    const uint64_t elapsed_ms =
        (spice_get_monotonic_time_ns() - monitor->last_pong_time) / NSEC_PER_MILLISEC;
    uint64_t delay_ms = PING_TEST_IDLE_NET_TIMEOUT_MS;

    if (elapsed_ms < monitor->timeout) {
        delay_ms += monitor->timeout - elapsed_ms;
    }
    red_channel_client_start_ping_timer(rcc, delay_ms);
}
/*
 * Release everything owned by the client: both timers, the stream, the two
 * marshallers, the remote capabilities, the channel reference and finally
 * the private data itself.
 */
RedChannelClient::~RedChannelClient()
{
    RedChannelClientPrivate *const p = priv;

    red_timer_remove(p->latency_monitor.timer);
    p->latency_monitor.timer = NULL;

    red_timer_remove(p->connectivity_monitor.timer);
    p->connectivity_monitor.timer = NULL;

    red_stream_free(p->stream);
    p->stream = NULL;

    if (p->send_data.main.marshaller != NULL) {
        spice_marshaller_destroy(p->send_data.main.marshaller);
    }
    if (p->send_data.urgent.marshaller != NULL) {
        spice_marshaller_destroy(p->send_data.urgent.marshaller);
    }

    red_channel_capabilities_reset(&p->remote_caps);

    if (p->channel != NULL) {
        g_object_unref(p->channel);
    }

    delete p;
}
/*
 * Create the client-side state for `channel`.
 *
 * channel:          channel this client belongs to; a GObject reference is
 *                   taken and released by the destructor.
 * client:           owning client (stored, no reference taken here).
 * stream:           stream used for I/O; freed by the destructor.
 * caps:             capabilities advertised by the remote side (copied).
 * monitor_latency:  stored for init(), which uses it to choose between
 *                   PING_TEST_TIMEOUT_MS and PING_TEST_LONG_TIMEOUT_MS.
 *
 * The header accessors are selected according to whether the remote
 * advertised SPICE_COMMON_CAP_MINI_HEADER.
 */
RedChannelClient::RedChannelClient(RedChannel *channel,
                                   RedClient *client,
                                   RedStream *stream,
                                   RedChannelCapabilities *caps,
                                   bool monitor_latency)
{
    RedChannelClient *self = this;

    // XXX initialize
    priv = new RedChannelClientPrivate();
    // blocks send message (maybe use send_data.blocked + block flags)
    self->priv->ack_data.messages_window = ~0;
    self->priv->ack_data.client_generation = ~0;
    self->priv->ack_data.client_window = CLIENT_ACK_WINDOW;
    self->priv->send_data.main.marshaller = spice_marshaller_new();
    self->priv->send_data.urgent.marshaller = spice_marshaller_new();
    self->priv->send_data.marshaller = self->priv->send_data.main.marshaller;

    g_queue_init(&self->priv->pipe);
    red_channel_capabilities_reset(&self->priv->remote_caps);
    red_channel_capabilities_init(&self->priv->remote_caps, caps);

    priv->channel = (RedChannel*) g_object_ref(channel);
    priv->client = client;
    priv->stream = stream;
    // Without this the flag stays at its zero-initialized value and init()
    // always selects the long ping timeout, whatever the caller asked for.
    priv->monitor_latency = monitor_latency;

    self->priv->outgoing.pos = 0;
    self->priv->outgoing.size = 0;

    // test_remote_common_cap() reads remote_caps, so it must run after the
    // capabilities have been copied above.
    if (self->test_remote_common_cap(SPICE_COMMON_CAP_MINI_HEADER)) {
        self->priv->incoming.header = mini_header_wrapper;
        self->priv->send_data.header = mini_header_wrapper;
        self->priv->is_mini_header = TRUE;
    } else {
        self->priv->incoming.header = full_header_wrapper;
        self->priv->send_data.header = full_header_wrapper;
        self->priv->is_mini_header = FALSE;
    }
    // Incoming headers are read into the fixed buffer embedded in the struct.
    self->priv->incoming.header.data = self->priv->incoming.header_buf;

    RedsState* reds = channel->get_server();
    const RedStatNode *node = channel->get_stat_node();
    stat_init_counter(&self->priv->out_messages, reds, node, "out_messages", TRUE);
    stat_init_counter(&self->priv->out_bytes, reds, node, "out_bytes", TRUE);
}
/* Return the channel this client is attached to (no new reference taken). */
RedChannel* RedChannelClient::get_channel()
{
    RedChannel *const owner = priv->channel;

    return owner;
}
/*
 * Account for `n` bytes written to the stream: flag activity for the
 * connectivity monitor (when it is active) and add to the out_bytes counter.
 */
static void red_channel_client_data_sent(RedChannelClient *rcc, int n)
{
    RedChannelClientPrivate *const priv = rcc->priv;

    if (priv->connectivity_monitor.timer != NULL) {
        priv->connectivity_monitor.sent_bytes = true;
    }
    stat_inc_counter(priv->out_bytes, n);
}
/*
 * Flag read activity for the connectivity monitor (when it is active).
 * The byte count `n` is currently not used.
 */
static void red_channel_client_data_read(RedChannelClient *rcc, int n)
{
    RedChannelClientConnectivityMonitor *const monitor = &rcc->priv->connectivity_monitor;

    if (monitor->timer != NULL) {
        monitor->received_bytes = true;
    }
}
/* Return the size recorded in send_data for the message waiting to be sent. */
static int red_channel_client_get_out_msg_size(RedChannelClient *rcc)
{
    const int pending_size = rcc->priv->send_data.size;

    return pending_size;
}
/*
 * Fill `vec` (at most `vec_size` entries) with the data of the active
 * marshaller starting at byte offset `pos`; returns whatever
 * spice_marshaller_fill_iovec() returns.
 */
static int red_channel_client_prepare_out_msg(RedChannelClient *rcc,
                                              struct iovec *vec, int vec_size,
                                              int pos)
{
    SpiceMarshaller *const active = rcc->priv->send_data.marshaller;

    return spice_marshaller_fill_iovec(active, vec, vec_size, pos);
}
/* Mark the send path as blocked. */
static void red_channel_client_set_blocked(RedChannelClient *rcc)
{
    RedChannelClientPrivate *const priv = rcc->priv;

    priv->send_data.blocked = TRUE;
}
/* Non-zero when the marshaller currently used for sending is the urgent one. */
static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
{
    const SpiceMarshaller *const active = rcc->priv->send_data.marshaller;
    const SpiceMarshaller *const urgent = rcc->priv->send_data.urgent.marshaller;

    return active == urgent;
}
/*
 * Prepare the active marshaller for a new message: reset it, reserve room
 * for the header, and zero the header's type and size (plus the sub list
 * for the full header layout).
 */
static void red_channel_client_reset_send_data(RedChannelClient *rcc)
{
    SpiceMarshaller *const marshaller = rcc->priv->send_data.marshaller;
    SpiceDataHeaderOpaque *const header = &rcc->priv->send_data.header;

    spice_marshaller_reset(marshaller);
    header->data = spice_marshaller_reserve_space(marshaller, header->header_size);
    spice_marshaller_set_base(marshaller, header->header_size);

    header->set_msg_type(header, 0);
    header->set_msg_size(header, 0);

    /* The mini header has no sub list field. */
    if (rcc->priv->is_mini_header) {
        return;
    }
    spice_assert(rcc->priv->send_data.marshaller != rcc->priv->send_data.urgent.marshaller);
    header->set_msg_sub_list(header, 0);
}
/*
 * Send SPICE_MSG_SET_ACK with a new generation number and the current
 * client window, and restart the count of unacknowledged messages.
 */
static void red_channel_client_send_set_ack(RedChannelClient *rcc)
{
    spice_assert(rcc);

    rcc->init_send_data(SPICE_MSG_SET_ACK);

    RedChannelClientPrivate *const priv = rcc->priv;
    SpiceMsgSetAck ack;

    priv->ack_data.generation += 1;
    ack.generation = priv->ack_data.generation;
    ack.window = priv->ack_data.client_window;
    priv->ack_data.messages_window = 0;

    spice_marshall_msg_set_ack(priv->send_data.marshaller, &ack);
    rcc->begin_send_message();
}
/*
 * Send SPICE_MSG_MIGRATE carrying the channel's "migration-flags" property.
 * When the flags request a flush, remember that a flush mark is expected.
 */
static void red_channel_client_send_migrate(RedChannelClient *rcc)
{
    RedChannelClientPrivate *const priv = rcc->priv;
    SpiceMsgMigrate migrate;

    rcc->init_send_data(SPICE_MSG_MIGRATE);
    g_object_get(priv->channel, "migration-flags", &migrate.flags, NULL);
    spice_marshall_msg_migrate(priv->send_data.marshaller, &migrate);

    const bool needs_flush = (migrate.flags & SPICE_MIGRATE_NEED_FLUSH) != 0;
    if (needs_flush) {
        priv->wait_migrate_flush_mark = TRUE;
    }
    rcc->begin_send_message();
}
/*
 * Send one SPICE_MSG_PING.
 *
 * On the first ping of a cycle (the warm-up), the stream's TCP_NODELAY
 * setting is recorded and, if it was off, switched on.
 */
static void red_channel_client_send_ping(RedChannelClient *rcc)
{
    RedChannelClientLatencyMonitor *const monitor = &rcc->priv->latency_monitor;

    if (!monitor->warmup_was_sent) { // latency test start
        monitor->warmup_was_sent = true;
        /*
         * When testing latency, TCP_NODELAY must be switched on, otherwise,
         * sending the ping message is delayed by Nagle algorithm, and the
         * roundtrip measurement is less accurate (bigger).
         */
        const int delay_val = red_stream_get_no_delay(rcc->priv->stream);

        /* -1 means the setting could not be queried and is recorded as
         * "on"; any other value is recorded as read. Both cases reduce
         * to "non-zero". */
        monitor->tcp_nodelay = (delay_val != 0);
        if (delay_val == 0) {
            red_stream_set_no_delay(rcc->priv->stream, TRUE);
        }
    }

    SpiceMsgPing ping;

    rcc->init_send_data(SPICE_MSG_PING);
    ping.id = monitor->id;
    ping.timestamp = spice_get_monotonic_time_ns();
    spice_marshall_msg_ping(rcc->priv->send_data.marshaller, &ping);
    rcc->begin_send_message();
}
/* Send a header-only message whose type is stored in the pipe item. */
static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base)
{
    RedEmptyMsgPipeItem *const empty_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base);
    const int msg_type = empty_item->msg;

    rcc->init_send_data(msg_type);
    rcc->begin_send_message();
}
/*
 * Marshal and start sending one pipe item, then drop the pipe's reference
 * to it.
 *
 * Item types known to this file are handled here; every other type is
 * forwarded to the channel's send_item().
 */
static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item)
{
    spice_assert(rcc->no_item_being_sent());
    red_channel_client_reset_send_data(rcc);

    switch (item->type) {
    case RED_PIPE_ITEM_TYPE_MARKER: {
        /* Nothing goes on the wire; only record that the marker was reached. */
        MarkerPipeItem *const marker = SPICE_UPCAST(MarkerPipeItem, item);
        marker->item_sent = true;
        break;
    }
    case RED_PIPE_ITEM_TYPE_PING:
        red_channel_client_send_ping(rcc);
        break;
    case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
        red_channel_client_send_empty_msg(rcc, item);
        break;
    case RED_PIPE_ITEM_TYPE_MIGRATE:
        red_channel_client_send_migrate(rcc);
        break;
    case RED_PIPE_ITEM_TYPE_SET_ACK:
        red_channel_client_send_set_ack(rcc);
        break;
    default:
        rcc->priv->channel->send_item(rcc, item);
        break;
    }

    red_pipe_item_unref(item);
}
/* Switch the active marshaller and header location back to the main ones. */
static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
{
    RedChannelClientPrivate *const priv = rcc->priv;

    priv->send_data.marshaller = priv->send_data.main.marshaller;
    priv->send_data.header.data = priv->send_data.main.header_data;
}
/*
 * Called once a whole message has been written.
 *
 * Sends the file descriptor attached to the marshaller, if any (not on
 * Windows), clears the sent item, and then either resumes the main message
 * that an urgent one interrupted or, if the pipe is empty, re-arms the
 * ping timer.
 */
static void red_channel_client_msg_sent(RedChannelClient *rcc)
{
#ifndef _WIN32
    int fd;

    if (spice_marshaller_get_fd(rcc->priv->send_data.marshaller, &fd)) {
        const bool fd_sent = red_stream_send_msgfd(rcc->priv->stream, fd) >= 0;

        if (!fd_sent) {
            perror("sendfd");
            rcc->disconnect();
        }
        /* The descriptor is closed on both the success and failure paths. */
        if (fd != -1) {
            close(fd);
        }
        if (!fd_sent) {
            return;
        }
    }
#endif

    red_channel_client_clear_sent_item(rcc);

    if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
        if (g_queue_is_empty(&rcc->priv->pipe)) {
            /* It is possible that the socket will become idle, so we may be able to test latency */
            red_channel_client_restart_ping_timer(rcc);
        }
        return;
    }

    /* An urgent message just completed: go back to the main one. */
    red_channel_client_restore_main_sender(rcc);
    spice_assert(rcc->priv->send_data.header.data != NULL);
    rcc->begin_send_message();
}
/* Remove `item` from the pipe; returns the result of g_queue_remove(). */
static gboolean red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item)
{
    GQueue *const pipe = &rcc->priv->pipe;

    return g_queue_remove(pipe, item);
}
bool RedChannelClient::test_remote_common_cap(uint32_t cap) const
{
return test_capability(priv->remote_caps.common_caps,
priv->remote_caps.num_common_caps,
cap);
}
bool RedChannelClient::test_remote_cap(uint32_t cap) const
{
return test_capability(priv->remote_caps.caps,
priv->remote_caps.num_caps,
cap);
}
/*
 * Start a latency measurement: enter the warm-up state, pick a random ping
 * id, and queue two PING items.
 */
static void red_channel_client_push_ping(RedChannelClient *rcc)
{
    spice_assert(rcc->priv->latency_monitor.state == PING_STATE_NONE);

    RedChannelClientLatencyMonitor *const monitor = &rcc->priv->latency_monitor;

    monitor->state = PING_STATE_WARMUP;
    monitor->warmup_was_sent = false;
    monitor->id = rand();

    /* Two items: the first is the warm-up, see red_channel_client_send_ping(). */
    for (int queued = 0; queued < 2; queued++) {
        rcc->pipe_add_type(RED_PIPE_ITEM_TYPE_PING);
    }
}
static void red_channel_client_ping_timer(void *opaque)
{
RedChannelClient *rcc = (RedChannelClient *) opaque;
rcc->ref();
spice_assert(rcc->priv->latency_monitor.state == PING_STATE_TIMER);
red_channel_client_cancel_ping_timer(rcc);
#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */
int so_unsent_size = 0;
/* retrieving the occupied size of the socket's tcp send buffer (unacked + unsent) */
if (ioctl(rcc->priv->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) {
red_channel_warning(rcc->get_channel(),
"ioctl(SIOCOUTQ) failed, %s", strerror(errno));
}
if (so_unsent_size > 0) {
/* tcp send buffer is still occupied. rescheduling ping */
red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
rcc->unref();
return;
}
#endif /* ifdef HAVE_LINUX_SOCKIOS_H */
/* More portable alternative code path (less accurate but avoids bogus ioctls)*/
red_channel_client_push_ping(rcc);
rcc->unref();
}
/*
 * Non-zero when the channel handles acks and more than twice the client
 * window of messages has been sent without being acknowledged.
 */
static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
{
    gboolean handle_acks;

    g_object_get(rcc->priv->channel, "handle-acks", &handle_acks, NULL);
    if (!handle_acks) {
        return FALSE;
    }

    const uint32_t limit = rcc->priv->ack_data.client_window * 2;

    return rcc->priv->ack_data.messages_window > limit;
}
/*
 * When a connection is not alive (and we can't detect it via a socket error), we
 * reach one of these 2 states:
 * (1) Sending msgs is blocked: either writes return EAGAIN
 * or we are missing MSGC_ACK from the client.
 * (2) MSG_PING was sent without receiving a MSGC_PONG in reply.
 *
 * The connectivity_timer callback tests if the channel's state matches one of the above.
 * In case it does, on the next time the timer is called, it checks if the connection has
 * been idle during the time that passed since the previous timer call. If the connection
 * has been idle, we consider the client as disconnected.
 *
 * `opaque` is the RedChannelClient the timer was registered for. A reference
 * is held on it for the duration of the callback.
 */
static void red_channel_client_connectivity_timer(void *opaque)
{
RedChannelClient *rcc = (RedChannelClient *) opaque;
RedChannelClientConnectivityMonitor *monitor = &rcc->priv->connectivity_monitor;
int is_alive = TRUE;
rcc->ref();
/* Step 1: judge liveness from the state recorded by the previous run and
 * the traffic flags accumulated since then. */
if (monitor->state == CONNECTIVITY_STATE_BLOCKED) {
if (!monitor->received_bytes && !monitor->sent_bytes) {
if (!rcc->is_blocked() && !red_channel_client_waiting_for_ack(rcc)) {
spice_error("mismatch between rcc-state and connectivity-state");
}
spice_debug("rcc is blocked; connection is idle");
is_alive = FALSE;
}
} else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) {
/* Only received bytes count here: a pong has to come from the client. */
if (!monitor->received_bytes) {
if (rcc->priv->latency_monitor.state != PING_STATE_WARMUP &&
rcc->priv->latency_monitor.state != PING_STATE_LATENCY) {
spice_error("mismatch between rcc-state and connectivity-state");
}
spice_debug("rcc waits for pong; connection is idle");
is_alive = FALSE;
}
}
/* Step 2: either record the current state for the next run and re-arm the
 * timer, or give up on the client. */
if (is_alive) {
monitor->received_bytes = false;
monitor->sent_bytes = false;
if (rcc->is_blocked() || red_channel_client_waiting_for_ack(rcc)) {
monitor->state = CONNECTIVITY_STATE_BLOCKED;
} else if (rcc->priv->latency_monitor.state == PING_STATE_WARMUP ||
rcc->priv->latency_monitor.state == PING_STATE_LATENCY) {
monitor->state = CONNECTIVITY_STATE_WAIT_PONG;
} else {
monitor->state = CONNECTIVITY_STATE_CONNECTED;
}
red_timer_start(monitor->timer, monitor->timeout);
} else {
monitor->state = CONNECTIVITY_STATE_DISCONNECTED;
red_channel_warning(rcc->priv->channel,
"rcc %p has been unresponsive for more than %u ms, disconnecting",
rcc, monitor->timeout);
rcc->disconnect();
}
rcc->unref();
}
void RedChannelClient::start_connectivity_monitoring(uint32_t timeout_ms)
{
SpiceCoreInterfaceInternal *core = priv->channel->get_core_interface();
if (!is_connected()) {
return;
}
spice_debug("trace");
spice_assert(timeout_ms > 0);
/*
* If latency_monitor is not active, we activate it in order to enable
* periodic ping messages so that we will be be able to identify a disconnected
* channel-client even if there are no ongoing channel specific messages
* on this channel.
*/
if (priv->latency_monitor.timer == NULL) {
priv->latency_monitor.timer = core->timer_add(
core, red_channel_client_ping_timer, this);
priv->latency_monitor.roundtrip = -1;
} else {
red_channel_client_cancel_ping_timer(this);
}
priv->latency_monitor.timeout = PING_TEST_TIMEOUT_MS;
if (!red_client_during_migrate_at_target(priv->client)) {
red_channel_client_start_ping_timer(this, PING_TEST_IDLE_NET_TIMEOUT_MS);
}
if (priv->connectivity_monitor.timer == NULL) {
priv->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED;
priv->connectivity_monitor.timer = core->timer_add(
core, red_channel_client_connectivity_timer, this);
priv->connectivity_monitor.timeout = timeout_ms;
if (!red_client_during_migrate_at_target(priv->client)) {
red_timer_start(priv->connectivity_monitor.timer,
priv->connectivity_monitor.timeout);
}
}
}
static void red_channel_client_event(int fd, int event, void *data)
{
RedChannelClient *rcc = (RedChannelClient *) data;
rcc->ref();
if (event & SPICE_WATCH_EVENT_READ) {
rcc->receive();
}
if (event & SPICE_WATCH_EVENT_WRITE) {
rcc->push();
}
rcc->unref();
}
/* Read the little-endian `size` field of a full header, in host order. */
static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    return GUINT32_FROM_LE(raw->size);
}
/* Read the little-endian `size` field of a mini header, in host order. */
static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
{
    SpiceMiniDataHeader *const raw = (SpiceMiniDataHeader *) header->data;

    return GUINT32_FROM_LE(raw->size);
}
/* Read the little-endian `type` field of a full header, in host order. */
static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    return GUINT16_FROM_LE(raw->type);
}
/* Read the little-endian `type` field of a mini header, in host order. */
static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
{
    SpiceMiniDataHeader *const raw = (SpiceMiniDataHeader *) header->data;

    return GUINT16_FROM_LE(raw->type);
}
/* Store `type` in a full header, converted to little-endian. */
static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    raw->type = GUINT16_TO_LE(type);
}
/* Store `type` in a mini header, converted to little-endian. */
static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
{
    SpiceMiniDataHeader *const raw = (SpiceMiniDataHeader *) header->data;

    raw->type = GUINT16_TO_LE(type);
}
/* Store `size` in a full header, converted to little-endian. */
static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    raw->size = GUINT32_TO_LE(size);
}
/* Store `size` in a mini header, converted to little-endian. */
static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
{
    SpiceMiniDataHeader *const raw = (SpiceMiniDataHeader *) header->data;

    raw->size = GUINT32_TO_LE(size);
}
/* Store `serial` in a full header, converted to little-endian. */
static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    raw->serial = GUINT64_TO_LE(serial);
}
/* The mini header carries no serial number, so this is deliberately a no-op. */
static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
{
    (void) header;
    (void) serial;
}
/* Store `sub_list` in a full header, converted to little-endian. */
static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
{
    SpiceDataHeader *const raw = (SpiceDataHeader *) header->data;

    raw->sub_list = GUINT32_TO_LE(sub_list);
}
/* The mini header has no sub list field; calling this is reported as an error. */
static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
{
    (void) header;
    (void) sub_list;
    spice_error("attempt to set header sub list on mini header");
}
bool RedChannelClient::init()
{
GError *local_error = NULL;
SpiceCoreInterfaceInternal *core;
RedChannelClient *self = this;
if (!self->priv->stream) {
g_set_error_literal(&local_error,
SPICE_SERVER_ERROR,
SPICE_SERVER_ERROR_FAILED,
"Socket not available");
goto cleanup;
}
if (!self->config_socket()) {
g_set_error_literal(&local_error,
SPICE_SERVER_ERROR,
SPICE_SERVER_ERROR_FAILED,
"Unable to configure socket");
goto cleanup;
}
core = self->priv->channel->get_core_interface();
red_stream_set_core_interface(self->priv->stream, core);
self->priv->stream->watch =
core->watch_add(core, self->priv->stream->socket,
SPICE_WATCH_EVENT_READ,
red_channel_client_event,
self);
if (red_stream_get_family(self->priv->stream) != AF_UNIX) {
self->priv->latency_monitor.timer =
core->timer_add(core, red_channel_client_ping_timer, self);
if (!red_client_during_migrate_at_target(self->priv->client)) {
red_channel_client_start_ping_timer(self,
PING_TEST_IDLE_NET_TIMEOUT_MS);
}
self->priv->latency_monitor.roundtrip = -1;
self->priv->latency_monitor.timeout =
self->priv->monitor_latency ? PING_TEST_TIMEOUT_MS : PING_TEST_LONG_TIMEOUT_MS;
}
self->priv->channel->add_client(self);
if (!red_client_add_channel(self->priv->client, self, &local_error)) {
self->priv->channel->remove_client(self);
}
cleanup:
if (local_error) {
red_channel_warning(self->get_channel(),
"Failed to create channel client: %s",
local_error->message);
g_error_free(local_error);
}
return local_error == NULL;
}
/*
 * Apply `event_mask` to the stream watch, if there is one. READ is removed
 * from the mask while reading is blocked.
 */
static void
red_channel_client_watch_update_mask(RedChannelClient *rcc, int event_mask)
{
    RedStream *const stream = rcc->priv->stream;

    if (stream->watch == NULL) {
        return;
    }

    const int effective_mask =
        rcc->priv->block_read ? (event_mask & ~SPICE_WATCH_EVENT_READ) : event_mask;

    red_watch_update_mask(stream->watch, effective_mask);
}
/* Stop reacting to READ events on the stream; no-op if already blocked. */
void RedChannelClient::block_read()
{
    if (!priv->block_read) {
        priv->block_read = true;
        red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_WRITE);
    }
}
/* Resume reacting to READ events on the stream; no-op if not blocked. */
void RedChannelClient::unblock_read()
{
    if (priv->block_read) {
        priv->block_read = false;
        red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE);
    }
}
/*
 * Mark this client as no longer waiting for migration data. When the client
 * reports that seamless migration is done for the channel, the ping timer
 * and (if present) the connectivity timer are started.
 */
static void red_channel_client_seamless_migration_done(RedChannelClient *rcc)
{
    RedChannelClientPrivate *const priv = rcc->priv;

    priv->wait_migrate_data = FALSE;

    if (!red_client_seamless_migration_done_for_channel(priv->client)) {
        return;
    }

    red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
    if (priv->connectivity_monitor.timer != NULL) {
        red_timer_start(priv->connectivity_monitor.timer,
                        priv->connectivity_monitor.timeout);
    }
}
/* After a semi-seamless migration, start the ping timer with the idle-network delay. */
void RedChannelClient::semi_seamless_migration_complete()
{
    const uint32_t delay_ms = PING_TEST_IDLE_NET_TIMEOUT_MS;

    red_channel_client_start_ping_timer(this, delay_ms);
}
/* Tell whether this client is still flagged as waiting for migration data. */
bool RedChannelClient::is_waiting_for_migrate_data() const
{
    const bool waiting = priv->wait_migrate_data;

    return waiting;
}
/*
 * Default migration handling: remove the latency and connectivity timers,
 * then queue a MIGRATE item for the client.
 */
void RedChannelClient::default_migrate(RedChannelClient *rcc)
{
    RedChannelClientPrivate *const p = rcc->priv;

    red_channel_client_cancel_ping_timer(rcc);

    red_timer_remove(p->latency_monitor.timer);
    p->latency_monitor.timer = NULL;

    red_timer_remove(p->connectivity_monitor.timer);
    p->connectivity_monitor.timer = NULL;

    rcc->pipe_add_type(RED_PIPE_ITEM_TYPE_MIGRATE);
}
void RedChannelClient::shutdown()
{
if (priv->stream && priv->stream->watch) {
red_watch_remove(priv->stream->watch);
priv->stream->watch = NULL;
::shutdown(priv->stream->socket, SHUT_RDWR);
}
}
/*
 * Write as much as possible of the current outgoing message to the stream.
 *
 * Progress is kept in rcc->priv->outgoing, so a message interrupted by
 * EAGAIN is resumed on the next call. When the whole message has been
 * written, red_channel_client_msg_sent() is invoked.
 */
static void red_channel_client_handle_outgoing(RedChannelClient *rcc)
{
RedStream *stream = rcc->priv->stream;
OutgoingMessageBuffer *buffer = &rcc->priv->outgoing;
ssize_t n;
if (!stream) {
return;
}
/* size == 0 means no message is in progress: pick up the pending one. */
if (buffer->size == 0) {
buffer->size = red_channel_client_get_out_msg_size(rcc);
if (!buffer->size) { // nothing to be sent
return;
}
}
for (;;) {
struct iovec vec[IOV_MAX];
/* The iovec is rebuilt on every iteration from the current offset. */
int vec_size =
red_channel_client_prepare_out_msg(rcc, vec, G_N_ELEMENTS(vec),
buffer->pos);
n = red_stream_writev(stream, vec, vec_size);
if (n == -1) {
switch (errno) {
case EAGAIN:
/* Socket buffer full: remember it and wait for the next call. */
red_channel_client_set_blocked(rcc);
break;
case EINTR:
continue;
case EPIPE:
/* Peer closed the connection: disconnect without logging. */
rcc->disconnect();
break;
default:
red_channel_warning(rcc->get_channel(), "%s", strerror(errno));
rcc->disconnect();
break;
}
return;
}
buffer->pos += n;
red_channel_client_data_sent(rcc, n);
if (buffer->pos == buffer->size) { // finished writing data
/* reset buffer before calling on_msg_done, since it
 * can trigger another call to red_channel_client_handle_outgoing (when
 * switching from the urgent marshaller to the main one) */
buffer->pos = 0;
buffer->size = 0;
red_channel_client_msg_sent(rcc);
return;
}
}
}
/*
 * Read up to `size` bytes from `stream` into `buf`.
 *
 * Returns the number of bytes read, which is less than `size` when the read
 * would block (EAGAIN). Returns -1 when the stream has no watch, on end of
 * stream, or on a read error. Errors other than EPIPE are logged.
 */
static int red_peer_receive(RedStream *stream, uint8_t *buf, uint32_t size)
{
    uint8_t *cursor = buf;
    uint32_t remaining = size;

    while (remaining != 0) {
        /* if we don't have a watch it means socket has been shutdown
         * shutdown read doesn't work as accepted - receive may return data afterward.
         * check the flag before calling receive
         */
        if (stream->watch == NULL) {
            return -1;
        }

        const int now = red_stream_read(stream, cursor, remaining);

        if (now > 0) {
            remaining -= now;
            cursor += now;
            continue;
        }
        if (now == 0) {
            /* End of stream. */
            return -1;
        }

        spice_assert(now == -1);
        if (errno == EAGAIN) {
            break;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno != EPIPE) {
            g_warning("%s", strerror(errno));
        }
        return -1;
    }

    return cursor - buf;
}
/*
 * Run the channel class parser over a received message body.
 *
 * Returns what the parser returns; `size_out` and `free_message` are passed
 * through to it unchanged.
 */
static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message, size_t message_size,
                                         uint16_t message_type,
                                         size_t *size_out, message_destructor_t *free_message)
{
    RedChannel *const channel = rcc->get_channel();
    RedChannelClass *const klass = RED_CHANNEL_GET_CLASS(channel);
    uint8_t *const message_end = message + message_size;

    return klass->parser(message, message_end, message_type,
                         SPICE_VERSION_MINOR, size_out, free_message);
}
// TODO: this implementation, as opposed to the old implementation in red_worker,
// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
// arithmetic for the case where a single cb_read could return multiple messages. But
// this is suboptimal potentially. Profile and consider fixing.
/*
 * Read and dispatch as many complete messages as are available.
 *
 * Progress (header bytes, body buffer, body bytes) is kept in
 * rcc->priv->incoming, so a partially received message is resumed on the
 * next call. Any read, allocation, parse or handler failure disconnects the
 * client.
 */
static void red_channel_client_handle_incoming(RedChannelClient *rcc)
{
RedStream *stream = rcc->priv->stream;
IncomingMessageBuffer *buffer = &rcc->priv->incoming;
int bytes_read;
uint16_t msg_type;
uint32_t msg_size;
/* XXX: This needs further investigation as to the underlying cause, it happened
 * after spicec disconnect (but not with spice-gtk) repeatedly. */
if (!stream) {
return;
}
for (;;) {
int ret_handle;
uint8_t *parsed;
size_t parsed_size;
message_destructor_t parsed_free = NULL;
RedChannel *channel = rcc->get_channel();
RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel);
/* Phase 1: complete the header. */
if (buffer->header_pos < buffer->header.header_size) {
bytes_read = red_peer_receive(stream,
buffer->header.data + buffer->header_pos,
buffer->header.header_size - buffer->header_pos);
if (bytes_read == -1) {
rcc->disconnect();
return;
}
red_channel_client_data_read(rcc, bytes_read);
buffer->header_pos += bytes_read;
/* Header still incomplete: wait for more data. */
if (buffer->header_pos != buffer->header.header_size) {
return;
}
}
msg_size = buffer->header.get_msg_size(&buffer->header);
msg_type = buffer->header.get_msg_type(&buffer->header);
/* Phase 2: complete the body. */
if (buffer->msg_pos < msg_size) {
if (!buffer->msg) {
buffer->msg = rcc->alloc_recv_buf(msg_type, msg_size);
if (buffer->msg == NULL && rcc->priv->block_read) {
// if we are blocked by flow control just return, message will be read
// when data will be available
return;
}
if (buffer->msg == NULL) {
red_channel_warning(channel, "ERROR: channel refused to allocate buffer.");
rcc->disconnect();
return;
}
}
bytes_read = red_peer_receive(stream,
buffer->msg + buffer->msg_pos,
msg_size - buffer->msg_pos);
if (bytes_read == -1) {
rcc->release_recv_buf(msg_type, msg_size, buffer->msg);
buffer->msg = NULL;
rcc->disconnect();
return;
}
red_channel_client_data_read(rcc, bytes_read);
buffer->msg_pos += bytes_read;
/* Body still incomplete: wait for more data. */
if (buffer->msg_pos != msg_size) {
return;
}
}
/* Phase 3: parse and hand the message to the channel class. */
parsed = red_channel_client_parse(rcc,
buffer->msg, msg_size,
msg_type,
&parsed_size, &parsed_free);
if (parsed == NULL) {
red_channel_warning(channel, "failed to parse message type %d", msg_type);
rcc->release_recv_buf(msg_type, msg_size, buffer->msg);
buffer->msg = NULL;
rcc->disconnect();
return;
}
ret_handle = klass->handle_message(rcc, msg_type,
parsed_size, parsed);
if (parsed_free != NULL) {
parsed_free(parsed);
}
/* Reset the progress state so the next iteration starts a new message. */
buffer->msg_pos = 0;
rcc->release_recv_buf(msg_type, msg_size, buffer->msg);
buffer->msg = NULL;
buffer->header_pos = 0;
if (!ret_handle) {
rcc->disconnect();
return;
}
}
}
/* Handle pending incoming data for this client.
 * A reference is held across the call so the object stays valid even if
 * handling the data ends up dropping other references to it. */
void RedChannelClient::receive()
{
    this->ref();
    red_channel_client_handle_incoming(this);
    this->unref();
}
/* Handle pending outgoing data for this client.
 * A reference is held across the call so the object stays valid even if
 * the outgoing path ends up dropping other references to it. */
void RedChannelClient::send()
{
    this->ref();
    red_channel_client_handle_outgoing(this);
    this->unref();
}
/* Pop the next item to send from the tail of the pipe.
 * Returns NULL when there is no client, the client is blocked on a
 * partially sent message, the client is waiting for an ack, or the pipe
 * is empty. */
static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
{
    if (!rcc) {
        return NULL;
    }
    if (rcc->is_blocked()) {
        return NULL;
    }
    if (red_channel_client_waiting_for_ack(rcc)) {
        return NULL;
    }
    return static_cast<RedPipeItem*>(g_queue_pop_tail(&rcc->priv->pipe));
}
/* Send as many queued pipe items as possible.
 * The call is a no-op when a push is already in progress on this client
 * (priv->during_send is set). Once no more items can be sent the watch mask
 * is reduced to READ and the stream is flushed, see the comments below. */
void RedChannelClient::push()
{
RedPipeItem *pipe_item;
// re-entrancy guard: a push triggered while we are already pushing returns
if (priv->during_send) {
return;
}
priv->during_send = TRUE;
// hold a reference for the whole call, released at the end
ref();
// a previous message is only partially written: try to complete it first
if (is_blocked()) {
send();
}
// inconsistent state: data still pending but not flagged as blocked;
// flag it so the loop below does not start another item
if (!no_item_being_sent() && !is_blocked()) {
red_channel_client_set_blocked(this);
red_channel_warning(get_channel(),
"ERROR: an item waiting to be sent and not blocked");
}
// the getter returns NULL when blocked, waiting for an ack or the pipe is empty
while ((pipe_item = red_channel_client_pipe_item_get(this))) {
red_channel_client_send_item(this, pipe_item);
}
/* prepare_pipe_add() will reenable WRITE events when the priv->pipe is empty
* ack_zero_messages_window() will reenable WRITE events
* if we were waiting for acks to be received
* If we don't remove WRITE if we are waiting for ack we will be keep
* notified that we can write and we then exit (see pipe_item_get) as we
* are waiting for the ack consuming CPU in a tight loop
*/
if ((no_item_being_sent() && g_queue_is_empty(&priv->pipe)) ||
red_channel_client_waiting_for_ack(this)) {
red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_READ);
/* channel has no pending data to send so now we can flush data in
* order to avoid data stall into buffers in case of manual
* flushing
* We need to flush also in case of ack as it is possible
* that for a long train of small messages the message that would
* cause the client to send the ack is still in the queue
*/
red_stream_flush(priv->stream);
}
priv->during_send = FALSE;
unref();
}
/* Return the recorded roundtrip converted from nanoseconds to
 * milliseconds. A negative stored value is returned unchanged. */
int RedChannelClient::get_roundtrip_ms() const
{
    if (priv->latency_monitor.roundtrip >= 0) {
        return priv->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
    }
    return priv->latency_monitor.roundtrip;
}
void RedChannelClient::init_outgoing_messages_window()
{
priv->ack_data.messages_window = 0;
push();
}
/* Process a PONG from the client and update the latency monitor.
 * Pongs whose id does not match the outstanding ping are ignored. The first
 * matching pong in the WARMUP state only moves the monitor to LATENCY;
 * a pong in the LATENCY state updates the roundtrip and restarts the ping
 * timer. */
static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
{
    /* ignoring unexpected pongs, or post-migration pongs for pings that
     * started just before migration */
    if (ping->id != rcc->priv->latency_monitor.id) {
        spice_warning("ping-id (%u)!= pong-id %u",
                      rcc->priv->latency_monitor.id, ping->id);
        return;
    }

    const uint64_t now = spice_get_monotonic_time_ns();

    switch (rcc->priv->latency_monitor.state) {
    case PING_STATE_WARMUP:
        rcc->priv->latency_monitor.state = PING_STATE_LATENCY;
        return;
    case PING_STATE_LATENCY:
        break;
    default:
        spice_warning("unexpected");
        return;
    }

    /* set TCP_NODELAY=0, in case we reverted it for the test*/
    if (!rcc->priv->latency_monitor.tcp_nodelay) {
        red_stream_set_no_delay(rcc->priv->stream, FALSE);
    }

    /*
     * The real network latency shouldn't change during the connection. However,
     * the measurements can be bigger than the real roundtrip due to other
     * threads or processes that are utilizing the network. We update the roundtrip
     * measurement with the minimal value we encountered till now.
     */
    if (rcc->priv->latency_monitor.roundtrip < 0 ||
        now - ping->timestamp < rcc->priv->latency_monitor.roundtrip) {
        rcc->priv->latency_monitor.roundtrip = now - ping->timestamp;
        spice_debug("update roundtrip %.2f(ms)",
                    ((double)rcc->priv->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
    }

    rcc->priv->latency_monitor.last_pong_time = now;
    rcc->priv->latency_monitor.state = PING_STATE_NONE;
    red_channel_client_start_ping_timer(rcc, rcc->priv->latency_monitor.timeout);
}
/* Forward a migrate flush mark to the channel class, if the class
 * provides a handler for it. */
static void red_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc)
{
    RedChannel *channel = rcc->get_channel();
    RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel);

    if (!klass->handle_migrate_flush_mark) {
        return;
    }
    klass->handle_migrate_flush_mark(rcc);
}
// TODO: the whole migration is broken with multiple clients. What do we want to do?
// basically just
//  1) source send mark to all
//  2) source gets at various times the data (waits for all)
//  3) source migrates to target
//  4) target sends data to all
// So need to make all the handlers work with per channel/client data (what data exactly?)

/* Hand migration data received from the client to the channel class.
 * Nothing happens when the class has no handle_migrate_data handler. Data
 * arriving while the client is not waiting for it, or data rejected by the
 * handler, is reported through spice_channel_client_error(). On success
 * the seamless migration is marked as done for this client. */
static void red_channel_client_handle_migrate_data(RedChannelClient *rcc,
                                                   uint32_t size,
                                                   void *message)
{
    RedChannel *channel = rcc->get_channel();
    RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel);

    red_channel_debug(channel, "rcc %p size %u", rcc, size);

    if (!klass->handle_migrate_data) {
        return;
    }
    if (!rcc->is_waiting_for_migrate_data()) {
        spice_channel_client_error(rcc, "unexpected");
        return;
    }

    // optionally restore the message serial carried by the migration data
    if (klass->handle_migrate_data_get_serial) {
        const uint64_t serial = klass->handle_migrate_data_get_serial(rcc, size, message);
        red_channel_client_set_message_serial(rcc, serial);
    }

    if (klass->handle_migrate_data(rcc, size, message)) {
        red_channel_client_seamless_migration_done(rcc);
    } else {
        spice_channel_client_error(rcc, "handle_migrate_data failed");
    }
}
/*
 * Handle the client messages dealt with at RedChannelClient level: ack
 * bookkeeping, disconnect notification, migration control and pong.
 *
 * rcc     - client the message came from
 * type    - SPICE_MSGC_* message type
 * size    - size of the message; only forwarded to the migrate-data handler
 * message - message body, interpreted according to type
 *
 * Returns TRUE when the message was accepted, FALSE when the type is not
 * known or the message is not expected in the current state.
 *
 * An unexpected MIGRATE_FLUSH_MARK is reported with a warning and rejected
 * with FALSE. It must not go through spice_error(): that is fatal, and the
 * message content is controlled by the remote client.
 */
bool RedChannelClient::handle_message(RedChannelClient *rcc, uint16_t type,
                                      uint32_t size, void *message)
{
    switch (type) {
    case SPICE_MSGC_ACK_SYNC:
        rcc->priv->ack_data.client_generation = ((SpiceMsgcAckSync *) message)->generation;
        break;
    case SPICE_MSGC_ACK:
        // acks for an older generation are ignored
        if (rcc->priv->ack_data.client_generation == rcc->priv->ack_data.generation) {
            rcc->priv->ack_data.messages_window -= rcc->priv->ack_data.client_window;
            // the window shrank: re-enable WRITE events and try to send again
            red_channel_client_watch_update_mask(rcc,
                                                 SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE);
            rcc->push();
        }
        break;
    case SPICE_MSGC_DISCONNECTING:
        break;
    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
        if (!rcc->priv->wait_migrate_flush_mark) {
            red_channel_warning(rcc->get_channel(), "unexpected flush mark");
            return FALSE;
        }
        red_channel_client_handle_migrate_flush_mark(rcc);
        rcc->priv->wait_migrate_flush_mark = FALSE;
        break;
    case SPICE_MSGC_MIGRATE_DATA:
        red_channel_client_handle_migrate_data(rcc, size, message);
        break;
    case SPICE_MSGC_PONG:
        red_channel_client_handle_pong(rcc, (SpiceMsgPing*) message);
        break;
    default:
        red_channel_warning(rcc->get_channel(), "invalid message type %u",
                            type);
        return FALSE;
    }
    return TRUE;
}
/* Start a new outgoing message of the given type by storing the type in
 * the send header. Asserts that no message is currently being sent and
 * that msg_type is not 0. */
void RedChannelClient::init_send_data(uint16_t msg_type)
{
    spice_assert(no_item_being_sent());
    spice_assert(msg_type != 0);

    priv->send_data.header.set_msg_type(&priv->send_data.header, msg_type);
}
/* Finalize the message currently in the marshaller and start sending it.
 * The header gets its size (total marshalled size minus the header size)
 * and the next serial number; the message is counted in the ack window.
 * If the header type is still 0 only a warning is emitted and nothing is
 * sent. */
void RedChannelClient::begin_send_message()
{
SpiceMarshaller *m = priv->send_data.marshaller;
// TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
if (priv->send_data.header.get_msg_type(&priv->send_data.header) == 0) {
red_channel_warning(get_channel(), "BUG: header->type == 0");
return;
}
stat_inc_counter(priv->out_messages, 1);
/* canceling the latency test timer till the network is idle */
red_channel_client_cancel_ping_timer(this);
spice_marshaller_flush(m);
// total size includes the header; the header field stores the payload size only
priv->send_data.size = spice_marshaller_get_total_size(m);
priv->send_data.header.set_msg_size(&priv->send_data.header,
priv->send_data.size -
priv->send_data.header.header_size);
// serial numbers are pre-incremented: the first one sent is last_sent_serial + 1
priv->send_data.header.set_msg_serial(&priv->send_data.header,
++priv->send_data.last_sent_serial);
priv->ack_data.messages_window++;
priv->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
send();
}
/* Redirect message building to the urgent marshaller and return it.
 * The header data pointer of the main message is saved in
 * send_data.main.header_data first. Asserts that nothing is being sent and
 * that the current header has data attached. */
SpiceMarshaller *RedChannelClient::switch_to_urgent_sender()
{
    spice_assert(no_item_being_sent());
    spice_assert(priv->send_data.header.data != NULL);

    priv->send_data.main.header_data = priv->send_data.header.data;

    priv->send_data.marshaller = priv->send_data.urgent.marshaller;
    red_channel_client_reset_send_data(this);

    return priv->send_data.marshaller;
}
/* Return the serial the next sent message will carry, i.e. one past the
 * last serial that was sent. */
uint64_t RedChannelClient::get_message_serial() const
{
    return 1 + priv->send_data.last_sent_serial;
}
/* Make `serial` the serial of the next message to be sent by storing
 * serial - 1 as the last sent one. */
static void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
{
    const uint64_t previous_serial = serial - 1;

    rcc->priv->send_data.last_sent_serial = previous_serial;
}
/* Common checks done before an item is linked into the pipe.
 * Returns FALSE, after releasing the item, when the client is not
 * connected; otherwise returns TRUE. If the pipe is currently empty the
 * watch is also asked to report WRITE events. */
static inline gboolean prepare_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
{
    spice_assert(rcc && item);

    if (SPICE_UNLIKELY(!rcc->is_connected())) {
        spice_debug("rcc is disconnected %p", rcc);
        red_pipe_item_unref(item);
        return FALSE;
    }

    const gboolean pipe_was_empty = g_queue_is_empty(&rcc->priv->pipe);
    if (pipe_was_empty) {
        red_channel_client_watch_update_mask(rcc,
                                             SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
    }
    return TRUE;
}
/* Queue an item at the head of the pipe. The item is released instead
 * when the client is not connected. */
void RedChannelClient::pipe_add(RedPipeItem *item)
{
    if (prepare_pipe_add(this, item)) {
        g_queue_push_head(&priv->pipe, item);
    }
}
/* Queue an item with pipe_add() and immediately try to send the pipe. */
void RedChannelClient::pipe_add_push(RedPipeItem *item)
{
    this->pipe_add(item);
    this->push();
}
/* Link an item into the pipe right after the given queue node.
 * pipe_item_pos must not be NULL. The item is released instead when the
 * client is not connected. */
void RedChannelClient::pipe_add_after_pos(RedPipeItem *item,
                                          GList *pipe_item_pos)
{
    spice_assert(pipe_item_pos);

    if (prepare_pipe_add(this, item)) {
        g_queue_insert_after(&priv->pipe, pipe_item_pos, item);
    }
}
/* Link an item into the pipe right before the given queue node.
 * pipe_item_pos must not be NULL. The item is released instead when the
 * client is not connected. */
static void
red_channel_client_pipe_add_before_pos(RedChannelClient *rcc,
                                       RedPipeItem *item,
                                       GList *pipe_item_pos)
{
    spice_assert(pipe_item_pos);

    if (prepare_pipe_add(rcc, item)) {
        g_queue_insert_before(&rcc->priv->pipe, pipe_item_pos, item);
    }
}
/* Link `item` into the pipe right after the already queued item `pos`.
 * pos must not be NULL. If pos is not found in the pipe a critical message
 * is logged and item is released, so that, like on the other failure paths
 * of the pipe_add family, the caller's reference is always consumed. */
void RedChannelClient::pipe_add_after(RedPipeItem *item, RedPipeItem *pos)
{
    GList *prev;

    spice_assert(pos);
    prev = g_queue_find(&priv->pipe, pos);
    if (prev == NULL) {
        // nothing will own the item from here on: drop it instead of leaking it
        red_pipe_item_unref(item);
        g_return_if_fail(prev != NULL);
        return;
    }

    pipe_add_after_pos(item, prev);
}
/* Return non-zero when the item is currently present in the pipe. */
int RedChannelClient::pipe_item_is_linked(RedPipeItem *item)
{
    GList *link = g_queue_find(&priv->pipe, item);

    return link != NULL;
}
/* Queue an item at the tail of the pipe, the end items are popped from
 * for sending. The item is released instead when the client is not
 * connected. */
void RedChannelClient::pipe_add_tail(RedPipeItem *item)
{
    if (prepare_pipe_add(this, item)) {
        g_queue_push_tail(&priv->pipe, item);
    }
}
void RedChannelClient::pipe_add_type(int pipe_item_type)
{
RedPipeItem *item = g_new(RedPipeItem, 1);
red_pipe_item_init(item, pipe_item_type);
pipe_add(item);
}
/* Allocate an EMPTY_MSG pipe item carrying the given message type and
 * return it as a generic pipe item. The item is not queued. */
RedPipeItem *RedChannelClient::new_empty_msg(int msg_type)
{
    RedEmptyMsgPipeItem *empty_item = g_new(RedEmptyMsgPipeItem, 1);

    red_pipe_item_init(&empty_item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
    empty_item->msg = msg_type;

    return &empty_item->base;
}
void RedChannelClient::pipe_add_empty_msg(int msg_type)
{
pipe_add(new_empty_msg(msg_type));
}
/* Tell whether the pipe holds no queued items. */
gboolean RedChannelClient::pipe_is_empty()
{
    const gboolean empty = g_queue_is_empty(&priv->pipe);

    return empty;
}
uint32_t RedChannelClient::get_pipe_size()
{
return g_queue_get_length(&priv->pipe);
}
/* Give direct access to the queue of pending pipe items. */
GQueue* RedChannelClient::get_pipe()
{
    GQueue *queue = &priv->pipe;

    return queue;
}
/* Return the stored is_mini_header flag of this client. */
bool RedChannelClient::is_mini_header() const
{
    const bool mini = priv->is_mini_header;

    return mini;
}
/* A client is connected when it has a channel and is still present in
 * that channel's list of clients. */
bool RedChannelClient::is_connected() const
{
    if (!priv->channel) {
        return false;
    }
    return g_list_find(priv->channel->get_clients(), this) != NULL;
}
/* Forget the message currently being sent: clear the blocked flag, set
 * the pending size to 0 and reset the current marshaller. */
static void red_channel_client_clear_sent_item(RedChannelClient *rcc)
{
    rcc->priv->send_data.blocked = FALSE;
    rcc->priv->send_data.size = 0;

    spice_marshaller_reset(rcc->priv->send_data.marshaller);
}
// TODO: again - what is the context exactly? this happens in channel disconnect. but our
// current red_channel_shutdown also closes the socket - is there a socket to close?
// are we reading from an fd here? arghh

/* Drop the message being sent and release every item still queued in the
 * pipe, leaving the pipe empty. */
static void red_channel_client_pipe_clear(RedChannelClient *rcc)
{
    red_channel_client_clear_sent_item(rcc);

    for (;;) {
        RedPipeItem *queued = static_cast<RedPipeItem*>(g_queue_pop_head(&rcc->priv->pipe));
        if (queued == NULL) {
            break;
        }
        red_pipe_item_unref(queued);
    }
}
/* Ask the watch to report both READ and WRITE events, then reset the
 * count of messages in the ack window to zero. */
void RedChannelClient::ack_zero_messages_window()
{
    const int rw_events = SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE;

    red_channel_client_watch_update_mask(this, rw_events);
    priv->ack_data.messages_window = 0;
}
void RedChannelClient::ack_set_client_window(int client_window)
{
priv->ack_data.client_window = client_window;
}
void RedChannelClient::push_set_ack()
{
pipe_add_type(RED_PIPE_ITEM_TYPE_SET_ACK);
}
/* Disconnect this client from its channel.
 * Does nothing when the client is not connected. Otherwise the pipe is
 * cleared, the connection is shut down, both monitor timers are removed,
 * the client is removed from the channel, on_disconnect() is invoked and
 * finally the client is removed from its RedClient. */
void RedChannelClient::disconnect()
{
RedChannel *channel = priv->channel;
// is_connected() also covers the case of a missing channel
if (!is_connected()) {
return;
}
// release queued items and forget the message being sent
red_channel_client_pipe_clear(this);
shutdown();
// remove both monitor timers and reset their handles
red_timer_remove(priv->latency_monitor.timer);
priv->latency_monitor.timer = NULL;
red_timer_remove(priv->connectivity_monitor.timer);
priv->connectivity_monitor.timer = NULL;
// after this call is_connected() no longer finds us in the channel's client list
channel->remove_client(this);
on_disconnect();
// remove client from RedClient
// NOTE this may trigger the free of the object, if we are in a watch/timer
// we should make sure we keep a reference
red_client_remove_channel(this);
}
/* Return the blocked flag of the send state (set while a message could
 * not be written out completely). */
bool RedChannelClient::is_blocked() const
{
    const bool blocked = priv->send_data.blocked;

    return blocked;
}
/* Return non-zero when the send header currently carries a message type,
 * i.e. its type is different from 0. */
int RedChannelClient::send_message_pending()
{
    const bool has_type =
        priv->send_data.header.get_msg_type(&priv->send_data.header) != 0;

    return has_type;
}
/* Return the marshaller currently used to build outgoing messages. */
SpiceMarshaller *RedChannelClient::get_marshaller()
{
    SpiceMarshaller *current = priv->send_data.marshaller;

    return current;
}
/* Return the stream this client communicates over. */
RedStream *RedChannelClient::get_stream()
{
    RedStream *stream = priv->stream;

    return stream;
}
/* Return the RedClient this channel client belongs to. */
RedClient *RedChannelClient::get_client()
{
    RedClient *owner = priv->client;

    return owner;
}
void RedChannelClient::set_header_sub_list(uint32_t sub_list)
{
priv->send_data.header.set_msg_sub_list(&priv->send_data.header, sub_list);
}
/* TODO: more evil sync stuff. anything with the word wait in it's name. */

/*
 * Synchronously wait until the pipe item at item_pos has been sent.
 *
 * A marker item is inserted into the pipe right before item_pos. Items are
 * popped for sending from the tail of the queue, so the marker is reached
 * only after the item at item_pos. The function then alternates between
 * receiving and pushing until the marker reports it was sent.
 * NOTE(review): item_sent is expected to be set by the send path when the
 * marker is processed; that code is not part of this function.
 *
 * item_pos - queue node of the item to wait for, must not be NULL
 * timeout  - maximum time to wait in nanoseconds, -1 to wait without limit
 *
 * Returns TRUE if the marker was sent. Returns FALSE if the timeout expired
 * or the client got disconnected. A disconnected client has had its pipe
 * cleared (or never accepted the marker), so the marker can never be sent
 * and waiting any longer would only spin.
 */
bool RedChannelClient::wait_pipe_item_sent(GList *item_pos, int64_t timeout)
{
    uint64_t end_time;
    bool item_sent;
    bool disconnected = false;

    spice_debug("trace");

    if (timeout != -1) {
        end_time = spice_get_monotonic_time_ns() + timeout;
    } else {
        end_time = UINT64_MAX;
    }

    MarkerPipeItem *mark_item = g_new0(MarkerPipeItem, 1);

    red_pipe_item_init(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER);
    mark_item->item_sent = false;
    // one reference goes to the pipe, this extra one keeps the marker
    // readable here after the pipe has released it
    red_pipe_item_ref(&mark_item->base);
    red_channel_client_pipe_add_before_pos(this, &mark_item->base, item_pos);

    for (;;) {
        receive();
        push();
        if (mark_item->item_sent ||
            (timeout != -1 && spice_get_monotonic_time_ns() >= end_time)) {
            break;
        }
        if (!is_connected()) {
            // the marker is no longer (or never was) in the pipe
            disconnected = true;
            break;
        }
        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
    }

    item_sent = mark_item->item_sent;
    red_pipe_item_unref(&mark_item->base);
    if (item_sent) {
        return TRUE;
    }

    if (disconnected) {
        spice_warning("client disconnected");
    } else {
        // still on the queue
        spice_warning("timeout");
    }
    return FALSE;
}
bool RedChannelClient::wait_outgoing_item(int64_t timeout)
{
uint64_t end_time;
int blocked;
if (!is_blocked()) {
return TRUE;
}
if (timeout != -1) {
end_time = spice_get_monotonic_time_ns() + timeout;
} else {
end_time = UINT64_MAX;
}
spice_debug("blocked");
do {
usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
receive();
send();
} while ((blocked = is_blocked()) &&
(timeout == -1 || spice_get_monotonic_time_ns() < end_time));
if (blocked) {
spice_warning("timeout");
return FALSE;
} else {
spice_assert(no_item_being_sent());
return TRUE;
}
}
/* True when there is no outgoing message with bytes still to be sent
 * (the pending send size is 0). */
bool RedChannelClient::no_item_being_sent() const
{
    const bool idle = (priv->send_data.size == 0);

    return idle;
}
/* Remove an item from the pipe and release it. The item is released only
 * if red_channel_client_pipe_remove() reports that it was removed. */
void RedChannelClient::pipe_remove_and_release(RedPipeItem *item)
{
    if (!red_channel_client_pipe_remove(this, item)) {
        return;
    }
    red_pipe_item_unref(item);
}
/* Unlink the given queue node from the pipe and release the item it
 * holds. item_pos must be a valid node of the pipe. */
void RedChannelClient::pipe_remove_and_release_pos(GList *item_pos)
{
    // read the payload first: deleting the link frees the node
    RedPipeItem *linked_item = static_cast<RedPipeItem*>(item_pos->data);

    g_queue_delete_link(&priv->pipe, item_pos);
    red_pipe_item_unref(linked_item);
}
/* client mutex should be locked before this call */

/* Prepare this client for seamless migration.
 * Reads the channel's "migration-flags" property; when the channel needs a
 * data transfer (SPICE_MIGRATE_NEED_DATA_TRANSFER) the client is marked as
 * waiting for migration data.
 *
 * Returns TRUE when the client now waits for migration data, FALSE
 * otherwise. */
bool RedChannelClient::set_migration_seamless()
{
    gboolean ret = FALSE;
    // start from "no flags": g_object_get() leaves the variable untouched
    // if it cannot read the property
    uint32_t flags = 0;

    g_object_get(priv->channel,
                 "migration-flags", &flags,
                 NULL);
    if (flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
        priv->wait_migrate_data = TRUE;
        ret = TRUE;
    }
    red_channel_debug(priv->channel, "rcc %p wait data %d", this,
                      priv->wait_migrate_data);

    return ret;
}
/* Return the GQuark registered for the "spice-server-error-quark"
 * string. */
GQuark spice_server_error_quark(void)
{
    const GQuark quark = g_quark_from_static_string("spice-server-error-quark");

    return quark;
}