/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
Copyright (C) 2019 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see http://www.gnu.org/licenses/.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Shorthand for the generic socket address type, used when calling bind(). */
typedef struct sockaddr SA;
/* Thread running handle_communications(); joined by recorder_deregister(). */
static GThread *recorder_comm_thr;
/* Set by recorder_interrupt_communications() to make the thread leave its loop. */
static bool agent_terminated = false;
/* eventfd written to in order to wake the thread up from poll(); -1 until created. */
static int terminate_efd = -1;
/* Stream wrapping the socket of the connected client, NULL when nobody is connected. */
static FILE *communication_f = NULL;
/* Capacity of the recorders[] table. */
#define NB_MAX_RECORDERS 16
/* Recorders registered through recorder_activate(), in registration order. */
static recorder_info *recorders[NB_MAX_RECORDERS];
/* Number of valid entries in recorders[]. */
static uint32_t nb_recorders = 0;
/* Callback (and its opaque argument) receiving the quality messages sent by the client. */
static forward_quality_cb_t forward_quality_cb;
static void *forward_quality_cb_data;
/* Callback (and its opaque argument) invoked at the end of the connection handshake. */
static on_connect_cb_t on_connect_cb;
static void *on_connect_cb_data;
/* Defined at the end of the file. */
static uintptr_t recorder_tick(void);
/* Number of recorder ticks per second; the default makes one tick a microsecond. */
#ifndef RECORDER_HZ
#define RECORDER_HZ 1000000
#endif // RECORDER_HZ
/* Serializes the accesses to communication_f (connection, disconnection, record sending). */
static GMutex mutex_socket;
/* Write the whole of buf to fd, retrying on EINTR and on short writes. */
static int agent_send_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t written = write(fd, buf, len);

        if (written < 0 && errno == EINTR) {
            continue;
        }
        if (written <= 0) {
            return -1;
        }
        buf += written;
        len -= (size_t) written;
    }
    return 0;
}

/*
 * Build the "Recorders: name1;name2;...\n" greeting listing every registered
 * recorder.  Returns a malloc()ed buffer of *len bytes (not NUL-terminated),
 * or NULL if the allocation failed.
 */
static char *agent_build_greeting(size_t *len)
{
    static const char prefix[] = "Recorders: ";
    const size_t prefix_len = sizeof(prefix) - 1;
    size_t total = prefix_len + 1; /* prefix and the final '\n' */
    char *greeting;
    char *cursor;
    uint32_t i;

    for (i = 0; i < nb_recorders; i++) {
        total += strlen(recorders[i]->name) + 1; /* name and its ';' */
    }

    greeting = malloc(total);
    if (greeting == NULL) {
        return NULL;
    }

    cursor = greeting;
    memcpy(cursor, prefix, prefix_len);
    cursor += prefix_len;
    for (i = 0; i < nb_recorders; i++) {
        size_t name_len = strlen(recorders[i]->name);

        g_debug("Sending %s", recorders[i]->name);
        memcpy(cursor, recorders[i]->name, name_len);
        cursor += name_len;
        *cursor++ = ';';
    }
    *cursor = '\n';

    *len = total;
    return greeting;
}

/*
 * Perform the connection handshake with a freshly accepted client.
 *
 * The names of the registered recorders are sent as one
 * "Recorders: name1;name2;...\n" line.  The client answers with one byte per
 * recorder, '1' to enable it and '0' to leave it disabled.  The on_connect
 * callback, if set, runs last and rejects the connection by returning
 * non-zero.
 *
 * Returns 0 on success, in which case communication_f wraps the socket, and
 * -1 on failure, in which case the socket is left open for the caller to
 * close and every recorder is switched off again.
 */
