This commit is contained in:
Yonit Halperin 2009-10-16 00:21:43 +02:00 committed by Yaniv Kamay
parent 308e4545cb
commit ef213c66c1
24 changed files with 6016 additions and 83 deletions

View File

@ -15,6 +15,8 @@ RED_COMMON_SRCS = \
canvas_utils.cpp \
red_cairo_canvas.cpp \
red_cairo_canvas.h \
client_net_socket.cpp \
client_net_socket.h \
cmd_line_parser.cpp \
cmd_line_parser.h \
common.h \
@ -71,6 +73,8 @@ RED_COMMON_SRCS = \
screen_layer.cpp \
screen_layer.h \
shared_cache.hpp \
tunnel_channel.cpp \
tunnel_channel.h \
hot_keys.cpp \
hot_keys.h \
threads.cpp \

View File

@ -40,6 +40,7 @@
#include "quic.h"
#include "mutex.h"
#include "cmd_line_parser.h"
#include "tunnel_channel.h"
#include <log4cpp/BasicConfigurator.hh>
#include <log4cpp/FileAppender.hh>
@ -236,7 +237,7 @@ enum AppCommands {
Application::Application()
: _client (*this)
, _enabled_channels(RED_CHANNEL_END, true)
, _enabled_channels (RED_CHANNEL_END, true)
, _main_screen (NULL)
, _quitting (false)
, _active (false)
@ -1323,6 +1324,7 @@ bool Application::set_channels_security(CmdLineParser& parser, bool on, char *va
channels_names["cursor"] = RED_CHANNEL_CURSOR;
channels_names["playback"] = RED_CHANNEL_PLAYBACK;
channels_names["record"] = RED_CHANNEL_RECORD;
channels_names["tunnel"] = RED_CHANNEL_TUNNEL;
if (!strcmp(val, "all")) {
if ((val = parser.next_argument())) {
@ -1382,6 +1384,7 @@ bool Application::set_enable_channels(CmdLineParser& parser, bool enable, char *
channels_names["cursor"] = RED_CHANNEL_CURSOR;
channels_names["playback"] = RED_CHANNEL_PLAYBACK;
channels_names["record"] = RED_CHANNEL_RECORD;
channels_names["tunnel"] = RED_CHANNEL_TUNNEL;
if (!strcmp(val, "all")) {
if ((val = parser.next_argument())) {
@ -1460,6 +1463,7 @@ bool Application::process_cmd_line(int argc, char** argv)
_peer_con_opt[RED_CHANNEL_CURSOR] = RedPeer::ConnectionOptions::CON_OP_INVALID;
_peer_con_opt[RED_CHANNEL_PLAYBACK] = RedPeer::ConnectionOptions::CON_OP_INVALID;
_peer_con_opt[RED_CHANNEL_RECORD] = RedPeer::ConnectionOptions::CON_OP_INVALID;
_peer_con_opt[RED_CHANNEL_TUNNEL] = RedPeer::ConnectionOptions::CON_OP_INVALID;
parser.begin(argc, argv);
@ -1595,6 +1599,10 @@ bool Application::process_cmd_line(int argc, char** argv)
_client.register_channel_factory(RecordChannel::Factory());
}
if (_enabled_channels[RED_CHANNEL_TUNNEL]) {
_client.register_channel_factory(TunnelChannel::Factory());
}
_client.init(host.c_str(), port, sport, password.c_str(), auto_display_res);
if (auto_display_res) {
Monitor* mon = find_monitor(0);

View File

@ -0,0 +1,386 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#include "common.h"
#include "client_net_socket.h"
#include "debug.h"
#include "red_error_codes.h"
#include "utils.h"
ClientNetSocket::ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port,
EventsLoop& events_loop, EventHandler& event_handler)
: _id (id)
, _local_addr (dst_addr)
, _local_port (dst_port)
, _peer (INVALID_SOCKET)
, _events_loop (events_loop)
, _event_handler (event_handler)
, _num_recv_tokens (0)
, _send_message (NULL)
, _send_pos (0)
, _status (SOCKET_STATUS_CLOSED)
, _fin_pending (false)
, _close_pending (false)
{
}
ClientNetSocket::~ClientNetSocket()
{
close();
}
bool ClientNetSocket::connect(uint32_t recv_tokens)
{
struct sockaddr_in addr;
int no_delay;
ASSERT(_peer == INVALID_SOCKET && _status == SOCKET_STATUS_CLOSED);
addr.sin_port = _local_port;
addr.sin_addr.s_addr = _local_addr.s_addr;
addr.sin_family = AF_INET;
if ((_peer = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
int err = sock_error();
THROW("%s: failed to create socket: %s", __FUNCTION__, sock_err_message(err));
}
no_delay = 1;
if (setsockopt(_peer, IPPROTO_TCP, TCP_NODELAY,
(const char*)&no_delay, sizeof(no_delay)) == SOCKET_ERROR) {
LOG_WARN("set TCP_NODELAY failed");
}
LOG_INFO("connect to ip=%s port=%d (connection_id=%d)",
inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), _id);
if (::connect(_peer, (struct sockaddr*)&addr, sizeof(sockaddr_in)) == SOCKET_ERROR) {
int err = sock_error();
closesocket(_peer);
_peer = INVALID_SOCKET;
LOG_INFO("connect to ip=%s port=%d failed %s (connection_id=%d)",
inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), sock_err_message(err), _id);
return false;
}
_events_loop.add_socket(*this);
_status = SOCKET_STATUS_OPEN;
_num_recv_tokens = recv_tokens;
return true;
}
void ClientNetSocket::push_disconnect()
{
if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) {
THROW("%s: disconnect attempt for disconnected socket %s %d", __FUNCTION__,
inet_ntoa(_local_addr), ntohs(_local_port));
}
_close_pending = true;
if (!during_send()) {
close_and_tell();
}
}
void ClientNetSocket::push_fin()
{
if ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN)) {
_fin_pending = true;
if (!during_send()) {
try {
apply_guest_fin();
} catch (ClientNetSocket::ShutdownExcpetion&) {
close_and_tell();
}
}
} else {
THROW("%s: unexpected fin connection_id=%d (status=%d)", __FUNCTION__,
_id, _status);
}
}
void ClientNetSocket::add_recv_tokens(uint32_t num_tokens)
{
if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) {
THROW("%s: ack attempt for disconnected socket connection_id=%d", __FUNCTION__,
_id);
}
_num_recv_tokens += num_tokens;
// recv might have not been called because tokens weren't available
if (_num_recv_tokens && (_num_recv_tokens == num_tokens)) {
if (can_receive()) {
receive();
}
}
}
void ClientNetSocket::push_send(SendBuffer& buf)
{
if (!can_send()) {
THROW("%s: unexpected send attempt for connection_id=%d (status = %d)",
__FUNCTION__, _id, _status);
}
if (_fin_pending || _close_pending) {
THROW("%s: unexpected send attempt for connection_id=% - shutdown send pending",
__FUNCTION__, _id);
}
_send_messages.push_back(buf.ref());
send();
}
void ClientNetSocket::on_event()
{
if (can_send()) {
send();
}
if (!during_send()) {
if (_close_pending) {
close_and_tell();
} else if (_fin_pending) {
apply_guest_fin();
}
}
if (can_receive()) {
receive();
}
}
void ClientNetSocket::apply_guest_fin()
{
if (_status == SOCKET_STATUS_OPEN) {
if (shutdown(_peer, SHUT_WR) == SOCKET_ERROR) {
int err = sock_error();
LOG_INFO("shutdown in connection_id=%d failed %s", _id, sock_err_message(err));
throw ClientNetSocket::ShutdownExcpetion();
}
_fin_pending = false;
_status = SOCKET_STATUS_SENT_FIN;
} else if (_status == SOCKET_STATUS_RECEIVED_FIN) {
close_and_tell();
}
}
void ClientNetSocket::handle_client_fin()
{
if (_status == SOCKET_STATUS_OPEN) {
_status = SOCKET_STATUS_RECEIVED_FIN;
_event_handler.on_socket_fin_recv(*this);
} else if (_status == SOCKET_STATUS_SENT_FIN) {
close_and_tell();
}
}
inline bool ClientNetSocket::during_send()
{
return ((!_send_messages.empty()) || _send_message);
}
inline bool ClientNetSocket::can_send()
{
return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN));
}
inline bool ClientNetSocket::can_receive()
{
return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_SENT_FIN));
}
void ClientNetSocket::send_message_done()
{
_send_message->unref();
_send_message = NULL;
_send_pos = 0;
_event_handler.on_socket_message_send_done(*this);
}
void ClientNetSocket::send()
{
ASSERT(_peer != INVALID_SOCKET);
try {
if (_send_message) {
_send_pos += send_buf(_send_message->data() + _send_pos,
_send_message->size() - _send_pos);
if (_send_pos != _send_message->size()) {
return;
} else {
send_message_done();
}
}
while (!_send_messages.empty()) {
_send_message = _send_messages.front();
_send_messages.pop_front();
_send_pos = send_buf(_send_message->data(), _send_message->size());
if (_send_pos != _send_message->size()) {
return;
} else {
send_message_done();
}
}
} catch (ClientNetSocket::SendException&) {
close_and_tell();
}
}
uint32_t ClientNetSocket::send_buf(const uint8_t* buf, uint32_t size)
{
const uint8_t* pos = buf;
ASSERT(_peer != INVALID_SOCKET);
while (size) {
int now;
if ((now = ::send(_peer, (char*)pos, size, MSG_NOSIGNAL)) == SOCKET_ERROR) {
int err = sock_error();
if (err == WOULDBLOCK_ERR) {
break;
}
if (err == INTERRUPTED_ERR) {
continue;
}
LOG_INFO("send in connection_id=%d failed %s", _id, sock_err_message(err));
throw ClientNetSocket::SendException();
}
size -= now;
pos += now;
}
return pos - buf;
}
void ClientNetSocket::receive()
{
ASSERT(_peer != INVALID_SOCKET);
bool shutdown;
while (_num_recv_tokens) {
ReceiveBuffer& rcv_buf = alloc_receive_buffer();
uint32_t size;
try {
size = receive_buf(rcv_buf.buf(), rcv_buf.buf_max_size(), shutdown);
} catch (ClientNetSocket::ReceiveException&) {
rcv_buf.release_buf();
close_and_tell();
return;
}
if (size) {
rcv_buf.set_buf_size(size);
_num_recv_tokens--;
_event_handler.on_socket_message_recv_done(*this, rcv_buf);
} else {
rcv_buf.release_buf();
}
if (shutdown) {
handle_client_fin();
return;
}
if (size < rcv_buf.buf_max_size()) {
return;
}
}
}
uint32_t ClientNetSocket::receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown)
{
uint8_t* pos = buf;
ASSERT(_peer != INVALID_SOCKET);
shutdown = false;
while (max_size) {
int now;
if ((now = ::recv(_peer, (char*)pos, max_size, 0)) <= 0) {
if (now == 0) {
shutdown = true;
break; // a case where fin is received, but before that, there is a msg
}
int err = sock_error();
if (err == WOULDBLOCK_ERR) {
break;
}
if (err == INTERRUPTED_ERR) {
continue;
}
LOG_INFO("receive in connection_id=%d failed errno=%s", _id, sock_err_message(err));
throw ClientNetSocket::ReceiveException();
}
max_size -= now;
pos += now;
}
return (pos - buf);
}
void ClientNetSocket::release_wait_send_messages()
{
if (_send_message) {
_send_message->unref();
_send_message = NULL;
_send_pos = 0;
}
while (!_send_messages.empty()) {
_send_messages.front()->unref();
_send_messages.pop_front();
}
}
void ClientNetSocket::close()
{
release_wait_send_messages();
apply_disconnect();
}
void ClientNetSocket::close_and_tell()
{
close();
_event_handler.on_socket_disconnect(*this);
}
void ClientNetSocket::apply_disconnect()
{
if (_peer != INVALID_SOCKET) {
_events_loop.remove_socket(*this);
closesocket(_peer);
_peer = INVALID_SOCKET;
LOG_INFO("closing connection_id=%d", _id);
}
_status = SOCKET_STATUS_CLOSED;
_close_pending = false;
_fin_pending = false;
}

