/* Copyright (C) 2009-2016 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. */ #include #include #include #include #include #include #include #ifndef _WIN32 #include #endif #include "dispatcher.h" #define DISPATCHER_MESSAGE_TYPE_CUSTOM 0x7fffffffu /* structure to store message header information. * That structure is sent through a socketpair so it's optimized * to be transferred via sockets. * It is also packed so as to leave no holes in both 32 and 64 bit environments * so memory instrumentation tools should not find uninitialised bytes. 
*/ struct DispatcherMessage { dispatcher_handle_message handler; uint32_t size; uint32_t type:31; uint32_t ack:1; }; struct DispatcherPrivate { SPICE_CXX_GLIB_ALLOCATOR DispatcherPrivate(uint32_t max_message_type): max_message_type(max_message_type) { } ~DispatcherPrivate(); int recv_fd; int send_fd; pthread_mutex_t lock; DispatcherMessage *messages; const guint max_message_type; void *payload; /* allocated as max of message sizes */ size_t payload_size; /* used to track realloc calls */ void *opaque; dispatcher_handle_any_message any_handler; }; DispatcherPrivate::~DispatcherPrivate() { g_free(messages); socket_close(send_fd); socket_close(recv_fd); pthread_mutex_destroy(&lock); g_free(payload); } Dispatcher::~Dispatcher() { } Dispatcher::Dispatcher(uint32_t max_message_type): priv(new DispatcherPrivate(max_message_type)) { int channels[2]; if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { spice_error("socketpair failed %s", strerror(errno)); return; } pthread_mutex_init(&priv->lock, NULL); priv->recv_fd = channels[0]; priv->send_fd = channels[1]; priv->messages = g_new0(DispatcherMessage, priv->max_message_type); } #define ACK 0xffffffff /* * read_safe * helper. reads until size bytes accumulated in buf, if an error other then * EINTR is encountered returns -1, otherwise returns 0. * @block if 1 the read will block (the fd is always blocking). * if 0 poll first, return immediately if no bytes available, otherwise * read size in blocking mode. 
*/ static int read_safe(int fd, uint8_t *buf, size_t size, int block) { int read_size = 0; int ret; if (size == 0) { return 0; } if (!block) { #ifndef _WIN32 struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0}; while ((ret = poll(&pollfd, 1, 0)) == -1) { if (errno == EINTR) { spice_debug("EINTR in poll"); continue; } spice_error("poll failed"); return -1; } if (!(pollfd.revents & POLLIN)) { return 0; } #else struct timeval tv = { 0, 0 }; fd_set fds; FD_ZERO(&fds); FD_SET(fd, &fds); if (select(1, &fds, NULL, NULL, &tv) < 1) { return 0; } #endif } while (read_size < size) { ret = socket_read(fd, buf + read_size, size - read_size); if (ret == -1) { if (errno == EINTR) { spice_debug("EINTR in read"); continue; } #ifdef _WIN32 // Windows turns this socket not-blocking if (errno == EAGAIN) { fd_set fds; FD_ZERO(&fds); FD_SET(fd, &fds); select(1, &fds, NULL, NULL, NULL); continue; } #endif return -1; } if (ret == 0) { spice_error("broken pipe on read"); return -1; } read_size += ret; } return read_size; } /* * write_safe * @return -1 for error, otherwise number of written bytes. may be zero. 
*/ static int write_safe(int fd, uint8_t *buf, size_t size) { int written_size = 0; int ret; while (written_size < size) { ret = socket_write(fd, buf + written_size, size - written_size); if (ret == -1) { if (errno != EINTR) { return -1; } spice_debug("EINTR in write"); continue; } written_size += ret; } return written_size; } int Dispatcher::handle_single_read(Dispatcher *dispatcher) { int ret; DispatcherMessage msg[1]; void *payload; uint32_t ack = ACK; if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)msg, sizeof(msg), 0)) == -1) { g_warning("error reading from dispatcher: %d", errno); return 0; } if (ret == 0) { /* no message */ return 0; } if (G_UNLIKELY(msg->size > dispatcher->priv->payload_size)) { dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size); dispatcher->priv->payload_size = msg->size; } payload = dispatcher->priv->payload; if (read_safe(dispatcher->priv->recv_fd, (uint8_t*) payload, msg->size, 1) == -1) { g_warning("error reading from dispatcher: %d", errno); /* TODO: close socketpair? */ return 0; } if (dispatcher->priv->any_handler && msg->type != DISPATCHER_MESSAGE_TYPE_CUSTOM) { dispatcher->priv->any_handler(dispatcher->priv->opaque, msg->type, payload); } if (msg->handler) { msg->handler(dispatcher->priv->opaque, payload); } else { g_warning("error: no handler for message type %d", msg->type); } if (msg->ack) { if (write_safe(dispatcher->priv->recv_fd, (uint8_t*)&ack, sizeof(ack)) == -1) { g_warning("error writing ack for message %d", msg->type); /* TODO: close socketpair? */ } } return 1; } /* * handle_event * doesn't handle being in the middle of a message. all reads are blocking. 
*/ void Dispatcher::handle_event(int fd, int event, Dispatcher* dispatcher) { while (dispatcher->handle_single_read(dispatcher)) { } } void Dispatcher::send_message_internal(const DispatcherMessage* msg, void *payload) { uint32_t ack; int send_fd = priv->send_fd; pthread_mutex_lock(&priv->lock); if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) { g_warning("error: failed to send message header for message %d", msg->type); goto unlock; } if (write_safe(send_fd, (uint8_t*) payload, msg->size) == -1) { g_warning("error: failed to send message body for message %d", msg->type); goto unlock; } if (msg->ack) { if (read_safe(send_fd, (uint8_t*)&ack, sizeof(ack), 1) == -1) { g_warning("error: failed to read ack"); } else if (ack != ACK) { g_warning("error: got wrong ack value in dispatcher " "for message %d\n", msg->type); /* TODO handling error? */ } } unlock: pthread_mutex_unlock(&priv->lock); } void Dispatcher::send_message(uint32_t message_type, void *payload) { DispatcherMessage *msg; assert(priv->max_message_type > message_type); assert(priv->messages[message_type].handler); msg = &priv->messages[message_type]; send_message_internal(msg, payload); } void Dispatcher::send_message_custom(dispatcher_handle_message handler, void *payload, uint32_t payload_size, bool ack) { DispatcherMessage msg = { .handler = handler, .size = payload_size, .type = DISPATCHER_MESSAGE_TYPE_CUSTOM, .ack = ack, }; send_message_internal(&msg, payload); } void Dispatcher::register_handler(uint32_t message_type, dispatcher_handle_message handler, size_t size, bool ack) { DispatcherMessage *msg; assert(message_type < priv->max_message_type); assert(priv->messages[message_type].handler == NULL); msg = &priv->messages[message_type]; msg->handler = handler; msg->size = size; msg->type = message_type; msg->ack = ack; if (msg->size > priv->payload_size) { priv->payload = g_realloc(priv->payload, msg->size); priv->payload_size = msg->size; } } void 
Dispatcher::register_universal_handler(dispatcher_handle_any_message any_handler) { priv->any_handler = any_handler; } SpiceWatch *Dispatcher::create_watch(SpiceCoreInterfaceInternal *core) { return core->watch_new(priv->recv_fd, SPICE_WATCH_EVENT_READ, handle_event, this); } void Dispatcher::set_opaque(void *opaque) { priv->opaque = opaque; }