static int agent_initialize_communication(int socket)
{
    uint32_t i;
    int ret = -1;
    bool traces_touched = false;
    FILE *socket_f;
    char *greeting;
    size_t greeting_len = 0;

    g_mutex_lock(&mutex_socket);

    if (communication_f != NULL) {
        g_warning("A client is already connected, rejecting the connection.");
        goto unlock;
    }

    /*
     * The greeting is sent with write() instead of a stdio stream: a FILE
     * obtained from fdopen() can only be released with fclose(), which would
     * also close the descriptor the caller closes on failure.  The stream is
     * therefore created only once the handshake can no longer fail.
     */
    greeting = agent_build_greeting(&greeting_len);
    if (greeting == NULL) {
        g_warning("Could not allocate the recorder list");
        goto unlock;
    }
    if (agent_send_all(socket, greeting, greeting_len) != 0) {
        g_warning("Could not send the recorder list to the client");
        free(greeting);
        goto unlock;
    }
    free(greeting);

    for (i = 0; i < nb_recorders; i++) {
        char enable;
        ssize_t nread;

        do {
            nread = read(socket, &enable, sizeof(enable));
        } while (nread < 0 && errno == EINTR);

        if (nread != (ssize_t) sizeof(enable)) {
            g_warning("Invalid read on the client socket");
            goto unlock;
        }

        if (enable != '0' && enable != '1') {
            g_critical("Invalid enable-value received for recorder '%s': %u",
                       recorders[i]->name, (unsigned int) (unsigned char) enable);
            goto unlock;
        }

        if (enable == '0') {
            continue;
        }

        recorders[i]->trace = 1;
        traces_touched = true;
        g_info("Enable recorder '%s'", recorders[i]->name);
    }

    if (on_connect_cb && on_connect_cb(on_connect_cb_data)) {
        goto unlock;
    }

    socket_f = fdopen(socket, "w+b");
    if (socket_f == NULL) {
        g_warning("Could not open a stream on the client socket");
        goto unlock;
    }

    communication_f = socket_f;
    ret = 0;

unlock:
    if (ret != 0 && traces_touched) {
        /* Do not leave recorders enabled for a client that was rejected. */
        for (i = 0; i < nb_recorders; i++) {
            recorders[i]->trace = 0;
        }
    }
    g_mutex_unlock(&mutex_socket);

    return ret;
}
/*
 * Tear down the connection with the current client: close the communication
 * stream (and with it the socket) and switch every recorder off again.
 */
static void agent_finalize_communication(int socket)
{
    uint32_t idx = 0;

    g_info("Communication socket closed.");

    g_mutex_lock(&mutex_socket);

    /* The socket being finalized must be the one backing the stream. */
    g_assert(socket == fileno(communication_f));
    fclose(communication_f);
    communication_f = NULL;

    while (idx < nb_recorders) {
        recorders[idx++]->trace = 0;
    }

    g_mutex_unlock(&mutex_socket);
}
/*
 * Hand a quality message received from the client over to the registered
 * callback; the message is dropped with a warning when no callback is set.
 */
static void forward_quality(const char *quality)
{
    if (forward_quality_cb) {
        g_info("Quality: Forwarding '%s'", quality);
        forward_quality_cb(forward_quality_cb_data, quality);
        return;
    }

    g_warning("Quality: No callback set, dropping the message (%s).", quality);
}
/*
 * Read one byte of the NUL-terminated quality message the client is sending.
 *
 * Bytes are accumulated across calls in a static buffer; when the
 * terminating '\0' arrives the message is passed to forward_quality().
 * A message that does not fit in the buffer is reported and discarded.
 *
 * Returns 0 when the connection is still alive, -1 when it has been closed
 * (agent_finalize_communication() has then already been called).
 */