154
client/client_net_socket.h Normal file
View File

@ -0,0 +1,154 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#ifndef _H_CLIENT_NET_SOCKET
#define _H_CLIENT_NET_SOCKET
#include "platform_utils.h"
#include "common.h"
#include "events_loop.h"
/* intterface for connenctions inside client LAN */
typedef enum {
SOCKET_STATUS_OPEN,
SOCKET_STATUS_SENT_FIN,
SOCKET_STATUS_RECEIVED_FIN,
SOCKET_STATUS_CLOSED,
} SocketStatus;
class ClientNetSocket: public EventsLoop::Socket {
public:
class ReceiveBuffer;
class SendBuffer;
class EventHandler;
class SendException {};
class ReceiveException {};
class ShutdownExcpetion {};
ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port,
EventsLoop& events_loop, ClientNetSocket::EventHandler& event_handler);
virtual ~ClientNetSocket();
bool connect(uint32_t recv_tokens);
void push_disconnect();
void push_fin();
void push_send(SendBuffer& buf);
void add_recv_tokens(uint32_t num_tokens);
bool is_connected() {return _status != SOCKET_STATUS_CLOSED;}
inline uint16_t id() {return _id;}
inline const struct in_addr& local_addr() {return _local_addr;}
inline uint16_t local_port() {return _local_port;}
/* EventsLoop::Socket interface */
void on_event();
int get_socket() {return _peer;}
protected:
virtual ReceiveBuffer& alloc_receive_buffer() = 0;
private:
void send();
void receive();
uint32_t send_buf(const uint8_t* buf, uint32_t size);
uint32_t receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown);
bool can_receive();
bool can_send();
void apply_disconnect();
void apply_guest_fin();
void close();
void close_and_tell();
void handle_client_fin();
bool during_send();
void release_wait_send_messages();
void clear();
void send_message_done();
private:
uint16_t _id;
struct in_addr _local_addr;
uint16_t _local_port;
SOCKET _peer;
EventsLoop& _events_loop;
EventHandler& _event_handler;
uint32_t _num_recv_tokens;
std::list<SendBuffer*> _send_messages;
SendBuffer* _send_message;
uint32_t _send_pos;
SocketStatus _status;
bool _fin_pending;
bool _close_pending;
};
class ClientNetSocket::ReceiveBuffer {
public:
ReceiveBuffer() {}
virtual uint8_t* buf() = 0;
virtual uint32_t buf_max_size() = 0;
virtual void set_buf_size(uint32_t size) = 0;
virtual void release_buf() = 0;
protected:
virtual ~ReceiveBuffer() {}
};
class ClientNetSocket::SendBuffer {
public:
SendBuffer() {};
virtual const uint8_t* data() = 0;
virtual uint32_t size() = 0;
virtual ClientNetSocket::SendBuffer* ref() = 0;
virtual void unref() = 0;
protected:
virtual ~SendBuffer() {}
};
class ClientNetSocket::EventHandler {
public:
EventHandler() {}
virtual ~EventHandler() {}
virtual void on_socket_message_recv_done(ClientNetSocket& sckt, ReceiveBuffer& buf) = 0;
virtual void on_socket_message_send_done(ClientNetSocket& sckt) = 0;
virtual void on_socket_disconnect(ClientNetSocket& sckt) = 0;
virtual void on_socket_fin_recv(ClientNetSocket& sckt) = 0;
};
#endif

View File

@ -133,7 +133,7 @@ void RedChannelBase::link(uint32_t connection_id, const std::string& password)
*/
if (RSA_public_encrypt(password.length() + 1, (unsigned char *)password.c_str(),
(uint8_t *)bufEncrypted.get(),
rsa, RSA_PKCS1_OAEP_PADDING) > 0 ) {
rsa, RSA_PKCS1_OAEP_PADDING) > 0) {
send((uint8_t*)bufEncrypted.get(), nRSASize);
} else {
THROW("could not encrypt password");
@ -425,8 +425,10 @@ void RedChannel::run()
_outgoing_message = NULL;
}
_incomming_header_pos = 0;
delete _incomming_message;
_incomming_message = NULL;
if (_incomming_message) {
_incomming_message->unref();
_incomming_message = NULL;
}
case DISCONNECT_ACTION:
close();
on_disconnect();
@ -525,19 +527,19 @@ void RedChannel::recive_messages()
_incomming_header_pos = n;
return;
}
std::auto_ptr<CompundInMessage> message(new CompundInMessage(_incomming_header.serial,
_incomming_header.type,
_incomming_header.size,
_incomming_header.sub_list));
n = RedPeer::recive(message->data(), message->compund_size());
if (n != message->compund_size()) {
AutoRef<CompundInMessage> message(new CompundInMessage(_incomming_header.serial,
_incomming_header.type,
_incomming_header.size,
_incomming_header.sub_list));
n = RedPeer::recive((*message)->data(), (*message)->compund_size());
if (n != (*message)->compund_size()) {
_incomming_message = message.release();
_incomming_message_pos = n;
return;
}
on_message_recived();
_message_handler->handle_message(*message.get());
on_message_complition(message->serial());
_message_handler->handle_message(*(*message));
on_message_complition((*message)->serial());
}
}
@ -577,11 +579,11 @@ void RedChannel::on_event()
if (_incomming_message_pos != _incomming_message->compund_size()) {
return;
}
std::auto_ptr<CompundInMessage> message(_incomming_message);
AutoRef<CompundInMessage> message(_incomming_message);
_incomming_message = NULL;
on_message_recived();
_message_handler->handle_message(*message.get());
on_message_complition(message->serial());
_message_handler->handle_message(*(*message));
on_message_complition((*message)->serial());
}
recive_messages();
}
@ -616,18 +618,18 @@ void RedChannel::handle_migrate(RedPeer::InMessage* message)
if (migrate->flags & RED_MIGRATE_NEED_FLUSH) {
send_migrate_flush_mark();
}
std::auto_ptr<RedPeer::CompundInMessage> data_message;
AutoRef<CompundInMessage> data_message;
if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) {
data_message.reset(recive());
}
_client.migrate_channel(*this);
if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) {
if (data_message->type() != RED_MIGRATE_DATA) {
if ((*data_message)->type() != RED_MIGRATE_DATA) {
THROW("expect RED_MIGRATE_DATA");
}
std::auto_ptr<RedPeer::OutMessage> message(new RedPeer::OutMessage(REDC_MIGRATE_DATA,
data_message->size()));
memcpy(message->data(), data_message->data(), data_message->size());
(*data_message)->size()));
memcpy(message->data(), (*data_message)->data(), (*data_message)->size());
send(*message);
}
_loop.add_socket(*this);

View File

