mirror of
https://gitlab.uni-freiburg.de/opensourcevdi/win32-vd_agent
synced 2026-01-01 20:56:39 +00:00
vdagent: add message_queue for messages written to pipe
This is only part of the message corruption solution. The other part is fixing virtio-serial / spice-qemu-char throttling code. -replace write_[lock/unlock/completion] calls with [new/enqueue]_message -remove clipboard specific _out_msg_* class members -remove ugly loop - while (a->_out_msg && a->write_clipboard()); -add _message_mutex for message queue -fix pending_write race using _write_mutex -TODO: enqueue large message without dividing it to chunks in advance rhbz #846427
This commit is contained in:
parent
04a28a35ed
commit
4e95b73ecf
@ -94,10 +94,10 @@ private:
|
||||
enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH };
|
||||
void set_control_event(int control_command);
|
||||
void handle_control_event();
|
||||
uint8_t* write_lock(DWORD bytes = 0);
|
||||
void write_unlock(DWORD bytes = 0);
|
||||
VDPipeMessage* new_message(DWORD bytes = 0);
|
||||
void enqueue_message(VDPipeMessage* msg);
|
||||
bool write_message(uint32_t type, uint32_t size, void* data);
|
||||
bool write_clipboard();
|
||||
bool write_clipboard(VDAgentMessage* msg, uint32_t size);
|
||||
bool connect_pipe();
|
||||
bool send_input();
|
||||
void set_display_depth(uint32_t depth);
|
||||
@ -119,9 +119,6 @@ private:
|
||||
HANDLE _clipboard_event;
|
||||
VDAgentMessage* _in_msg;
|
||||
uint32_t _in_msg_pos;
|
||||
VDAgentMessage* _out_msg;
|
||||
uint32_t _out_msg_pos;
|
||||
uint32_t _out_msg_size;
|
||||
bool _pending_input;
|
||||
bool _pending_write;
|
||||
bool _running;
|
||||
@ -131,7 +128,9 @@ private:
|
||||
VDPipeState _pipe_state;
|
||||
mutex_t _write_mutex;
|
||||
mutex_t _control_mutex;
|
||||
mutex_t _message_mutex;
|
||||
std::queue<int> _control_queue;
|
||||
std::queue<VDPipeMessage*> _message_queue;
|
||||
|
||||
bool _logon_desktop;
|
||||
bool _display_setting_initialized;
|
||||
@ -167,9 +166,6 @@ VDAgent::VDAgent()
|
||||
, _clipboard_event (NULL)
|
||||
, _in_msg (NULL)
|
||||
, _in_msg_pos (0)
|
||||
, _out_msg (NULL)
|
||||
, _out_msg_pos (0)
|
||||
, _out_msg_size (0)
|
||||
, _pending_input (false)
|
||||
, _pending_write (false)
|
||||
, _running (false)
|
||||
@ -193,6 +189,7 @@ VDAgent::VDAgent()
|
||||
ZeroMemory(&_pipe_state, sizeof(VDPipeState));
|
||||
MUTEX_INIT(_write_mutex);
|
||||
MUTEX_INIT(_control_mutex);
|
||||
MUTEX_INIT(_message_mutex);
|
||||
|
||||
_singleton = this;
|
||||
}
|
||||
@ -538,7 +535,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
|
||||
}
|
||||
|
||||
DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
|
||||
reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
|
||||
reply_pipe_msg = new_message(msg_size);
|
||||
if (!reply_pipe_msg) {
|
||||
return false;
|
||||
}
|
||||
@ -553,10 +550,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
|
||||
reply = (VDAgentReply*)reply_msg->data;
|
||||
reply->type = VD_AGENT_MONITORS_CONFIG;
|
||||
reply->error = display_count ? VD_AGENT_SUCCESS : VD_AGENT_ERROR;
|
||||
write_unlock(msg_size);
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
enqueue_message(reply_pipe_msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool request)
|
||||
uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities) + VD_AGENT_CAPS_BYTES;
|
||||
|
||||
msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size;
|
||||
caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
|
||||
caps_pipe_msg = new_message(msg_size);
|
||||
if (!caps_pipe_msg) {
|
||||
return false;
|
||||
}
|
||||
@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool request)
|
||||
for (uint32_t i = 0 ; i < caps_size; ++i) {
|
||||
vd_printf("%X", caps->caps[i]);
|
||||
}
|
||||
write_unlock(msg_size);
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
enqueue_message(caps_pipe_msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -750,11 +741,10 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
|
||||
}
|
||||
|
||||
msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
|
||||
reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
|
||||
reply_pipe_msg = new_message(msg_size);
|
||||
if (!reply_pipe_msg) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reply_pipe_msg->type = VD_AGENT_COMMAND;
|
||||
reply_pipe_msg->opaque = port;
|
||||
reply_pipe_msg->size = sizeof(VDAgentMessage) + sizeof(VDAgentReply);
|
||||
@ -766,10 +756,7 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
|
||||
reply = (VDAgentReply*)reply_msg->data;
|
||||
reply->type = VD_AGENT_DISPLAY_CONFIG;
|
||||
reply->error = VD_AGENT_SUCCESS;
|
||||
write_unlock(msg_size);
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
enqueue_message(reply_pipe_msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
|
||||
switch (msg->type) {
|
||||
case VD_AGENT_RESET: {
|
||||
vd_printf("Agent reset");
|
||||
VDPipeMessage* ack = (VDPipeMessage*)write_lock(sizeof(VDPipeMessage));
|
||||
VDPipeMessage* ack = new_message(sizeof(VDPipeMessage));
|
||||
if (!ack) {
|
||||
return false;
|
||||
}
|
||||
ack->type = VD_AGENT_RESET_ACK;
|
||||
ack->opaque = msg->opaque;
|
||||
write_unlock(sizeof(VDPipeMessage));
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
enqueue_message(ack);
|
||||
break;
|
||||
}
|
||||
case VD_AGENT_SESSION_LOGON:
|
||||
@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
|
||||
|
||||
//FIXME: division to max size chunks should NOT be here, but in the service
|
||||
// here we should write the max possible size to the pipe
|
||||
bool VDAgent::write_clipboard()
|
||||
bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size)
|
||||
{
|
||||
ASSERT(_out_msg);
|
||||
DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size - _out_msg_pos, VD_AGENT_MAX_DATA_SIZE);
|
||||
VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n);
|
||||
if (!pipe_msg) {
|
||||
return false;
|
||||
uint32_t pos = 0;
|
||||
bool ret = true;
|
||||
|
||||
ASSERT(msg && size);
|
||||
//FIXME: do it smarter - no loop, no memcopy
|
||||
MUTEX_LOCK(_message_mutex);
|
||||
while (pos < size) {
|
||||
DWORD n = MIN(sizeof(VDPipeMessage) + size - pos, VD_AGENT_MAX_DATA_SIZE);
|
||||
VDPipeMessage* pipe_msg = new_message(n);
|
||||
if (!pipe_msg) {
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
pipe_msg->type = VD_AGENT_COMMAND;
|
||||
pipe_msg->opaque = VDP_CLIENT_PORT;
|
||||
pipe_msg->size = n - sizeof(VDPipeMessage);
|
||||
memcpy(pipe_msg->data, (char*)msg + pos, n - sizeof(VDPipeMessage));
|
||||
enqueue_message(pipe_msg);
|
||||
pos += (n - sizeof(VDPipeMessage));
|
||||
}
|
||||
pipe_msg->type = VD_AGENT_COMMAND;
|
||||
pipe_msg->opaque = VDP_CLIENT_PORT;
|
||||
pipe_msg->size = n - sizeof(VDPipeMessage);
|
||||
memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n - sizeof(VDPipeMessage));
|
||||
write_unlock(n);
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
_out_msg_pos += (n - sizeof(VDPipeMessage));
|
||||
if (_out_msg_pos == _out_msg_size) {
|
||||
delete[] (uint8_t *)_out_msg;
|
||||
_out_msg = NULL;
|
||||
_out_msg_size = 0;
|
||||
_out_msg_pos = 0;
|
||||
}
|
||||
return true;
|
||||
MUTEX_UNLOCK(_message_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
|
||||
@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
|
||||
VDPipeMessage* pipe_msg;
|
||||
VDAgentMessage* msg;
|
||||
|
||||
pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE + size);
|
||||
pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size);
|
||||
if (!pipe_msg) {
|
||||
return false;
|
||||
}
|
||||
@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
|
||||
if (size && data) {
|
||||
memcpy(msg->data, data, size);
|
||||
}
|
||||
write_unlock(VD_MESSAGE_HEADER_SIZE + size);
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
enqueue_message(pipe_msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -993,6 +974,8 @@ bool VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab, uint32
|
||||
// VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the request failed.
|
||||
bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_request)
|
||||
{
|
||||
VDAgentMessage* msg;
|
||||
uint32_t msg_size;
|
||||
UINT format;
|
||||
HANDLE clip_data;
|
||||
uint8_t* new_data = NULL;
|
||||
@ -1008,10 +991,6 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
|
||||
vd_printf("Unsupported clipboard type %u", clipboard_request->type);
|
||||
return false;
|
||||
}
|
||||
if (_out_msg) {
|
||||
vd_printf("clipboard change is already pending");
|
||||
return false;
|
||||
}
|
||||
if (!IsClipboardFormatAvailable(format) || !OpenClipboard(_hwnd)) {
|
||||
return false;
|
||||
}
|
||||
@ -1047,14 +1026,13 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
|
||||
CloseClipboard();
|
||||
return false;
|
||||
}
|
||||
_out_msg_pos = 0;
|
||||
_out_msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
|
||||
_out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size];
|
||||
_out_msg->protocol = VD_AGENT_PROTOCOL;
|
||||
_out_msg->type = VD_AGENT_CLIPBOARD;
|
||||
_out_msg->opaque = 0;
|
||||
_out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
|
||||
VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data;
|
||||
msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
|
||||
msg = (VDAgentMessage*)new uint8_t[msg_size];
|
||||
msg->protocol = VD_AGENT_PROTOCOL;
|
||||
msg->type = VD_AGENT_CLIPBOARD;
|
||||
msg->opaque = 0;
|
||||
msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
|
||||
VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data;
|
||||
clipboard->type = clipboard_request->type;
|
||||
|
||||
switch (clipboard_request->type) {
|
||||
@ -1070,7 +1048,8 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
|
||||
break;
|
||||
}
|
||||
CloseClipboard();
|
||||
write_clipboard();
|
||||
write_clipboard(msg, msg_size);
|
||||
delete[] (uint8_t *)msg;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
|
||||
{
|
||||
VDAgent* a = _singleton;
|
||||
VDPipeState* ps = &a->_pipe_state;
|
||||
DWORD size_left;
|
||||
|
||||
a->_pending_write = false;
|
||||
if (!a->_running) {
|
||||
return;
|
||||
}
|
||||
@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
|
||||
a->_running = false;
|
||||
return;
|
||||
}
|
||||
if (!a->write_lock()) {
|
||||
a->_running = false;
|
||||
return;
|
||||
}
|
||||
MUTEX_LOCK(a->_write_mutex);
|
||||
ps->write.start += bytes;
|
||||
if (ps->write.start == ps->write.end) {
|
||||
ps->write.start = ps->write.end = 0;
|
||||
//DEBUG
|
||||
while (a->_out_msg && a->write_clipboard());
|
||||
} else if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
|
||||
ps->write.end - ps->write.start, overlap, write_completion)) {
|
||||
a->_pending_write = true;
|
||||
} else {
|
||||
vd_printf("WriteFileEx() failed: %lu", GetLastError());
|
||||
a->_running = false;
|
||||
}
|
||||
a->write_unlock();
|
||||
|
||||
MUTEX_LOCK(a->_message_mutex);
|
||||
size_left = sizeof(a->_pipe_state.write.data) - a->_pipe_state.write.end;
|
||||
while (!a->_message_queue.empty()) {
|
||||
VDPipeMessage* msg = a->_message_queue.front();
|
||||
DWORD size = sizeof(VDPipeMessage) + msg->size;
|
||||
|
||||
if (size > size_left) {
|
||||
break;
|
||||
}
|
||||
a->_message_queue.pop();
|
||||
memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end, msg, size);
|
||||
a->_pipe_state.write.end += size;
|
||||
size_left -= size;
|
||||
delete msg;
|
||||
}
|
||||
MUTEX_UNLOCK(a->_message_mutex);
|
||||
|
||||
if (ps->write.start < ps->write.end) {
|
||||
if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
|
||||
ps->write.end - ps->write.start, overlap, write_completion)) {
|
||||
a->_pending_write = true;
|
||||
} else {
|
||||
vd_printf("WriteFileEx() failed: %lu", GetLastError());
|
||||
a->_running = false;
|
||||
}
|
||||
} else {
|
||||
a->_pending_write = false;
|
||||
}
|
||||
MUTEX_UNLOCK(a->_write_mutex);
|
||||
}
|
||||
|
||||
uint8_t* VDAgent::write_lock(DWORD bytes)
|
||||
VDPipeMessage* VDAgent::new_message(DWORD bytes)
|
||||
{
|
||||
return (VDPipeMessage*)(new char[bytes]);
|
||||
}
|
||||
|
||||
void VDAgent::enqueue_message(VDPipeMessage* msg)
|
||||
{
|
||||
MUTEX_LOCK(_message_mutex);
|
||||
_message_queue.push(msg);
|
||||
MUTEX_UNLOCK(_message_mutex);
|
||||
MUTEX_LOCK(_write_mutex);
|
||||
if (_pipe_state.write.end + bytes <= sizeof(_pipe_state.write.data)) {
|
||||
return &_pipe_state.write.data[_pipe_state.write.end];
|
||||
} else {
|
||||
MUTEX_UNLOCK(_write_mutex);
|
||||
vd_printf("write buffer is full");
|
||||
return NULL;
|
||||
if (!_pending_write) {
|
||||
write_completion(0, 0, &_pipe_state.write.overlap);
|
||||
}
|
||||
}
|
||||
|
||||
void VDAgent::write_unlock(DWORD bytes)
|
||||
{
|
||||
_pipe_state.write.end += bytes;
|
||||
MUTEX_UNLOCK(_write_mutex);
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user