static int agent_process_communication(int socket)
{
    static char msg_in[128];
    static size_t len = 0;
    ssize_t nbytes;

    g_assert(socket == fileno(communication_f));

    nbytes = read(socket, msg_in + len, 1);

    if (nbytes < 0 && errno == EINTR) {
        return 0;
    }

    if (nbytes <= 0) {
        /*
         * Forget any partially received message, otherwise it would be
         * prepended to the first message of the next client.
         */
        len = 0;
        agent_finalize_communication(socket);
        return -1; // socket closed
    }

    if (msg_in[len] == '\0') {
        // process quality indicator
        forward_quality(msg_in);
        len = 0;
        return 0;
    }

    len += (size_t) nbytes;

    /* The last byte of the buffer is reserved for the terminator below. */
    if (len >= sizeof(msg_in) - 1) {
        msg_in[sizeof(msg_in) - 1] = '\0';
        g_warning("Invalid message received (too long?): %s", msg_in);
        len = 0;
    }

    return 0;
}
/*
 * Create a TCP/IPv4 socket bound to `port` on every local address, with
 * SO_REUSEADDR set.  The socket is not yet listening.
 *
 * Returns the socket descriptor, or -1 on failure (nothing is left open).
 */
static int make_socket(guint port)
{
    struct sockaddr_in servaddr;
    int listen_socket;
    int enable = 1;

    /* htons() would silently truncate a value that does not fit in 16 bits. */
    if (port > 65535) {
        g_critical("invalid TCP port: %u", port);
        return -1;
    }

    listen_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_socket == -1) {
        g_critical("socket creation failed: %m");
        return -1;
    }

    if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
        g_critical("setsockopt(SO_REUSEADDR) failed: %m");
        close(listen_socket);
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((uint16_t) port);

    if (bind(listen_socket, (SA *) &servaddr, sizeof(servaddr)) != 0) {
        g_critical("socket bind failed: %m");
        close(listen_socket);
        return -1;
    }

    return listen_socket;
}
/*
 * Body of the agent interface thread.
 *
 * Listens on the TCP port packed in `user_data` (GUINT_TO_POINTER) and
 * multiplexes, with poll(), three descriptors: the termination eventfd, the
 * listening socket and at most one client socket.  The loop ends when the
 * termination event is received, when poll() fails, or when the listening
 * socket reports an error.
 *
 * Always returns NULL.
 */
static gpointer handle_communications(gpointer user_data)
{
    struct pollfd fds[3];
    int nb_fd = 0;
    int listen_socket;
    int i;
    bool listener_failed = false;
    guint port = GPOINTER_TO_UINT(user_data);

    listen_socket = make_socket(port);
    if (listen_socket < 0) {
        return NULL;
    }

    g_debug("Listening!");

    if ((listen(listen_socket, 1)) != 0) {
        g_critical("listen failed: %m");
        close(listen_socket);
        return NULL;
    }

    fds[0].fd = terminate_efd;
    fds[0].events = POLLIN;
    fds[1].fd = listen_socket;
    fds[1].events = POLLIN;
    nb_fd = 2;

    while (!agent_terminated && !listener_failed) {
        /* Block until input arrives on one or more active sockets. */
        int ret = poll(fds, nb_fd, -1);

        if (ret < 0) {
            if (errno == EINTR) {
                /* Interrupted by a signal: not an error, poll again. */
                continue;
            }
            g_critical("poll failed: %m");
            break;
        }

        /* Service all the sockets with input pending. */
        for (i = 0; i < nb_fd; i++) {
            int fd = fds[i].fd;

            if (fd == terminate_efd) {
                if (fds[i].revents & POLLIN) {
                    g_assert(agent_terminated);
                    break;
                }
            } else if (fd == listen_socket) {
                if (fds[i].revents & ~POLLIN) {
                    /*
                     * The error condition is permanent: polling again would
                     * report it immediately and spin, so leave the loop.
                     */
                    g_critical("server socket closed");
                    listener_failed = true;
                    break;
                }
                if (!(fds[i].revents & POLLIN)) {
                    continue;
                }

                /* Connection request on original socket. */
                int new_fd = accept(listen_socket, NULL, NULL);

                if (new_fd < 0) {
                    g_critical("accept failed: %m");
                    break;
                }

                if (nb_fd == 3) {
                    close(new_fd);
                    g_warning("Too many clients accepted ...");
                    continue;
                }

                g_debug("Agent Interface: client connected!");

                if (agent_initialize_communication(new_fd)) {
                    close(new_fd);
                    g_warning("Initialization failed ...");
                    continue;
                }

                fds[nb_fd].fd = new_fd;
                fds[nb_fd].events = POLLIN;
                nb_fd++;

                /* fds array modified, restart the poll. */
                break;
            } else {
                /*
                 * Hang-up and error conditions are serviced too: the read
                 * then fails or returns end-of-file and the client is
                 * finalized, instead of poll() reporting the condition
                 * forever.
                 */
                if (!(fds[i].revents & (POLLIN | POLLHUP | POLLERR))) {
                    continue;
                }

                /* Data arriving on an already-connected socket. */
                if (agent_process_communication(fd) < 0) {
                    /* The client is always the last entry of fds[]. */
                    nb_fd--;
                }
            }
        }
    }

    close(terminate_efd);
    close(listen_socket);

    g_info("Agent interface thread: bye!");
    return NULL;
}
static void recorder_deregister(void);