@ -16,54 +16,12 @@
*/
#include "common.h"
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#define closesocket(sock) ::close(sock)
#endif
#include "red.h"
#include "red_peer.h"
#include "utils.h"
#include "debug.h"
#include "platform_utils.h"
#ifdef _WIN32
int inet_aton(const char *ip, struct in_addr *in_addr)
{
unsigned long addr = inet_addr(ip);
if (addr == INADDR_NONE) {
return 0;
}
in_addr->S_un.S_addr = addr;
return 1;
}
#define SHUTDOWN_ERR WSAESHUTDOWN
#define INTERRUPTED_ERR WSAEINTR
#define WOULDBLOCK_ERR WSAEWOULDBLOCK
#define sock_error() WSAGetLastError()
#define sock_err_message(err) sys_err_to_str(err)
#else
#define SHUTDOWN_ERR EPIPE
#define INTERRUPTED_ERR EINTR
#define WOULDBLOCK_ERR EAGAIN
#define sock_error() errno
#define sock_err_message(err) strerror(err)
#endif
static void ssl_error()
{
ERR_print_errors_fp(stderr);
@ -326,11 +284,11 @@ uint32_t RedPeer::recive(uint8_t *buf, uint32_t size)
RedPeer::CompundInMessage* RedPeer::recive()
{
RedDataHeader header;
std::auto_ptr<CompundInMessage> message;
AutoRef<CompundInMessage> message;
recive((uint8_t*)&header, sizeof(RedDataHeader));
message.reset(new CompundInMessage(header.serial, header.type, header.size, header.sub_list));
recive(message->data(), message->compund_size());
recive((*message)->data(), (*message)->compund_size());
return message.release();
}

View File

@ -18,12 +18,6 @@
#ifndef _H_REDPEER
#define _H_REDPEER
#ifdef _WIN32
#include <winsock.h>
#else
typedef int SOCKET;
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
@ -31,6 +25,7 @@ typedef int SOCKET;
#include "red.h"
#include "events_loop.h"
#include "threads.h"
#include "platform_utils.h"
class RedPeer: protected EventsLoop::Socket {
public:
@ -116,7 +111,7 @@ private:
class RedPeer::InMessage {
public:
InMessage(uint16_t type, uint32_t size, uint8_t* data)
InMessage(uint16_t type, uint32_t size, uint8_t * data)
: _type (type)
, _size (size)
, _data (data)
@ -139,12 +134,14 @@ class RedPeer::CompundInMessage: public RedPeer::InMessage {
public:
CompundInMessage(uint64_t _serial, uint16_t type, uint32_t size, uint32_t sub_list)
: InMessage(type, size, new uint8_t[size])
, _refs (1)
, _serial (_serial)
, _sub_list (sub_list)
{
}
virtual ~CompundInMessage() { delete[] _data;}
RedPeer::InMessage* ref() { _refs++; return this;}
void unref() {if (!--_refs) delete this;}
uint64_t serial() { return _serial;}
uint32_t sub_list() { return _sub_list;}
@ -152,7 +149,11 @@ public:
virtual uint32_t size() { return _sub_list ? _sub_list : _size;}
uint32_t compund_size() {return _size;}
protected:
virtual ~CompundInMessage() { delete[] _data;}
private:
int _refs;
uint64_t _serial;
uint32_t _sub_list;
};

792
client/tunnel_channel.cpp Normal file
View File

@ -0,0 +1,792 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#include "common.h"
#include "tunnel_channel.h"
#include "red.h"
#define SOCKET_WINDOW_SIZE 60
#define SOCKET_TOKENS_TO_SEND 20
/* classes for tunneling msgs without reallocations and memcpy */
class InSocketMessage;
class OutSocketMessage;
class InSocketMessage: public ClientNetSocket::SendBuffer {
public:
InSocketMessage(RedChannel::CompundInMessage& full_msg);
const uint8_t* data();
uint32_t size();
ClientNetSocket::SendBuffer* ref();
void unref();
protected:
virtual ~InSocketMessage() {}
private:
int _refs;
RedChannel::CompundInMessage& _full_msg;
RedTunnelSocketData* _sckt_msg;
uint32_t _buf_size;
};
InSocketMessage::InSocketMessage(RedChannel::CompundInMessage& full_msg)
: _refs (1)
, _full_msg (full_msg)
{
ASSERT(full_msg.type() == RED_TUNNEL_SOCKET_DATA);
_full_msg.ref();
_sckt_msg = (RedTunnelSocketData*)(_full_msg.data());
_buf_size = _full_msg.size() - sizeof(RedTunnelSocketData);
}
const uint8_t* InSocketMessage::data()
{
return _sckt_msg->data;
}
uint32_t InSocketMessage::size()
{
return _buf_size;
}
ClientNetSocket::SendBuffer* InSocketMessage::ref()
{
_full_msg.ref();
_refs++;
return this;
}
void InSocketMessage::unref()
{
_full_msg.unref();
if (!--_refs) {
delete this;
}
}
class OutSocketMessage: public RedPeer::OutMessage,
public RedChannel::OutMessage,
public ClientNetSocket::ReceiveBuffer {
public:
virtual RedPeer::OutMessage& peer_message() { return *this;}
virtual void release();
virtual uint8_t* buf();
virtual uint32_t buf_max_size() {return _max_data_size;}
virtual void set_buf_size(uint32_t size);
virtual void release_buf();
static void init(uint32_t max_data_size);
static OutSocketMessage& alloc_message();
static void clear_free_messages();
protected:
OutSocketMessage();
virtual ~OutSocketMessage() {}
private:
static std::list<OutSocketMessage*> _free_messages;
static uint32_t _max_data_size;
};
std::list<OutSocketMessage*> OutSocketMessage::_free_messages;
uint32_t OutSocketMessage::_max_data_size;
OutSocketMessage::OutSocketMessage()
: RedPeer::OutMessage(REDC_TUNNEL_SOCKET_DATA, sizeof(RedcTunnelSocketData) + _max_data_size)
, RedChannel::OutMessage()
, ClientNetSocket::ReceiveBuffer()
{
}
uint8_t* OutSocketMessage::buf()
{
return ((RedcTunnelSocketData*)RedPeer::OutMessage::data())->data;
}
void OutSocketMessage::set_buf_size(uint32_t size)
{
RedPeer::OutMessage::header().size = size + sizeof(RedcTunnelSocketData);
}
void OutSocketMessage::release()
{
OutSocketMessage::_free_messages.push_front(this);
}
void OutSocketMessage::release_buf()
{
release();
}
void OutSocketMessage::init(uint32_t max_data_size)
{
_max_data_size = max_data_size;
}
OutSocketMessage& OutSocketMessage::alloc_message()
{
OutSocketMessage* ret;
if (!_free_messages.empty()) {
ret = _free_messages.front();
_free_messages.pop_front();
} else {
ret = new OutSocketMessage();
}
return *ret;
}
void OutSocketMessage::clear_free_messages()
{
while (!_free_messages.empty()) {
OutSocketMessage* message = _free_messages.front();
_free_messages.pop_front();
delete message;
}
}
struct TunnelService {
uint32_t type;
uint32_t id;
uint32_t group;
struct in_addr ip;
uint32_t port;
std::string name;
std::string description;
struct in_addr virtual_ip;
#ifdef TUNNEL_CONFIG
TunnelConfigConnectionIfc* service_src;
#endif
};
class TunnelChannel::TunnelSocket: public ClientNetSocket {
public:
TunnelSocket(uint16_t id, TunnelService & dst_service, EventsLoop & events_loop,
EventHandler & event_handler);
virtual ~TunnelSocket() {}
void set_num_tokens(uint32_t tokens) {_num_tokens = tokens;}
void set_server_num_tokens(uint32_t tokens) {_server_num_tokens = tokens;}
void set_guest_closed() {_guest_closed = true;}
uint32_t get_num_tokens() {return _num_tokens;}
uint32_t get_server_num_tokens() {return _server_num_tokens;}
bool get_guest_closed() {return _guest_closed;}
protected:
virtual ReceiveBuffer& alloc_receive_buffer() {return OutSocketMessage::alloc_message();}
private:
uint32_t _num_tokens;
uint32_t _server_num_tokens;
uint32_t _service_id;
bool _guest_closed;
};
TunnelChannel::TunnelSocket::TunnelSocket(uint16_t id, TunnelService& dst_service,
EventsLoop& events_loop,
ClientNetSocket::EventHandler& event_handler)
: ClientNetSocket(id, dst_service.ip, htons((uint16_t)dst_service.port),
events_loop, event_handler)
, _num_tokens (0)
, _server_num_tokens (0)
, _service_id (dst_service.id)
, _guest_closed (false)
{
}
class TunnelHandler: public MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END> {
public:
TunnelHandler(TunnelChannel& channel)
: MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END>(channel) {}
};
TunnelChannel::TunnelChannel(RedClient& client, uint32_t id)
: RedChannel(client, RED_CHANNEL_TUNNEL, id, new TunnelHandler(*this))
, _max_socket_data_size(0)
, _service_id(0)
, _service_group(0)
{
TunnelHandler* handler = static_cast<TunnelHandler*>(get_message_handler());
handler->set_handler(RED_MIGRATE, &TunnelChannel::handle_migrate, 0);
handler->set_handler(RED_SET_ACK, &TunnelChannel::handle_set_ack, sizeof(RedSetAck));
handler->set_handler(RED_PING, &TunnelChannel::handle_ping, sizeof(RedPing));
handler->set_handler(RED_WAIT_FOR_CHANNELS, &TunnelChannel::handle_wait_for_channels,
sizeof(RedWaitForChannels));
handler->set_handler(RED_TUNNEL_INIT,
&TunnelChannel::handle_init, sizeof(RedTunnelInit));
handler->set_handler(RED_TUNNEL_SERVICE_IP_MAP,
&TunnelChannel::handle_service_ip_map, sizeof(RedTunnelServiceIpMap));
handler->set_handler(RED_TUNNEL_SOCKET_OPEN,
&TunnelChannel::handle_socket_open, sizeof(RedTunnelSocketOpen));
handler->set_handler(RED_TUNNEL_SOCKET_CLOSE,
&TunnelChannel::handle_socket_close, sizeof(RedTunnelSocketClose));
handler->set_handler(RED_TUNNEL_SOCKET_FIN,
&TunnelChannel::handle_socket_fin, sizeof(RedTunnelSocketFin));
handler->set_handler(RED_TUNNEL_SOCKET_TOKEN,
&TunnelChannel::handle_socket_token, sizeof(RedTunnelSocketTokens));
handler->set_handler(RED_TUNNEL_SOCKET_CLOSED_ACK,
&TunnelChannel::handle_socket_closed_ack,
sizeof(RedTunnelSocketClosedAck));
handler->set_handler(RED_TUNNEL_SOCKET_DATA,
&TunnelChannel::handle_socket_data, sizeof(RedTunnelSocketData));
#ifdef TUNNEL_CONFIG
_config_listener = new TunnelConfigListenerIfc(*this);
#endif
}
TunnelChannel::~TunnelChannel()
{
destroy_sockets();
OutSocketMessage::clear_free_messages();
#ifdef TUNNEL_CONFIG
delete _config_listener;
#endif
}
void TunnelChannel::handle_init(RedPeer::InMessage* message)
{
RedTunnelInit* init_msg = (RedTunnelInit*)message->data();
_max_socket_data_size = init_msg->max_socket_data_size;
OutSocketMessage::init(_max_socket_data_size);
_sockets.resize(init_msg->max_num_of_sockets);
}
void TunnelChannel::send_service(TunnelService& service)
{
int msg_size = 0;
msg_size += service.name.length() + 1;
msg_size += service.description.length() + 1;
if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) {
msg_size += sizeof(RedcTunnelAddPrintService) + sizeof(RedTunnelIPv4);
} else if (service.type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
msg_size += sizeof(RedcTunnelAddGenericService);
} else {
THROW("%s: invalid service type", __FUNCTION__);
}
Message* service_msg = new Message(REDC_TUNNEL_SERVICE_ADD, msg_size);
RedcTunnelAddGenericService* out_service = (RedcTunnelAddGenericService*)service_msg->data();
out_service->id = service.id;
out_service->group = service.group;
out_service->type = service.type;
out_service->port = service.port;
int cur_offset;
if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) {
cur_offset = sizeof(RedcTunnelAddPrintService);
((RedcTunnelAddPrintService*)out_service)->ip.type = RED_TUNNEL_IP_TYPE_IPv4;
memcpy(((RedcTunnelAddPrintService*)out_service)->ip.data, &(service.ip.s_addr),
sizeof(RedTunnelIPv4));
cur_offset += sizeof(RedTunnelIPv4);
} else {
cur_offset = sizeof(RedcTunnelAddGenericService);
}
out_service->name = cur_offset;
service.name.copy((char*)(service_msg->data() + cur_offset), service.name.length());
(service_msg->data() + cur_offset)[service.name.length()] = '\0';
cur_offset += service.name.length() + 1;
out_service->description = cur_offset;
service.description.copy((char*)(service_msg->data() + cur_offset),
service.description.length());
(service_msg->data() + cur_offset)[service.description.length()] = '\0';
cur_offset += service.description.length() + 1;
post_message(service_msg);
}
void TunnelChannel::handle_service_ip_map(RedPeer::InMessage* message)
{
RedTunnelServiceIpMap* service_ip_msg = (RedTunnelServiceIpMap*)message->data();
TunnelService* service = find_service(service_ip_msg->service_id);
if (!service) {
THROW("%s: attempt to map non-existing service id=%d", __FUNCTION__,
service_ip_msg->service_id);
}
if (service_ip_msg->virtual_ip.type == RED_TUNNEL_IP_TYPE_IPv4) {
memcpy(&service->virtual_ip.s_addr, service_ip_msg->virtual_ip.data,
sizeof(RedTunnelIPv4));
} else {
THROW("unexpected ip type %d", service_ip_msg->virtual_ip.type);
}
DBG(0, "service_id=%d (%s), virtual_ip=%s", service->id, service->name.c_str(),
inet_ntoa(service->virtual_ip));
#ifdef TUNNEL_CONFIG
service->service_src->send_virtual_ip(service->virtual_ip);
#endif
}
void TunnelChannel::handle_socket_open(RedPeer::InMessage* message)
{
RedTunnelSocketOpen* open_msg = (RedTunnelSocketOpen*)message->data();
TunnelSocket* sckt;
Message* out_msg;
if (_sockets[open_msg->connection_id]) {
THROW("%s: attempt to open an already opened connection id=%d", __FUNCTION__,
open_msg->connection_id);
}
TunnelService* service = find_service(open_msg->service_id);
if (!service) {
THROW("%s: attempt to access non-existing service id=%d", __FUNCTION__,
open_msg->service_id);
}
sckt = new TunnelSocket(open_msg->connection_id, *service, get_events_loop(), *this);
if (sckt->connect(open_msg->tokens)) {
_sockets[open_msg->connection_id] = sckt;
out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_ACK, sizeof(RedcTunnelSocketOpenAck));
sckt->set_num_tokens(0);
sckt->set_server_num_tokens(SOCKET_WINDOW_SIZE);
((RedcTunnelSocketOpenAck*)out_msg->data())->connection_id = open_msg->connection_id;
((RedcTunnelSocketOpenAck*)out_msg->data())->tokens = SOCKET_WINDOW_SIZE;
} else {
out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_NACK, sizeof(RedcTunnelSocketOpenNack));
((RedcTunnelSocketOpenNack*)out_msg->data())->connection_id = open_msg->connection_id;
delete sckt;
}
post_message(out_msg);
}
void TunnelChannel::handle_socket_fin(RedPeer::InMessage* message)
{
RedTunnelSocketFin* fin_msg = (RedTunnelSocketFin*)message->data();
TunnelSocket* sckt = _sockets[fin_msg->connection_id];
if (!sckt) {
THROW("%s: fin connection that doesn't exist id=%d", __FUNCTION__, fin_msg->connection_id);
}
DBG(0, "guest fin connection_id=%d", fin_msg->connection_id);
if (sckt->is_connected()) {
sckt->push_fin();
}
}
void TunnelChannel::handle_socket_close(RedPeer::InMessage* message)
{
RedTunnelSocketClose* close_msg = (RedTunnelSocketClose*)message->data();
TunnelSocket* sckt = _sockets[close_msg->connection_id];
if (!sckt) {
THROW("%s: closing connection that doesn't exist id=%d", __FUNCTION__,
close_msg->connection_id);
}
DBG(0, "guest closed connection_id=%d", close_msg->connection_id);
sckt->set_guest_closed();
if (sckt->is_connected()) {
sckt->push_disconnect();
} else {
// close happend in the server side before it received the client
// close msg. we should ack the server and free the socket
on_socket_disconnect(*sckt);
}
}
void TunnelChannel::handle_socket_closed_ack(RedPeer::InMessage* message)
{
RedTunnelSocketClosedAck* close_ack_msg = (RedTunnelSocketClosedAck*)message->data();
TunnelSocket* sckt = _sockets[close_ack_msg->connection_id];
if (!sckt) {
THROW("%s: close ack to connection that doesn't exist id=%d", __FUNCTION__,
close_ack_msg->connection_id);
}
if (sckt->is_connected()) {
THROW("%s: close ack to connection that is not closed id=%d",
__FUNCTION__, close_ack_msg->connection_id);
}
_sockets[sckt->id()] = NULL;
DBG(0, "guest Acked closed connection_id=%d", close_ack_msg->connection_id);
delete sckt;
}
void TunnelChannel::handle_socket_data(RedPeer::InMessage* message)
{
RedTunnelSocketData* send_msg = (RedTunnelSocketData*)message->data();
TunnelSocket* sckt = _sockets[send_msg->connection_id];
if (!sckt) {
THROW("%s: sending data to connection that doesn't exist id=%d", __FUNCTION__,
send_msg->connection_id);
}
if (!sckt->get_server_num_tokens()) {
THROW("%s: token violation connectio_id=%d", __FUNCTION__, sckt->id());
}
sckt->set_server_num_tokens(sckt->get_server_num_tokens() - 1);
if (!sckt->is_connected()) {
// server hasn't handled the close msg yet
return;
}
InSocketMessage* sckt_msg = new InSocketMessage(*(
static_cast<RedChannel::CompundInMessage*>(message)));
if (sckt_msg->size() > _max_socket_data_size) {
THROW("%s: socket data exceeds size limit %d > %d connection_id=%d", __FUNCTION__,
sckt_msg->size(), _max_socket_data_size, sckt->id());
}
sckt->push_send(*sckt_msg);
sckt_msg->unref();
}
void TunnelChannel::handle_socket_token(RedPeer::InMessage* message)
{
RedTunnelSocketTokens* token_msg = (RedTunnelSocketTokens*)message->data();
TunnelSocket* sckt = _sockets[token_msg->connection_id];
if (!sckt) {
THROW("%s: ack connection that doesn't exist id=%d", __FUNCTION__,
token_msg->connection_id);
}
if (!sckt->is_connected()) {
return;
}
sckt->add_recv_tokens(token_msg->num_tokens);
}
void TunnelChannel::on_socket_message_recv_done(ClientNetSocket& sckt,
ClientNetSocket::ReceiveBuffer& buf)
{
TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
OutSocketMessage* out_msg = static_cast<OutSocketMessage*>(&buf);
((RedcTunnelSocketData*)(out_msg->data()))->connection_id = tunnel_sckt->id();
post_message(out_msg);
}
void TunnelChannel::on_socket_fin_recv(ClientNetSocket& sckt)
{
TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
Message* out_msg = new Message(REDC_TUNNEL_SOCKET_FIN, sizeof(RedcTunnelSocketFin));
DBG(0, "FIN from client coonection id=%d", tunnel_sckt->id());
((RedcTunnelSocketFin*)out_msg->data())->connection_id = tunnel_sckt->id();
post_message(out_msg);
}
void TunnelChannel::on_socket_disconnect(ClientNetSocket& sckt)
{
TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
Message* out_msg;
// close intiated by server -> needs ack
if (tunnel_sckt->get_guest_closed()) {
DBG(0, "send close ack connection_id=%d", tunnel_sckt->id());
out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED_ACK, sizeof(RedcTunnelSocketClosedAck));
((RedcTunnelSocketClosedAck*)out_msg->data())->connection_id = tunnel_sckt->id();
_sockets[tunnel_sckt->id()] = NULL;
delete &sckt;
} else { // close initiated by client
DBG(0, "send close coonection_id=%d", tunnel_sckt->id());
out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED, sizeof(RedcTunnelSocketClosed));
((RedcTunnelSocketClosed*)out_msg->data())->connection_id = tunnel_sckt->id();
}
post_message(out_msg);
}
void TunnelChannel::on_socket_message_send_done(ClientNetSocket& sckt)
{
TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
uint32_t num_tokens = tunnel_sckt->get_num_tokens();
num_tokens++;
if (num_tokens == SOCKET_TOKENS_TO_SEND) {
Message* out_msg = new Message(REDC_TUNNEL_SOCKET_TOKEN, sizeof(RedcTunnelSocketTokens));
RedcTunnelSocketTokens* tokens_msg = (RedcTunnelSocketTokens*)out_msg->data();
tokens_msg->connection_id = tunnel_sckt->id();
tokens_msg->num_tokens = num_tokens;
post_message(out_msg);
tunnel_sckt->set_num_tokens(0);
tunnel_sckt->set_server_num_tokens(tunnel_sckt->get_server_num_tokens() + num_tokens);
ASSERT(tunnel_sckt->get_server_num_tokens() <= SOCKET_WINDOW_SIZE);
} else {
tunnel_sckt->set_num_tokens(num_tokens);
}
}
TunnelService* TunnelChannel::find_service(uint32_t id)
{
for (std::list<TunnelService*>::iterator iter = _services.begin();
iter != _services.end(); iter++) {
if ((*iter)->id == id) {
return *iter;
}
}
return NULL;
}
/* returns the first service with the same ip */
TunnelService* TunnelChannel::find_service(struct in_addr& ip)
{
for (std::list<TunnelService*>::iterator iter = _services.begin();
iter != _services.end(); iter++) {
if ((*iter)->ip.s_addr == ip.s_addr) {
return *iter;
}
}
return NULL;
}
TunnelService* TunnelChannel::find_service(struct in_addr& ip, uint32_t port)
{
for (std::list<TunnelService*>::iterator iter = _services.begin();
iter != _services.end(); iter++) {
if (((*iter)->ip.s_addr == ip.s_addr) && ((*iter)->port == port)) {
return *iter;
}
}
return NULL;
}
void TunnelChannel::destroy_sockets()
{
for (unsigned int i = 0; i < _sockets.size(); i++) {
if (_sockets[i]) {
delete _sockets[i];
_sockets[i] = NULL;
}
}
}
void TunnelChannel::on_disconnect()
{
destroy_sockets();
OutSocketMessage::clear_free_messages();
}
#ifdef TUNNEL_CONFIG
void TunnelChannel::add_service(TunnelConfigConnectionIfc& source,
uint32_t type, struct in_addr& ip, uint32_t port,
std::string& name, std::string& description)
{
if (find_service(ip, port)) {
LOG_WARN("service ip=%s port=%d was already added",
inet_ntoa(ip), port);
return;
}
TunnelService* new_service = new TunnelService;
TunnelService* service_group = find_service(ip);
new_service->type = type;
new_service->id = _service_id++;
if (service_group) {
if (name != service_group->name) {
LOG_WARN("service ip=%s port=%d was not added because of inconsistent name for ip",
inet_ntoa(ip), port);
delete new_service;
return;
}
new_service->group = service_group->group;
} else {
new_service->group = _service_group++;
}
new_service->ip.s_addr = ip.s_addr;
new_service->port = port;
new_service->name = name;
new_service->description = description;
new_service->service_src = &source;
_services.push_back(new_service);
send_service(*new_service);
}
#endif
class TunnelFactory: public ChannelFactory {
public:
TunnelFactory() : ChannelFactory(RED_CHANNEL_TUNNEL) {}
virtual RedChannel* construct(RedClient& client, uint32_t id)
{
return new TunnelChannel(client, id);
}
};
static TunnelFactory factory;
ChannelFactory& TunnelChannel::Factory()
{
return factory;
}
#ifdef TUNNEL_CONFIG
TunnelConfigListenerIfc::TunnelConfigListenerIfc(TunnelChannel& tunnel)
: _tunnel(tunnel)
{
_listener_ref = NamedPipe::create(TUNNEL_CONFIG_PIPE_NAME, *this);
}
TunnelConfigListenerIfc::~TunnelConfigListenerIfc()
{
for (std::list<TunnelConfigConnectionIfc*>::iterator it = _connections.begin();
it != _connections.end(); ++it) {
if ((*it)->get_ref() != NamedPipe::INVALID_CONNECTION) {
NamedPipe::destroy_connection((*it)->get_ref());
}
delete (*it);
}
NamedPipe::destroy(_listener_ref);
}
NamedPipe::ConnectionInterface& TunnelConfigListenerIfc::create()
{
DBG(0, "new_connection");
TunnelConfigConnectionIfc* new_conn = new TunnelConfigConnectionIfc(_tunnel, *this);
_connections.push_back(new_conn);
return *new_conn;
}
void TunnelConfigListenerIfc::destroy_connection(TunnelConfigConnectionIfc* conn)
{
if (conn->get_ref() != NamedPipe::INVALID_CONNECTION) {
NamedPipe::destroy_connection(conn->get_ref());
}
_connections.remove(conn);
delete conn;
}
TunnelConfigConnectionIfc::TunnelConfigConnectionIfc(TunnelChannel& tunnel,
TunnelConfigListenerIfc& listener)
: _tunnel(tunnel)
, _listener(listener)
, _in_msg_len(0)
, _out_msg("")
, _out_msg_pos(0)
{
}
void TunnelConfigConnectionIfc::bind(NamedPipe::ConnectionRef conn_ref)
{
_opaque = conn_ref;
on_data();
}
void TunnelConfigConnectionIfc::on_data()
{
if (!_out_msg.empty()) {
int ret = NamedPipe::write(_opaque, (uint8_t*)_out_msg.c_str() + _out_msg_pos,
_out_msg.length() - _out_msg_pos);
if (ret == -1) {
_listener.destroy_connection(this);
return;
}
_out_msg_pos += ret;
if (_out_msg_pos == _out_msg.length()) {
_out_msg = "";
_out_msg_pos = 0;
}
} else {
int ret = NamedPipe::read(_opaque, (uint8_t*)_in_msg + _in_msg_len,
TUNNEL_CONFIG_MAX_MSG_LEN - _in_msg_len);
if (ret == -1) {
_listener.destroy_connection(this);
return;
}
_in_msg_len += ret;
if (_in_msg[_in_msg_len - 1] != '\n') {
return;
}
handle_msg();
_in_msg_len = 0;
}
}
void TunnelConfigConnectionIfc::send_virtual_ip(struct in_addr& ip)
{
_out_msg = inet_ntoa(ip);
_out_msg += "\n";
_out_msg_pos = 0;
on_data();
}
void TunnelConfigConnectionIfc::handle_msg()
{
std::string space = " \t";
_in_msg[_in_msg_len - 1] = '\0';
std::string msg(_in_msg);
uint32_t service_type;
struct in_addr ip;
uint32_t port;
std::string name;
std::string desc;
DBG(0, "msg=%s", _in_msg);
size_t start_token = 0;
size_t end_token;
start_token = msg.find_first_not_of(space);
end_token = msg.find_first_of(space, start_token);
if ((end_token - start_token) != 1) {
THROW("unexpected service type length");
}
if (msg[start_token] == '0') {
service_type = RED_TUNNEL_SERVICE_TYPE_GENERIC;
} else if (msg[start_token] == '1') {
service_type = RED_TUNNEL_SERVICE_TYPE_IPP;
} else {
THROW("unexpected service type");
}
start_token = msg.find_first_not_of(space, end_token);
end_token = msg.find_first_of(space, start_token);
inet_aton(msg.substr(start_token, end_token - start_token).c_str(), &ip);
start_token = msg.find_first_not_of(space, end_token);
end_token = msg.find_first_of(space, start_token);
port = atoi(msg.substr(start_token, end_token - start_token).c_str());
start_token = msg.find_first_not_of(space, end_token);
end_token = msg.find_first_of(space, start_token);
name = msg.substr(start_token, end_token - start_token);
start_token = msg.find_first_not_of(space, end_token);
desc = msg.substr(start_token);
_tunnel.add_service(*this, service_type, ip, port, name, desc);
}
#endif

