interprocess communication

This commit is contained in:
pigeatgarlic 2024-04-04 03:56:53 -07:00
parent 3f6c88419f
commit e241d90586
5 changed files with 79 additions and 84 deletions

View File

@ -49,6 +49,8 @@ configure_file("${CMAKE_SOURCE_DIR}/src/version.h.in" version.h @ONLY)
include_directories("${CMAKE_CURRENT_BINARY_DIR}") # required for importing version.h
set(SUNSHINE_TARGET_FILES
"${CMAKE_SOURCE_DIR}/src/interprocess.h"
"${CMAKE_SOURCE_DIR}/src/interprocess.cpp"
"${CMAKE_SOURCE_DIR}/src/cbs.cpp"
"${CMAKE_SOURCE_DIR}/src/utility.h"
"${CMAKE_SOURCE_DIR}/src/config.h"

View File

@ -8,7 +8,8 @@ add_executable(sunshine ${SUNSHINE_TARGET_FILES})
add_executable(parent
"${CMAKE_SOURCE_DIR}/src/parent.cpp"
"${CMAKE_SOURCE_DIR}/src/interprocess.h"
"${CMAKE_SOURCE_DIR}/src/interprocess.cpp")
"${CMAKE_SOURCE_DIR}/src/interprocess.cpp"
)
set_target_properties(parent PROPERTIES CXX_STANDARD 17)
target_link_libraries(parent
${CMAKE_THREAD_LIBS_INIT}
@ -64,15 +65,3 @@ set_source_files_properties("${CMAKE_SOURCE_DIR}/third-party/ViGEmClient/src/ViG
PROPERTIES
COMPILE_DEFINITIONS "UNICODE=1;ERROR_INVALID_DEVICE_OBJECT_PARAMETER=650"
COMPILE_FLAGS ${VIGEM_COMPILE_FLAGS})
# src/nvhttp
string(TOUPPER "x${CMAKE_BUILD_TYPE}" BUILD_TYPE)
if("${BUILD_TYPE}" STREQUAL "XDEBUG")
if(WIN32)
set_source_files_properties("${CMAKE_SOURCE_DIR}/src/nvhttp.cpp"
DIRECTORY "${CMAKE_SOURCE_DIR}" "${CMAKE_SOURCE_DIR}/tests"
PROPERTIES COMPILE_FLAGS -O2)
endif()
else()
add_definitions(-DNDEBUG)
endif()

View File

@ -5,6 +5,9 @@
#include "interprocess.h"
#include <thread>
#include <stdio.h>
#include <iostream>
#include <boost/interprocess/sync/scoped_lock.hpp>
using namespace boost::interprocess;
@ -35,6 +38,33 @@ int queue_size(int* queue) {
return i;
}
int find_available_slot(int* orders) {
int available = -1;
// find available packet slot
for (int k = 0; k < QUEUE_SIZE; k++) {
int fnd = 0;
int* copy = orders;
int j = 0;
while ( *copy != -1 && j != QUEUE_SIZE) {
if (*copy == k) {
fnd = 1;
break;
}
j++;
copy++;
}
if (!fnd) {
available = k;
break;
}
}
return available;
}
void
push_audio_packet(SharedMemory* memory, void* data, int size){
@ -44,28 +74,8 @@ push_audio_packet(SharedMemory* memory, void* data, int size){
scoped_lock<interprocess_mutex> lock(memory->lock);
int available = -1;
// find available packet slot
for (int k = 0; k < QUEUE_SIZE; k++) {
int fnd = 0;
int j = 0;
while ( memory->audio_order[j] != -1 && j != QUEUE_SIZE) {
if (memory->audio_order[j] == k) {
fnd = 1;
break;
}
j++;
}
if (fnd)
continue;
available = k;
}
int available = find_available_slot(memory->audio_order);
memory->audio_order[queue_size(memory->audio_order)] = available;
Packet* block = &memory->audio[available];
memcpy(block->data,data,size);
@ -79,32 +89,12 @@ push_video_packet(SharedMemory* memory,
int size,
VideoMetadata metadata){
// wait while queue is full
while (queue_size(memory->audio_order) == QUEUE_SIZE)
while (queue_size(memory->video_order) == QUEUE_SIZE)
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
int available = -1;
for (int k = 0; k < QUEUE_SIZE; k++) {
int fnd = 0;
int j = 0;
while ( memory->video_order[j] != -1 && j != QUEUE_SIZE) {
if (memory->video_order[j] == k) {
fnd = 1;
break;
}
j++;
}
if (fnd)
continue;
available = k;
}
int available = find_available_slot(memory->video_order);
memory->video_order[queue_size(memory->video_order)] = available;
Packet* block = &memory->video[available];
memcpy(block->data,data,size);
@ -128,6 +118,7 @@ pop_audio_packet(SharedMemory* memory, void* data, int* size){
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
// std::cout << "Audio buffer size : " << queue_size(memory->audio_order) << "\n";
int pop = memory->audio_order[0];
Packet *block = &memory->audio[pop];
@ -148,6 +139,7 @@ pop_video_packet(SharedMemory* memory, void* data, int* size){
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
// std::cout << "Video buffer size : " << queue_size(memory->video_order) << "\n";
int pop = memory->video_order[0];
Packet *block = &memory->video[pop];

View File

@ -173,34 +173,27 @@ main(int argc, char *argv[]) {
auto video_packets = mail::man->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
auto video_handle = std::thread{[&](){
auto push = std::thread{[&](){
while (!shutdown_event->peek()) {
if(!video_packets->peek()) {
std::this_thread::sleep_for(1ms);
continue;
while(video_packets->peek()) {
auto packet = video_packets->pop();
push_video_packet(memory,packet->data(),packet->data_size(),VideoMetadata{
packet->is_idr()
});
}
while(audio_packets->peek()) {
auto packet = audio_packets->pop();
push_audio_packet(memory,packet->second.begin(),packet->second.size());
}
auto packet = video_packets->pop();
BOOST_LOG(info) << "Receive video packet size " << packet->data_size() << "\n";
}
}};
auto audio_handle = std::thread{[&](){
while (!shutdown_event->peek()) {
if(!audio_packets->peek()) {
std::this_thread::sleep_for(1ms);
continue;
}
auto packet = audio_packets->pop();
BOOST_LOG(info) << "Receive audio packet size " << packet->second.size() << "\n";
std::this_thread::sleep_for(100us);
}
}};
while (!shutdown_event->peek())
std::this_thread::sleep_for(1s);
video_handle.join();
audio_handle.join();
push.join();
audio_capture.join();
video_capture.join();
task_pool.stop();

View File

@ -26,6 +26,7 @@ int main (int argc, char *argv[]) {
//Allocate a portion of the segment (raw memory)
std::size_t free_memory = segment.get_free_memory();
SharedMemory* memory = (SharedMemory*)segment.allocate(sizeof(SharedMemory));
init_shared_memory(memory);
//Check invariant
if(free_memory <= segment.get_free_memory())
@ -33,19 +34,36 @@ int main (int argc, char *argv[]) {
auto running = true;
auto thread = std::thread{[&](){
int size;
char buffer[PACKET_SIZE] = {0};
while (running) {
// std::this_thread::sleep_for(1ms);
// if (!video_buffer->read) {
// std::cout << "Audio buffer received : " << video_buffer->size << "\n";
// video_buffer->read = true;
// }
// if (!audio_buffer->read) {
// std::cout << "Video buffer received : " << audio_buffer->size << "\n";
// audio_buffer->read = true;
// }
while(peek_audio_packet(memory)) {
pop_audio_packet(memory,buffer,&size);
std::cout << "Audio buffer received : " << size << "\n";
}
while(peek_video_packet(memory)) {
pop_video_packet(memory,buffer,&size);
std::cout << "Video buffer received : " << size << "\n";
}
std::this_thread::sleep_for(100us);
}
}};
// auto thread_test = std::thread{[&](){
// int size = 120;
// char buffer[PACKET_SIZE] = {0};
// while (running) {
// std::this_thread::sleep_for(1ms);
// push_audio_packet(memory,buffer,size);
// push_video_packet(memory,buffer,size,VideoMetadata{0});
// size++;
// }
// }};
// while (running)
// std::this_thread::sleep_for(1s);
//An handle from the base address can identify any byte of the shared
//memory segment even if it is mapped in different base addresses
managed_shared_memory::handle_t handle = segment.get_handle_from_address((void*)memory);
@ -66,5 +84,6 @@ int main (int argc, char *argv[]) {
running = false;
thread.join();
// thread_test.join();
return 0;
}