/*
 * Create the termination eventfd and start the agent interface thread, which
 * listens on TCP port `port`.  On success recorder_deregister() is
 * registered to run at process exit; on failure an error is logged and
 * nothing is left allocated.
 */
static void recorder_initialization(unsigned int port)
{
    GError *error = NULL;

    terminate_efd = eventfd(0, 0);
    if (terminate_efd == -1) {
        g_critical("eventfd failed: %m");
        return;
    }

    recorder_comm_thr = g_thread_try_new("smart_agent_interface",
                                         handle_communications,
                                         GUINT_TO_POINTER((guint) port), &error);
    if (error) {
        g_assert(!recorder_comm_thr);
        g_critical("Error: Could not start the agent interface thread: %s", error->message);
        g_error_free(error);

        /* No thread will ever close the eventfd: release it here. */
        close(terminate_efd);
        terminate_efd = -1;
        return;
    }

    atexit(recorder_deregister);
}
/*
 * Ask the agent interface thread to stop: raise the termination flag, then
 * post an event on the eventfd so that the thread wakes up from poll().
 */
static void recorder_interrupt_communications(void)
{
    const uint64_t wakeup = 1;

    agent_terminated = true;

    if (write(terminate_efd, &wakeup, sizeof(wakeup)) != sizeof(wakeup)) {
        g_warning("failed to send recorder thread termination event: %m");
    }
}
/*
 * Stop the agent interface thread, if one was started, and wait for it to
 * finish.  Registered with atexit() by recorder_initialization().
 */
static void recorder_deregister(void)
{
    if (!recorder_comm_thr) {
        return;
    }

    recorder_interrupt_communications();
    g_thread_join(recorder_comm_thr);
    recorder_comm_thr = NULL;
}
/*
 * Register a recorder so that it is advertised to the clients.  At most
 * NB_MAX_RECORDERS recorders can be registered; further ones are rejected
 * with a critical message.
 */
void recorder_activate(recorder_info *recorder)
{
    if (nb_recorders < NB_MAX_RECORDERS) {
        recorders[nb_recorders++] = recorder;
        return;
    }

    g_critical("Too many recorders configured (nb max: %d)", NB_MAX_RECORDERS);
}
/*
 * Write one record to `dest` and flush it: a three-line header (recorder
 * name, location, timestamp), the entry's format expanded with `args`, and
 * an empty line marking the end of the record.
 */