138
client/tunnel_channel.h Normal file
View File

@ -0,0 +1,138 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#ifndef _H_TUNNEL_CHANNEL
#define _H_TUNNEL_CHANNEL
#include "common.h"
#include "red_channel.h"
#include "red_client.h"
#include "client_net_socket.h"
#include "platform.h"
#define TUNNEL_CONFIG
#ifdef TUNNEL_CONFIG
class TunnelConfigConnectionIfc;
class TunnelConfigListenerIfc;
#endif
/* channel for tunneling tcp from guest to client network */
typedef struct TunnelService TunnelService;
class TunnelChannel: public RedChannel,
public ClientNetSocket::EventHandler {
public:
TunnelChannel(RedClient& client, uint32_t id);
virtual ~TunnelChannel();
virtual void on_socket_message_recv_done(ClientNetSocket& sckt,
ClientNetSocket::ReceiveBuffer& buf);
virtual void on_socket_message_send_done(ClientNetSocket& sckt);
virtual void on_socket_fin_recv(ClientNetSocket& sckt);
virtual void on_socket_disconnect(ClientNetSocket& sckt);
#ifdef TUNNEL_CONFIG
void add_service(TunnelConfigConnectionIfc& source,
uint32_t type, struct in_addr& ip, uint32_t port,
std::string& name, std::string& description);
#endif
static ChannelFactory& Factory();
protected:
class TunnelSocket;
virtual void on_disconnect();
private:
void handle_init(RedPeer::InMessage* message);
void handle_service_ip_map(RedPeer::InMessage* message);
void handle_socket_open(RedPeer::InMessage* message);
void handle_socket_fin(RedPeer::InMessage* message);
void handle_socket_close(RedPeer::InMessage* message);
void handle_socket_closed_ack(RedPeer::InMessage* message);
void handle_socket_data(RedPeer::InMessage* message);
void handle_socket_token(RedPeer::InMessage* message);
TunnelService* find_service(uint32_t id);
TunnelService* find_service(struct in_addr& ip);
TunnelService* find_service(struct in_addr& ip, uint32_t port);
void send_service(TunnelService& service);
void destroy_sockets();
private:
std::vector<TunnelSocket*> _sockets;
std::list<TunnelService*> _services;
uint32_t _max_socket_data_size;
uint32_t _service_id;
uint32_t _service_group;
#ifdef TUNNEL_CONFIG
TunnelConfigListenerIfc* _config_listener;
#endif
};
#ifdef TUNNEL_CONFIG
#ifdef _WIN32
#define TUNNEL_CONFIG_PIPE_NAME "tunnel-config.pipe"
#else
#define TUNNEL_CONFIG_PIPE_NAME "/tmp/tunnel-config.pipe"
#endif
class TunnelConfigConnectionIfc;
class TunnelConfigListenerIfc: public NamedPipe::ListenerInterface {
public:
TunnelConfigListenerIfc(TunnelChannel& tunnel);
virtual ~TunnelConfigListenerIfc();
virtual NamedPipe::ConnectionInterface& create();
virtual void destroy_connection(TunnelConfigConnectionIfc* conn);
private:
TunnelChannel& _tunnel;
NamedPipe::ListenerRef _listener_ref;
std::list<TunnelConfigConnectionIfc*> _connections;
};
#define TUNNEL_CONFIG_MAX_MSG_LEN 2048
class TunnelConfigConnectionIfc: public NamedPipe::ConnectionInterface {
public:
TunnelConfigConnectionIfc(TunnelChannel& tunnel,
TunnelConfigListenerIfc& listener);
virtual void bind(NamedPipe::ConnectionRef conn_ref);
virtual void on_data();
void send_virtual_ip(struct in_addr& ip);
NamedPipe::ConnectionRef get_ref() {return _opaque;}
void handle_msg();
private:
TunnelChannel& _tunnel;
TunnelConfigListenerIfc& _listener;
char _in_msg[TUNNEL_CONFIG_MAX_MSG_LEN]; // <service_type> <ip> <port> <name> <desc>\n
int _in_msg_len;
std::string _out_msg; // <virtual ip>\n
int _out_msg_pos;
};
#endif
#endif

View File

@ -90,7 +90,7 @@ HBITMAP get_alpha_bitmap_res(int id)
AutoDC auto_dc(create_compatible_dc());
BITMAPINFO dest_info;
uint8_t *dest;
uint8_t* dest;
dest_info.bmiHeader.biSize = sizeof(dest_info.bmiHeader);
dest_info.bmiHeader.biWidth = src_info.bmWidth;
dest_info.bmiHeader.biHeight = -src_info.bmHeight;
@ -102,7 +102,7 @@ HBITMAP get_alpha_bitmap_res(int id)
dest_info.bmiHeader.biClrUsed = 0;
dest_info.bmiHeader.biClrImportant = 0;
HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID **)&dest, NULL, 0);
HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID**)&dest, NULL, 0);
if (!ret) {
THROW("create bitmap failed, %u", GetLastError());
}
@ -139,7 +139,7 @@ const char* sys_err_to_str(int error)
msg = new char[BUF_SIZE];
_snprintf(msg, BUF_SIZE, "errno %d", error);
} else {
char *new_line;
char* new_line;
if ((new_line = strrchr(msg, '\r'))) {
*new_line = 0;
}
@ -149,3 +149,14 @@ const char* sys_err_to_str(int error)
return errors_map[error];
}
int inet_aton(const char* ip, struct in_addr* in_addr)
{
unsigned long addr = inet_addr(ip);
if (addr == INADDR_NONE) {
return 0;
}
in_addr->S_un.S_addr = addr;
return 1;
}