static void do_send_entry(FILE *dest, recorder_info *info, recorder_entry *entry, va_list args)
{
    /* Header. */
    fprintf(dest, "Name: %s\nFunction: %s\nTime: %lu\n",
            info->name, entry->where, entry->timestamp);

    /* Payload, followed by the record separator. */
    vfprintf(dest, entry->format, args);
    fputs("\n\n", dest);

    fflush(dest);
}
static void recorder_trace_entry(recorder_info *info, recorder_entry *entry, ...)
// ----------------------------------------------------------------------------
//   Send a recorder entry to the connected client, if there is one
// ----------------------------------------------------------------------------
//   The variadic arguments are the values consumed by entry->format.
//   When the environment variable SPICE_AGENT_LOG_RECORDS is "1", the record
//   is also written to stderr.
{
    va_list args;

    // Records are separated by an empty line, so a format containing a
    // newline could not be parsed back by the client.
    if (strchr(entry->format, '\n') != NULL) {
        g_critical("Agent records cannot contain '\\n' char ... (%s)", entry->where);
        return;
    }

    // send info/entry to the socket
    g_mutex_lock(&mutex_socket);

    if (communication_f == NULL) {
        // Nobody is connected: drop the record.
        g_mutex_unlock(&mutex_socket);
        return;
    }

    va_start(args, entry);
    do_send_entry(communication_f, info, entry, args);
    va_end(args);

    if (g_strcmp0(g_getenv("SPICE_AGENT_LOG_RECORDS"), "1") == 0) {
        // A va_list cannot be reused once consumed: restart it.
        va_start(args, entry);
        do_send_entry(stderr, info, entry, args);
        va_end(args);
    }

    g_mutex_unlock(&mutex_socket);
}
/*
 * Public entry point: start the agent interface on TCP port `port`.
 * All the work is done by recorder_initialization().
 */
void agent_interface_start(unsigned int port)
{
g_info("Launch on port %u", port);
recorder_initialization(port);
}
/*
 * Set the callback that receives the quality messages sent by the client.
 * `data` is passed back, unchanged, as the first argument of `cb`.
 */
void agent_interface_set_forward_quality_cb(forward_quality_cb_t cb, void *data)
{
g_debug("Received forward_quality callback");
forward_quality_cb = cb;
forward_quality_cb_data = data;
}
/*
 * Set the callback invoked at the end of the handshake with a new client.
 * `data` is passed back, unchanged, as the argument of `cb`; a non-zero
 * return value from `cb` makes the connection fail.
 */
void agent_interface_set_on_connect_cb(on_connect_cb_t cb, void *data)
{
g_debug("Received on_connect callback");
on_connect_cb = cb;
on_connect_cb_data = data;
}
void recorder_append(recorder_info *rec,
                     const char *where,
                     const char *format,
                     uintptr_t a0,
                     uintptr_t a1,
                     uintptr_t a2,
                     uintptr_t a3)
// ----------------------------------------------------------------------------
//   Emit a record with up to 4 arguments, if the recorder is being traced
// ----------------------------------------------------------------------------
{
    if (rec->trace) {
        recorder_entry entry;

        entry.where = where;
        entry.format = format;
        entry.timestamp = recorder_tick();

        recorder_trace_entry(rec, &entry, a0, a1, a2, a3);
    }
}
void recorder_append2(recorder_info *rec,
                      const char *where,
                      const char *format,
                      uintptr_t a0,
                      uintptr_t a1,
                      uintptr_t a2,
                      uintptr_t a3,
                      uintptr_t a4,
                      uintptr_t a5,
                      uintptr_t a6,
                      uintptr_t a7)
// ----------------------------------------------------------------------------
//   Emit a record with up to 8 arguments, if the recorder is being traced
// ----------------------------------------------------------------------------
{
    if (rec->trace) {
        recorder_entry entry;

        entry.where = where;
        entry.format = format;
        entry.timestamp = recorder_tick();

        recorder_trace_entry(rec, &entry, a0, a1, a2, a3, a4, a5, a6, a7);
    }
}
// ============================================================================
//
// Support functions
//
// ============================================================================
static uintptr_t recorder_tick(void)
// ----------------------------------------------------------------------------
//   Return the current time of day in ticks of 1/RECORDER_HZ second
// ----------------------------------------------------------------------------
//   The computation is done on 64 bits so that it neither overflows a
//   32-bit time_t nor divides by zero when RECORDER_HZ is above 1000000;
//   the result is then truncated to uintptr_t.
{
    struct timeval t = { 0, 0 };
    uint64_t ticks;

    gettimeofday(&t, NULL);

    ticks = (uint64_t) t.tv_sec * (RECORDER_HZ);
    ticks += (uint64_t) t.tv_usec * (RECORDER_HZ) / 1000000;

    return (uintptr_t) ticks;
}