View File

@ -18,13 +18,17 @@
#ifndef _H_PLATFORM_UTILS
#define _H_PLATFORM_UTILS
#include <winsock.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#define mb() __asm {lock add [esp], 0}
template<class T, class FreeRes = FreeObject<T>, T invalid = NULL >
class AutoRes {
public:
AutoRes() : res (invalid) {}
AutoRes(T inRes) : res (inRes) {}
AutoRes() : res(invalid) {}
AutoRes(T inRes) : res(inRes) {}
~AutoRes() { set(invalid); }
void set(T inRes) {if (res != invalid) free_res(res); res = inRes; }
@ -67,7 +71,7 @@ HBITMAP get_alpha_bitmap_res(int id);
class WindowDC {
public:
WindowDC(HWND window) : _window (window), _dc (GetDC(window)) {}
WindowDC(HWND window): _window (window), _dc (GetDC(window)) {}
~WindowDC() { ReleaseDC(_window, _dc);}
HDC operator * () { return _dc;}
@ -80,5 +84,17 @@ typedef AutoRes<HDC, Delete_DC> AutoReleaseDC;
const char* sys_err_to_str(int error);
#define SHUT_WR SD_SEND
#define SHUT_RD SD_RECEIVE
#define SHUT_RDWR SD_BOTH
#define MSG_NOSIGNAL 0
#define SHUTDOWN_ERR WSAESHUTDOWN
#define INTERRUPTED_ERR WSAEINTR
#define WOULDBLOCK_ERR WSAEWOULDBLOCK
#define sock_error() WSAGetLastError()
#define sock_err_message(err) sys_err_to_str(err)
int inet_aton(const char* ip, struct in_addr* in_addr);
#endif

View File

@ -204,6 +204,10 @@
RelativePath="..\canvas_utils.cpp"
>
</File>
<File
RelativePath="..\client_net_socket.cpp"
>
</File>
<File
RelativePath="..\cmd_line_parser.cpp"
>
@ -378,6 +382,10 @@
RelativePath="..\threads.cpp"
>
</File>
<File
RelativePath="..\tunnel_channel.cpp"
>
</File>
<File
RelativePath="..\utils.cpp"
>
@ -404,6 +412,10 @@
RelativePath="..\canvas.h"
>
</File>
<File
RelativePath="..\client_net_socket.h"
>
</File>
<File
RelativePath="..\common.h"
>
@ -556,6 +568,10 @@
RelativePath="..\threads.h"
>
</File>
<File
RelativePath="..\tunnel_channel.h"
>
</File>
<File
RelativePath="..\utils.h"
>

View File

@ -39,6 +39,8 @@ RED_COMMON_SRCS = \
$(top_srcdir)/client/red_cairo_canvas.h \
$(top_srcdir)/client/cmd_line_parser.cpp \
$(top_srcdir)/client/cmd_line_parser.h \
$(top_srcdir)/client/client_net_socket.cpp \
$(top_srcdir)/client/client_net_socket.h \
$(top_srcdir)/client/common.h \
$(top_srcdir)/client/cursor_channel.cpp \
$(top_srcdir)/client/cursor_channel.h \
@ -94,9 +96,11 @@ RED_COMMON_SRCS = \
$(top_srcdir)/client/hot_keys.cpp \
$(top_srcdir)/client/hot_keys.h \
$(top_srcdir)/client/threads.cpp \
$(top_srcdir)/client/tunnel_channel.cpp \
$(top_srcdir)/client/tunnel_channel.h \
$(top_srcdir)/client/utils.cpp \
$(top_srcdir)/client/utils.h \
$(top_srcdir)/client/icon.h \
$(top_srcdir)/client/icon.h \
$(NULL)
bin_PROGRAMS = spicec

View File

@ -18,11 +18,28 @@
#ifndef _H_PLATFORM_UTILS
#define _H_PLATFORM_UTILS
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#ifdef __i386__
#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)": : : "memory")
#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory")
#else
#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)": : : "memory")
#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)" : : : "memory")
#endif
typedef int SOCKET;
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#define closesocket(sock) ::close(sock)
#define SHUTDOWN_ERR EPIPE
#define INTERRUPTED_ERR EINTR
#define WOULDBLOCK_ERR EAGAIN
#define sock_error() errno
#define sock_err_message(err) strerror(err)
#endif

View File

@ -45,8 +45,8 @@
#endif
#define RED_MAGIC (*(uint32_t*)"REDQ")
#define RED_VERSION_MAJOR 1
#define RED_VERSION_MINOR 0
#define RED_VERSION_MAJOR (~(uint32_t)0 - 1)
#define RED_VERSION_MINOR 1
// Encryption & Ticketing Parameters
#define RED_MAX_PASSWORD_LENGTH 60
@ -60,6 +60,7 @@ enum {
RED_CHANNEL_CURSOR,
RED_CHANNEL_PLAYBACK,
RED_CHANNEL_RECORD,
RED_CHANNEL_TUNNEL,
RED_CHANNEL_END
};
@ -675,6 +676,143 @@ typedef struct ATTR_PACKED RedcRecordStartMark {
uint32_t time;
} RedcRecordStartMark;
enum {
RED_TUNNEL_SERVICE_TYPE_INVALID,
RED_TUNNEL_SERVICE_TYPE_GENERIC,
RED_TUNNEL_SERVICE_TYPE_IPP,
};
enum {
RED_TUNNEL_INIT = RED_FIRST_AVAIL_MESSAGE,
RED_TUNNEL_SERVICE_IP_MAP,
RED_TUNNEL_SOCKET_OPEN,
RED_TUNNEL_SOCKET_FIN,
RED_TUNNEL_SOCKET_CLOSE,
RED_TUNNEL_SOCKET_DATA,
RED_TUNNEL_SOCKET_CLOSED_ACK,
RED_TUNNEL_SOCKET_TOKEN,
RED_TUNNEL_MESSAGES_END,
};
typedef struct ATTR_PACKED RedTunnelInit {
uint16_t max_num_of_sockets;
uint32_t max_socket_data_size;
} RedTunnelInit;
enum {
RED_TUNNEL_IP_TYPE_INVALID,
RED_TUNNEL_IP_TYPE_IPv4,
};
typedef struct ATTR_PACKED RedTunnelIpInfo {
uint16_t type;
uint8_t data[0];
} RedTunnelIpInfo;
typedef uint8_t RedTunnelIPv4[4];
typedef struct ATTR_PACKED RedTunnelServiceIpMap {
uint32_t service_id;
RedTunnelIpInfo virtual_ip;
} RedTunnelServiceIpMap;
typedef struct ATTR_PACKED RedTunnelSocketOpen {
uint16_t connection_id;
uint32_t service_id;
uint32_t tokens;
} RedTunnelSocketOpen;
/* connection id must be the first field in msgs directed to a specific connection */
typedef struct ATTR_PACKED RedTunnelSocketFin {
uint16_t connection_id;
} RedTunnelSocketFin;
typedef struct ATTR_PACKED RedTunnelSocketClose {
uint16_t connection_id;
} RedTunnelSocketClose;
typedef struct ATTR_PACKED RedTunnelSocketData {
uint16_t connection_id;
uint8_t data[0];
} RedTunnelSocketData;
typedef struct ATTR_PACKED RedTunnelSocketTokens {
uint16_t connection_id;
uint32_t num_tokens;
} RedTunnelSocketTokens;
typedef struct ATTR_PACKED RedTunnelSocketClosedAck {
uint16_t connection_id;
} RedTunnelSocketClosedAck;
enum {
REDC_TUNNEL_SERVICE_ADD = REDC_FIRST_AVAIL_MESSAGE,
REDC_TUNNEL_SERVICE_REMOVE,
REDC_TUNNEL_SOCKET_OPEN_ACK,
REDC_TUNNEL_SOCKET_OPEN_NACK,
REDC_TUNNEL_SOCKET_FIN,
REDC_TUNNEL_SOCKET_CLOSED,
REDC_TUNNEL_SOCKET_CLOSED_ACK,
REDC_TUNNEL_SOCKET_DATA,
REDC_TUNNEL_SOCKET_TOKEN,
REDC_TUNNEL_MESSGES_END,
};
typedef struct ATTR_PACKED RedcTunnelAddGenericService {
uint32_t type;
uint32_t id;
uint32_t group;
uint32_t port;
uint32_t name;
uint32_t description;
} RedcTunnelAddGenericService;
typedef struct ATTR_PACKED RedcTunnelAddPrintService {
RedcTunnelAddGenericService base;
RedTunnelIpInfo ip;
} RedcTunnelAddPrintService;
typedef struct ATTR_PACKED RedcTunnelRemoveService {
uint32_t id;
} RedcTunnelRemoveService;
/* connection id must be the first field in msgs directed to a specific connection */
typedef struct ATTR_PACKED RedcTunnelSocketOpenAck {
uint16_t connection_id;
uint32_t tokens;
} RedcTunnelSocketOpenAck;
typedef struct ATTR_PACKED RedcTunnelSocketOpenNack {
uint16_t connection_id;
} RedcTunnelSocketOpenNack;
typedef struct ATTR_PACKED RedcTunnelSocketData {
uint16_t connection_id;
uint8_t data[0];
} RedcTunnelSocketData;
typedef struct ATTR_PACKED RedcTunnelSocketFin {
uint16_t connection_id;
} RedcTunnelSocketFin;
typedef struct ATTR_PACKED RedcTunnelSocketClosed {
uint16_t connection_id;
} RedcTunnelSocketClosed;
typedef struct ATTR_PACKED RedcTunnelSocketClosedAck {
uint16_t connection_id;
} RedcTunnelSocketClosedAck;
typedef struct ATTR_PACKED RedcTunnelSocketTokens {
uint16_t connection_id;
uint32_t num_tokens;
} RedcTunnelSocketTokens;
#undef ATTR_PACKED
#ifndef __GNUC__

View File

@ -105,6 +105,11 @@ AC_SUBST(LOG4CPP_CFLAGS)
AC_SUBST(LOG4CPP_LIBS)
SPICE_REQUIRES+=" log4cpp"
PKG_CHECK_MODULES(SLIRP, slirp)
AC_SUBST(SLIRP_CFLAGS)
AC_SUBST(SLIRP_LIBS)
SPICE_REQUIRES+=" slirp"
PKG_CHECK_MODULES(QCAIRO, qcairo >= 1.4.6)
AC_SUBST(QCAIRO_CFLAGS)
AC_SUBST(QCAIRO_LIBS)

View File

@ -9,6 +9,7 @@ INCLUDES = \
$(LOG4CPP_CFLAGS) \
$(SSL_CFLAGS) \
$(CELT051_CFLAGS) \
$(SLIRP_CFLAGS) \
-DCAIRO_CANVAS_IMAGE_CACHE \
-DRED_STATISTICS \
$(WARN_CFLAGS) \
@ -40,6 +41,7 @@ libspice_la_LIBADD = \
$(QCAIRO_LIBS) \
$(SSL_LIBS) \
$(CELT051_LIBS) \
$(SLIRP_LIBS) \
$(LIBRT) \
$(NULL)
@ -64,6 +66,10 @@ libspice_la_SOURCES = \
red_yuv.h \
snd_worker.c \
snd_worker.h \
red_channel.h \
red_channel.c \
red_tunnel_worker.c \
red_tunnel_worker.h \
spice.h \
vd_interface.h \
$(COMMON_SRCS) \

520
server/red_channel.c Normal file
View File

@ -0,0 +1,520 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#include <stdio.h>
#include <stdint.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include "red_channel.h"
static void red_channel_receive(void *data);
static void red_channel_push(RedChannel *channel);
static void red_channel_opaque_push(void *data);
static PipeItem *red_channel_pipe_get(RedChannel *channel);
static void red_channel_pipe_clear(RedChannel *channel);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size)
{
uint8_t *pos = buf;
while (size) {
int now;
if (peer->shutdown) {
return -1;
}
if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) {
if (now == 0) {
return -1;
}
ASSERT(now == -1);
if (errno == EAGAIN) {
break;
} else if (errno == EINTR) {
continue;
} else if (errno == EPIPE) {
return -1;
} else {
red_printf("%s", strerror(errno));
return -1;
}
} else {
size -= now;
pos += now;
}
}
return pos - buf;
}
static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
{
int bytes_read;
for (;;) {
int ret_handle;
if (handler->header_pos < sizeof(RedDataHeader)) {
bytes_read = red_peer_receive(peer,
((uint8_t *)&handler->header) + handler->header_pos,
sizeof(RedDataHeader) - handler->header_pos);
if (bytes_read == -1) {
handler->on_error(handler->opaque);
return;
}
handler->header_pos += bytes_read;
if (handler->header_pos != sizeof(RedDataHeader)) {
return;
}
}
if (handler->msg_pos < handler->header.size) {
if (!handler->msg) {
handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header);
}
bytes_read = red_peer_receive(peer,
handler->msg + handler->msg_pos,
handler->header.size - handler->msg_pos);
if (bytes_read == -1) {
handler->release_msg_buf(handler->opaque, &handler->header, handler->msg);
handler->on_error(handler->opaque);
return;
}
handler->msg_pos += bytes_read;
if (handler->msg_pos != handler->header.size) {
return;
}
}
ret_handle = handler->handle_message(handler->opaque, &handler->header,
handler->msg);
handler->msg_pos = 0;
handler->msg = NULL;
handler->header_pos = 0;
if (!ret_handle) {
handler->on_error(handler->opaque);
return;
}
}
}
static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
{
struct iovec *now = vec;
while ((skip) && (skip >= now->iov_len)) {
skip -= now->iov_len;
--*vec_size;
now++;
}
now->iov_base = (uint8_t *)now->iov_base + skip;
now->iov_len -= skip;
return now;
}
static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
{
int n;
if (handler->size == 0) {
handler->vec = handler->vec_buf;
handler->size = handler->get_msg_size(handler->opaque);
if (!handler->size) { // nothing to be sent
return;
}
handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
}
for (;;) {
if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
switch (errno) {
case EAGAIN:
handler->on_block(handler->opaque);
return;
case EINTR:
continue;
case EPIPE:
handler->on_error(handler->opaque);
return;
default:
red_printf("%s", strerror(errno));
handler->on_error(handler->opaque);
return;
}
} else {
handler->pos += n;
handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size);
if (!handler->vec_size) {
if (handler->pos == handler->size) { // finished writing data
handler->on_msg_done(handler->opaque);
handler->vec = handler->vec_buf;
handler->pos = 0;
handler->size = 0;
return;
} else {
// There wasn't enough place for all the outgoing data in one iovec array.
// Filling the rest of the data.
handler->vec = handler->vec_buf;
handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
}
}
}
}
}
static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);
static void red_channel_peer_on_error(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->disconnect(channel);
}
static int red_channel_peer_get_out_msg_size(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
return channel->send_data.size;
}
static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
{
RedChannel *channel = (RedChannel *)opaque;
red_channel_fill_iovec(channel, vec, vec_size);
}
static void red_channel_peer_on_out_block(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->send_data.blocked = TRUE;
channel->core->set_file_handlers(channel->core, channel->peer->socket,
red_channel_receive, red_channel_opaque_push,
channel);
}
static void red_channel_peer_on_out_msg_done(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->send_data.size = 0;
channel->send_data.n_bufs = 0;
channel->send_data.not_sent_buf_head = 0;
if (channel->send_data.item) {
channel->release_item(channel, channel->send_data.item, TRUE);
channel->send_data.item = NULL;
}
if (channel->send_data.blocked) {
channel->send_data.blocked = FALSE;
channel->core->set_file_handlers(channel->core, channel->peer->socket,
red_channel_receive, NULL,
channel);
}
}
RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
channel_disconnect_proc disconnect,
channel_handle_message_proc handle_message,
channel_alloc_msg_recv_buf_proc alloc_recv_buf,
channel_release_msg_recv_buf_proc release_recv_buf,
channel_send_pipe_item_proc send_item,
channel_release_pipe_item_proc release_item)
{
RedChannel *channel;
ASSERT(size >= sizeof(*channel));
ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
release_item);
if (!(channel = malloc(size))) {
red_printf("malloc failed");
goto error1;
}
memset(channel, 0, size);
channel->handle_acks = handle_acks;
channel->disconnect = disconnect;
channel->send_item = send_item;
channel->release_item = release_item;
channel->peer = peer;
channel->core = core;
channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
// block flags)
channel->ack_data.client_generation = ~0;
channel->migrate = migrate;
ring_init(&channel->pipe);
channel->incoming.opaque = channel;
channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
channel->incoming.handle_message = (handle_message_proc)handle_message;
channel->incoming.on_error = red_channel_peer_on_error;
channel->outgoing.opaque = channel;
channel->outgoing.pos = 0;
channel->outgoing.size = 0;
channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
channel->outgoing.on_block = red_channel_peer_on_out_block;
channel->outgoing.on_error = red_channel_peer_on_error;
channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
if (!config_socket(channel)) {
goto error2;
}
channel->core->set_file_handlers(channel->core, channel->peer->socket,
red_channel_receive, NULL, channel);
return channel;
error2:
free(channel);
error1:
peer->cb_free(peer);
return NULL;
}
void red_channel_destroy(RedChannel *channel)
{
if (!channel) {
return;
}
red_channel_pipe_clear(channel);
channel->core->set_file_handlers(channel->core, channel->peer->socket,
NULL, NULL, NULL);
channel->peer->cb_free(channel->peer);
free(channel);
}
void red_channel_shutdown(RedChannel *channel)
{
red_printf("");
if (!channel->peer->shutdown) {
channel->core->set_file_handlers(channel->core, channel->peer->socket,
red_channel_receive, NULL, channel);
red_channel_pipe_clear(channel);
shutdown(channel->peer->socket, SHUT_RDWR);
channel->peer->shutdown = TRUE;
}
}
void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
channel->ack_data.messages_window = 0;
red_channel_push(channel);
}
int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg)
{
switch (header->type) {
case REDC_ACK_SYNC:
if (header->size != sizeof(uint32_t)) {
red_printf("bad message size");
return FALSE;
}
channel->ack_data.client_generation = *(uint32_t *)(msg);
break;
case REDC_ACK:
if (channel->ack_data.client_generation == channel->ack_data.generation) {
channel->ack_data.messages_window -= CLIENT_ACK_WINDOW;
red_channel_push(channel);
}
break;
default:
red_printf("invalid message type %u", header->type);
return FALSE;
}
return TRUE;
}
static void red_channel_receive(void *data)
{
RedChannel *channel = (RedChannel *)data;
red_peer_handle_incoming(channel->peer, &channel->incoming);
}
static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
{
int pos = channel->send_data.n_bufs++;
ASSERT(pos < MAX_SEND_BUFS);
channel->send_data.bufs[pos].size = size;
channel->send_data.bufs[pos].data = data;
}
void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
{
__red_channel_add_buf(channel, data, size);
channel->send_data.header.size += size;
}
void red_channel_reset_send_data(RedChannel *channel)
{
channel->send_data.n_bufs = 0;
channel->send_data.header.size = 0;
channel->send_data.header.sub_list = 0;
++channel->send_data.header.serial;
__red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(RedDataHeader));
}
void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
{
channel->send_data.header.type = msg_type;
channel->send_data.item = item;
}
static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
{
BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs);
*vec_size = 0;
do {
vec[*vec_size].iov_base = buf->data;
vec[*vec_size].iov_len = buf->size;
(*vec_size)++;
buf++;
channel->send_data.not_sent_buf_head++;
} while (((*vec_size) < MAX_SEND_VEC) &&
(channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
}
static void red_channel_send(RedChannel *channel)
{
red_peer_handle_outgoing(channel->peer, &channel->outgoing);
}
void red_channel_begin_send_massage(RedChannel *channel)
{
channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader);
channel->ack_data.messages_window++;
red_channel_send(channel);
}
static void red_channel_push(RedChannel *channel)
{
PipeItem *pipe_item;
if (!channel->during_send) {
channel->during_send = TRUE;
} else {
return;
}
if (channel->send_data.blocked) {
red_channel_send(channel);
}
while ((pipe_item = red_channel_pipe_get(channel))) {
channel->send_item(channel, pipe_item);
}
channel->during_send = FALSE;
}
static void red_channel_opaque_push(void *data)
{
red_channel_push((RedChannel *)data);
}
uint64_t red_channel_get_message_serial(RedChannel *channel)
{
return channel->send_data.header.serial;
}
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
{
ring_item_init(&item->link);
item->type = type;
}
void red_channel_pipe_add(RedChannel *channel, PipeItem *item)
{
ASSERT(channel);
channel->pipe_size++;
ring_add(&channel->pipe, &item->link);
red_channel_push(channel);
}
int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item)
{
return ring_item_is_linked(&item->link);
}
void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
{
ring_remove(&item->link);
}
void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item)
{
ASSERT(channel);
channel->pipe_size++;
ring_add_before(&item->link, &channel->pipe);
red_channel_push(channel);
}
void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
{
PipeItem *item = malloc(sizeof(*item));
if (!item) {
red_error("malloc failed");
}
red_channel_pipe_item_init(channel, item, pipe_item_type);
red_channel_pipe_add(channel, item);
red_channel_push(channel);
}
static PipeItem *red_channel_pipe_get(RedChannel *channel)
{
PipeItem *item;
if (!channel || channel->send_data.blocked ||
(channel->handle_acks && (channel->ack_data.messages_window > CLIENT_ACK_WINDOW * 2)) ||
!(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
return NULL;
}
--channel->pipe_size;
ring_remove(&item->link);
return item;
}
static void red_channel_pipe_clear(RedChannel *channel)
{
PipeItem *item;
if (channel->send_data.item) {
channel->release_item(channel, channel->send_data.item, TRUE);
}
while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
ring_remove(&item->link);
channel->release_item(channel, item, FALSE);
}
}

182
server/red_channel.h Normal file
View File

@ -0,0 +1,182 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#ifndef _H_RED_CHANNEL
#define _H_RED_CHANNEL
#include "red_common.h"
#include "reds.h"
#include "vd_interface.h"
#include "ring.h"
#define MAX_SEND_BUFS 1000
#define MAX_SEND_VEC 50
#define CLIENT_ACK_WINDOW 20
/* Basic interface for channels, without using the RedChannel interface.
The intention is to move towards one channel interface gradually.
At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */
typedef int (*handle_message_proc)(void *opaque,
RedDataHeader *header, uint8_t *msg);
typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, RedDataHeader *msg_header);
typedef void (*release_msg_recv_buf_proc)(void *opaque,
RedDataHeader *msg_header, uint8_t *msg);
typedef void (*on_incoming_error_proc)(void *opaque);
typedef struct IncomingHandler {
void *opaque;
RedDataHeader header;
uint32_t header_pos;
uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
uint32_t msg_pos;
handle_message_proc handle_message;
alloc_msg_recv_buf_proc alloc_msg_buf;
on_incoming_error_proc on_error; // recv error or handle_message error
release_msg_recv_buf_proc release_msg_buf; // for errors
} IncomingHandler;
typedef int (*get_outgoing_msg_size_proc)(void *opaque);
typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
typedef void (*on_outgoing_error_proc)(void *opaque);
typedef void (*on_outgoing_block_proc)(void *opaque);
typedef void (*on_outgoing_msg_done_proc)(void *opaque);
typedef struct OutgoingHandler {
void *opaque;
struct iovec vec_buf[MAX_SEND_VEC];
int vec_size;
struct iovec *vec;
int pos;
int size;
get_outgoing_msg_size_proc get_msg_size;
prepare_outgoing_proc prepare;
on_outgoing_error_proc on_error;
on_outgoing_block_proc on_block;
on_outgoing_msg_done_proc on_msg_done;
} OutgoingHandler;
/* Red Channel interface */
typedef struct BufDescriptor {
uint32_t size;
uint8_t *data;
} BufDescriptor;
typedef struct PipeItem {
RingItem link;
int type;
} PipeItem;
typedef struct RedChannel RedChannel;
typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
RedDataHeader *msg_header);
typedef int (*channel_handle_message_proc)(RedChannel *channel,
RedDataHeader *header, uint8_t *msg);
typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
RedDataHeader *msg_header, uint8_t *msg);
typedef void (*channel_disconnect_proc)(RedChannel *channel);
typedef int (*channel_configure_socket_proc)(RedChannel *channel);
typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
PipeItem *item, int item_pushed);
struct RedChannel {
RedsStreamContext *peer;
CoreInterface *core;
int migrate;
int handle_acks;
struct {
uint32_t generation;
uint32_t client_generation;
uint32_t messages_window;
} ack_data;
Ring pipe;
uint32_t pipe_size;
struct {
RedDataHeader header;
union {
RedSetAck ack;
RedMigrate migrate;
} u;
uint32_t n_bufs;
BufDescriptor bufs[MAX_SEND_BUFS];
uint32_t size;
uint32_t not_sent_buf_head;
PipeItem *item;
int blocked;
} send_data;
OutgoingHandler outgoing;
IncomingHandler incoming;
channel_disconnect_proc disconnect;
channel_send_pipe_item_proc send_item;
channel_release_pipe_item_proc release_item;
int during_send;
};
/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
explicitly destroy the channel */
RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
channel_disconnect_proc disconnect,
channel_handle_message_proc handle_message,
channel_alloc_msg_recv_buf_proc alloc_recv_buf,
channel_release_msg_recv_buf_proc release_recv_buf,
channel_send_pipe_item_proc send_item,
channel_release_pipe_item_proc release_item);
void red_channel_destroy(RedChannel *channel);
void red_channel_shutdown(RedChannel *channel);
/* should be called when a new channel is ready to send messages */
void red_channel_init_outgoing_messages_window(RedChannel *channel);
/* handles general channel msgs from the client */
int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg);
/* when preparing send_data: should call reset, then init and then add_buf per buffer that is
being sent */
void red_channel_reset_send_data(RedChannel *channel);
void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item);
void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
uint64_t red_channel_get_message_serial(RedChannel *channel);
/* when sending a msg. should first call red_channel_begin_send_massage */
void red_channel_begin_send_massage(RedChannel *channel);
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type);
void red_channel_pipe_add(RedChannel *channel, PipeItem *item);
int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item);
void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item);
/* for types that use this routine -> the pipe item should be freed */
void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type);
#endif

3510
server/red_tunnel_worker.c Normal file

File diff suppressed because it is too large Load Diff

29
server/red_tunnel_worker.h Executable file
View File

@ -0,0 +1,29 @@
/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Author:
yhalperi@redhat.com
*/
#ifndef _H_RED_TUNNEL_WORKER
#define _H_RED_TUNNEL_WORKER
#include "vd_interface.h"
void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface);
#endif

View File

@ -47,6 +47,7 @@
#include "red_common.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
#include "red_tunnel_worker.h"
#include "reds_stat.h"
#include "stat.h"
#include "ring.h"
@ -83,6 +84,7 @@ static pthread_mutex_t *lock_cs;
static long *lock_count;
uint32_t streaming_video = TRUE;
image_compression_t image_compression = IMAGE_COMPRESS_AUTO_GLZ;
void *red_tunnel = NULL;
int agent_mouse = TRUE;
static void openssl_init();
@ -3921,7 +3923,7 @@ int __attribute__ ((visibility ("default"))) spice_parse_args(const char *in_arg
// All SSL parameters should be either on or off.
if (ssl_port != ssl_key || ssl_key != ssl_certs || ssl_certs != ssl_cafile ||
ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) {
ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) {
goto error;
}
@ -4775,6 +4777,19 @@ static void interface_change_notifier(void *opaque, VDInterface *interface,
return;
}
attach_to_red_agent((VDIPortInterface *)interface);
} else if (strcmp(interface->type, VD_INTERFACE_NET_WIRE) == 0) {
NetWireInterface * net_wire = (NetWireInterface *)interface;
red_printf("VD_INTERFACE_NET_WIRE");
if (red_tunnel) {
red_printf("net wire already attached");
return;
}
if (interface->major_version != VD_INTERFACE_NET_WIRE_MAJOR ||
interface->minor_version < VD_INTERFACE_NET_WIRE_MINOR) {
red_printf("unsuported net wire interface");
return;
}
red_tunnel = red_tunnel_attach(core, net_wire);
}
break;
case VD_INTERFACE_REMOVING:

View File

@ -27,6 +27,9 @@ typedef struct RedsStreamContext {
int socket;
/* set it to TRUE if you shutdown the socket. shutdown read doesn't work as accepted -
receive may return data afterwards. check the flag before calling receive*/
int shutdown;
SSL *ssl;
int (*cb_write)(void *, void *, int);

View File

@ -330,5 +330,23 @@ struct VDIPortInterface {
int (*read)(VDIPortInterface *port, VDObjectRef plug, uint8_t *buf, int len);
};
#define VD_INTERFACE_NET_WIRE "net_wire"
#define VD_INTERFACE_NET_WIRE_MAJOR 1
#define VD_INTERFACE_NET_WIRE_MINOR 1
typedef struct NetWireInterface NetWireInterface;
typedef void (*net_wire_packet_route_proc_t)(void *opaque, const uint8_t *pkt, int pkt_len);
struct NetWireInterface {
VDInterface base;
struct in_addr (*get_ip)(NetWireInterface *vlan);
int (*can_send_packet)(NetWireInterface *vlan);
void (*send_packet)(NetWireInterface *vlan, const uint8_t *buf, int size);
VDObjectRef (*register_route_packet)(NetWireInterface *vlan, net_wire_packet_route_proc_t proc,
void *opaque);
void (*unregister_route_packet)(NetWireInterface *vlan, VDObjectRef proc);
};
#endif