diff --git a/lib/interprocess.cpp b/lib/interprocess.cpp index 0c37861e..84176703 100644 --- a/lib/interprocess.cpp +++ b/lib/interprocess.cpp @@ -9,8 +9,6 @@ #include #include -#include -#include #include @@ -24,25 +22,6 @@ using namespace std::literals; #endif -typedef struct { - Queue queues[QueueType::Max]; - Event events[EVENT_TYPE_MAX]; - interprocess_mutex lock; -}SharedMemoryInternal; - - -EXPORTS(void) -lock_shared_memory(SharedMemory* memory){ - SharedMemoryInternal* internal = (SharedMemoryInternal*) memory; - internal->lock.lock(); -} - -EXPORTS(void) -unlock_shared_memory(SharedMemory* memory){ - SharedMemoryInternal* internal = (SharedMemoryInternal*) memory; - internal->lock.unlock(); -} - std::string gen_random(const int len) { static const char alphanum[] = @@ -63,18 +42,18 @@ std::string gen_random(const int len) { } std::string randkey = gen_random(20); -managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMemoryInternal)); +managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMemory)); void init_shared_memory(SharedMemory* memory){ - for (int i = 0; i < QUEUE_SIZE; i++) - for (int j = 0; j < QueueType::Max; j++) - memory->queues[j].order[i] = -1; + for (int j = 0; j < QueueType::QueueMax; j++) { + for (int k = 0; k < EventType::EventMax; k++) + memory->queues[j].events[k].read = 1; - for (int i = 0; i < EventType::EVENT_TYPE_MAX; i++) - memory->events[i].read = 1; + memory->queues[j].index = QUEUE_SIZE - 1; + } } @@ -115,7 +94,7 @@ EXPORTS(SharedMemory*) allocate_shared_memory(char* rand) { //Allocate a portion of the segment (raw memory) std::size_t free_memory = segment.get_free_memory(); - SharedMemory* memory = (SharedMemory*)segment.allocate(sizeof(SharedMemoryInternal)); + SharedMemory* memory = (SharedMemory*)segment.allocate(sizeof(SharedMemory)); init_shared_memory(memory); //Check invariant diff --git a/lib/interprocess.h b/lib/interprocess.h index baa96880..1d0088b8 100644 --- a/lib/interprocess.h +++ b/lib/interprocess.h @@ -17,8 +17,6 @@ extern "C" { EXPORT(SharedMemory*) allocate_shared_memory(char* rand) ; EXPORT(SharedMemory*) obtain_shared_memory(char* rand) ; -EXPORT(void) lock_shared_memory(SharedMemory* memory); -EXPORT(void) unlock_shared_memory(SharedMemory* memory); EXPORT(void) free_shared_memory(SharedMemory* buffer); EXPORT(void) deinit_shared_memory(); } \ No newline at end of file diff --git a/parent.go b/parent.go index c877eb7f..e6445c58 100644 --- a/parent.go +++ b/parent.go @@ -20,23 +20,6 @@ import ( "golang.org/x/sys/windows" ) -const ( - audio = 1 - video = 2 -) - -type DataType int - -func peek(memory *C.SharedMemory, media DataType) bool { - if media == video { - return memory.queues[C.Video0].order[0] != -1 - } else if media == audio { - return memory.queues[C.Audio].order[0] != -1 - } - - panic(fmt.Errorf("unknown data type")) -} - func byteSliceToString(s []byte) string { n := bytes.IndexByte(s, 0) if n >= 0 { @@ -90,14 +73,6 @@ func main() { if err != nil { panic(err) } - lock, err := mod.FindProc("lock_shared_memory") - if err != nil { - panic(err) - } - unlock, err := mod.FindProc("unlock_shared_memory") - if err != nil { - panic(err) - } buffer := make([]byte, 128) _, _, err = proc.Call( @@ -115,44 +90,42 @@ func main() { } memory := (*C.SharedMemory)(unsafe.Pointer(pointer)) - handle_video := func() { - lock.Call(pointer) - defer unlock.Call(pointer) - - block := memory.queues[C.Video0].array[memory.queues[C.Video0].order[0]] - fmt.Printf("video buffer %d\n", block.size) - - for i := 0; i < C.QUEUE_SIZE-1; i++ { - memory.queues[C.Video0].order[i] = memory.queues[C.Video0].order[i+1] - } - - memory.queues[C.Video0].order[C.QUEUE_SIZE-1] = -1 - } - - handle_audio := func() { - lock.Call(pointer) - defer unlock.Call(pointer) - - block := memory.queues[C.Audio].array[memory.queues[C.Audio].order[0]] - fmt.Printf("audio buffer %d\n", block.size) - - for i := 0; i < C.QUEUE_SIZE-1; i++ { - memory.queues[C.Audio].order[i] = memory.queues[C.Audio].order[i+1] - } - - memory.queues[C.Audio].order[C.QUEUE_SIZE-1] = -1 - } - go func() { for { - for peek(memory, video) { - handle_video() - } - for peek(memory, audio) { - handle_audio() - } + memory.queues[C.Video0].metadata.active = C.int(0) + memory.queues[C.Video1].metadata.active = C.int(0) + memory.queues[C.Audio].metadata.active = C.int(0) - time.Sleep(time.Millisecond) + time.Sleep(time.Second * 10) + + memory.queues[C.Video0].metadata.active = C.int(1) + memory.queues[C.Video1].metadata.active = C.int(1) + memory.queues[C.Audio].metadata.active = C.int(1) + + time.Sleep(time.Second * 10) + } + }() + go func() { + indexes := make([]int,C.QueueMax) + for i,_ := range indexes { + indexes[i] = int(memory.queues[i].index) + } + + for { + for queue_type,_ := range indexes { + if memory.queues[queue_type].metadata.running != 1 { + continue + } + + for int(memory.queues[queue_type].index) > indexes[queue_type] { + new_index := indexes[queue_type] + 1 + real_index := new_index % C.QUEUE_SIZE + block := memory.queues[queue_type].array[real_index] + fmt.Printf("Queue type %d, downstream index %d, upstream index %d, receive size %d\n",queue_type,new_index, memory.queues[queue_type].index,block.size) + indexes[queue_type] = new_index; + } + } + time.Sleep(time.Microsecond * 100) } }() diff --git a/smemory.h b/smemory.h index d22cec1d..6d98c719 100644 --- a/smemory.h +++ b/smemory.h @@ -1,4 +1,4 @@ -#define QUEUE_SIZE 16 +#define QUEUE_SIZE 256 #define PACKET_SIZE 32 * 1024 enum QueueType { @@ -6,52 +6,57 @@ enum QueueType { Video1, Audio, Microphone, - Max + QueueMax }; typedef enum _EventType { - POINTER_VISIBLE, - CHANGE_BITRATE, - CHANGE_FRAMERATE, - IDR_FRAME, - - STOP, - HDR_CALLBACK, - EVENT_TYPE_MAX + Pointer, + Bitrate, + Framerate, + Idr, + Hdr, + Stop, + EventMax } EventType; typedef struct { int is_idr; - enum QueueType type; -}Metadata; +}PacketMetadata; + +typedef struct { + int active; + int running; + + char display[64]; +}QueueMetadata; typedef struct { int size; - Metadata metadata; + PacketMetadata metadata; char data[PACKET_SIZE]; } Packet; typedef enum _DataType { HDR_INFO, + NUMBER, + STRING, } DataType; typedef struct { + int read; + DataType type; + int data_size; int value_number; char value_raw[PACKET_SIZE]; - int data_size; - - DataType type; - - int read; } Event; - typedef struct _Queue{ + int index; + QueueMetadata metadata; + Event events[EventMax]; Packet array[QUEUE_SIZE]; - int order[QUEUE_SIZE]; }Queue; typedef struct { - Queue queues[Max]; - Event events[EVENT_TYPE_MAX]; + Queue queues[QueueMax]; }SharedMemory; \ No newline at end of file diff --git a/src/audio.cpp b/src/audio.cpp index 1995e380..f70897cc 100644 --- a/src/audio.cpp +++ b/src/audio.cpp @@ -96,8 +96,8 @@ namespace audio { auto control_shared = safe::make_shared(start_audio_control, stop_audio_control); void - encodeThread(sample_queue_t samples, config_t config, void *channel_data) { - auto packets = mail::man->queue(mail::audio_packets); + encodeThread(safe::mail_t mail,sample_queue_t samples, config_t config, void *channel_data) { + auto packets = mail->queue(mail::audio_packets); auto stream = &stream_configs[map_stream(config.channels, config.flags[config_t::HIGH_QUALITY])]; // Encoding takes place on this thread @@ -204,7 +204,7 @@ namespace audio { platf::adjust_thread_priority(platf::thread_priority_e::critical); auto samples = std::make_shared(30); - std::thread thread { encodeThread, samples, config, channel_data }; + std::thread thread { encodeThread, mail, samples, config, channel_data }; auto fg = util::fail_guard([&]() { samples->stop(); diff --git a/src/interprocess.cpp b/src/interprocess.cpp index bc6689b6..2817fbeb 100644 --- a/src/interprocess.cpp +++ b/src/interprocess.cpp @@ -10,16 +10,14 @@ #include #include -#include #include using namespace boost::interprocess; -using namespace std::literals; -SharedMemoryInternal* obtain_shared_memory(char* rand) { +SharedMemory* obtain_shared_memory(char* rand) { std::vector strings; std::istringstream f(rand); @@ -40,83 +38,43 @@ SharedMemoryInternal* obtain_shared_memory(char* rand) { static managed_shared_memory segment(open_only, key.c_str()); //Get buffer local address from handle - SharedMemoryInternal* memory = (SharedMemoryInternal*)segment.get_address_from_handle(handle); + SharedMemory* memory = (SharedMemory*)segment.get_address_from_handle(handle); return memory; } -int queue_size(int* queue) { - int i = 0; - while (*queue != -1 && i != QUEUE_SIZE){ // wait while queue is full - queue++; - i++; - } - - return i; -} - -int find_available_slot(int* orders) { - int available = -1; - // find available packet slot - for (int k = 0; k < QUEUE_SIZE; k++) { - int fnd = 0; - - int* copy = orders; - int j = 0; - while ( *copy != -1 && j != QUEUE_SIZE) { - if (*copy == k) { - fnd = 1; - break; - } - - j++; - copy++; - } - - if (!fnd) { - available = k; - break; - } - } - - return available; -} - - - void -push_packet(SharedMemoryInternal* memory, +push_packet(Queue* queue, void* data, int size, - Metadata metadata){ + PacketMetadata metadata){ // wait while queue is full - while (queue_size(memory->queues[metadata.type].order) == QUEUE_SIZE) - std::this_thread::sleep_for(1ms); + auto new_index = queue->index + 1; - scoped_lock lock(memory->lock); - - int available = find_available_slot(memory->queues[metadata.type].order); - memory->queues[metadata.type].order[queue_size(memory->queues[metadata.type].order)] = available; - Packet* block = &memory->queues[metadata.type].array[available]; + auto real_index = new_index % QUEUE_SIZE; + Packet* block = &queue->array[real_index]; memcpy(block->data,data,size); block->size = size; block->metadata = metadata; + + //always update index after write data + queue->index = new_index; } void -raise_event(SharedMemoryInternal* memory, EventType type, Event event){ +raise_event(Queue* queue, EventType type, Event event){ event.read = false; - memcpy(&memory->events[type],&event,sizeof(Event)); + memcpy(&queue->events[type],&event,sizeof(Event)); } int -peek_event(SharedMemoryInternal* memory, EventType type){ +peek_event(Queue* memory, EventType type){ return !memory->events[type].read; } Event -pop_event(SharedMemoryInternal* memory, EventType type){ - memory->events[type].read = true; - return memory->events[type]; +pop_event(Queue* queue, EventType type){ + queue->events[type].read = true; + return queue->events[type]; } \ No newline at end of file diff --git a/src/interprocess.h b/src/interprocess.h index cef9e975..2ae6e580 100644 --- a/src/interprocess.h +++ b/src/interprocess.h @@ -7,28 +7,19 @@ #include "thread_pool.h" #include "thread_safe.h" -#include - - #include -typedef struct { - Queue queues[QueueType::Max]; - Event events[EVENT_TYPE_MAX]; - boost::interprocess::interprocess_mutex lock; -}SharedMemoryInternal; - -SharedMemoryInternal* +SharedMemory* obtain_shared_memory(char* key); void -push_packet(SharedMemoryInternal* memory, void* data, int size, Metadata metadata); +push_packet(Queue* memory, void* data, int size, PacketMetadata metadata); void -raise_event(SharedMemoryInternal* memory, EventType type, Event event); +raise_event(Queue* memory, EventType type, Event event); int -peek_event(SharedMemoryInternal* memory, EventType type); +peek_event(Queue* memory, EventType type); Event -pop_event(SharedMemoryInternal* memory, EventType type); \ No newline at end of file +pop_event(Queue* memory, EventType type); \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 48075f45..159ced18 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -107,8 +107,8 @@ main(int argc, char *argv[]) { task_pool.start(1); // Create signal handler after logging has been initialized - auto shutdown_event = mail::man->event(mail::shutdown); - on_signal(SIGINT, [&force_shutdown, shutdown_event]() { + auto process_shutdown_event = mail::man->event(mail::shutdown); + on_signal(SIGINT, [&force_shutdown, process_shutdown_event]() { BOOST_LOG(info) << "Interrupt handler called"sv; auto task = []() { @@ -117,10 +117,10 @@ main(int argc, char *argv[]) { }; force_shutdown = task_pool.pushDelayed(task, 10s).task_id; - shutdown_event->raise(true); + process_shutdown_event->raise(true); }); - on_signal(SIGTERM, [&force_shutdown, shutdown_event]() { + on_signal(SIGTERM, [&force_shutdown, process_shutdown_event]() { BOOST_LOG(info) << "Terminate handler called"sv; auto task = []() { @@ -129,7 +129,7 @@ main(int argc, char *argv[]) { }; force_shutdown = task_pool.pushDelayed(task, 10s).task_id; - shutdown_event->raise(true); + process_shutdown_event->raise(true); }); // If any of the following fail, we log an error and continue event though sunshine will not function correctly. @@ -146,64 +146,90 @@ main(int argc, char *argv[]) { } //Get buffer local address from handle - SharedMemoryInternal* memory = obtain_shared_memory(argv[1]); + SharedMemory* memory = obtain_shared_memory(argv[1]); - auto video_capture = std::thread{[&](){ - std::optional display = std::string("123"); - if (argc > 2) - display = std::string(argv[2]); + auto video_capture = [&](safe::mail_t mail, char* displayin){ + std::optional display = std::nullopt; + if (strlen(displayin) > 0) + display = std::string(displayin); - video::capture(mail::man,video::config_t{ + video::capture(mail,video::config_t{ display, 1920, 1080, 60, 6000, 1, 0, 1, 0, 0 },NULL); - }}; - auto audio_capture = std::thread{[&](){ - audio::capture(mail::man,audio::config_t{ + }; + + auto audio_capture = [&](safe::mail_t mail){ + audio::capture(mail,audio::config_t{ 10,2,3,0 },NULL); - }}; + }; - auto video_packets = mail::man->queue(mail::video_packets); - auto audio_packets = mail::man->queue(mail::audio_packets); - auto bitrate = mail::man->event(mail::bitrate); - auto framerate = mail::man->event(mail::framerate); - auto idr = mail::man->event(mail::idr); - auto input = input::alloc(mail::man); - auto push = std::thread{[&](){ - while (!shutdown_event->peek()) { + auto push = [process_shutdown_event](safe::mail_t mail, Queue* queue){ + auto video_packets = mail->queue(mail::video_packets); + auto audio_packets = mail->queue(mail::audio_packets); + auto bitrate = mail->event(mail::bitrate); + auto framerate = mail->event(mail::framerate); + auto idr = mail->event(mail::idr); + auto local_shutdown= mail->event(mail::shutdown); + auto input = input::alloc(mail); + + while (!process_shutdown_event->peek() && !local_shutdown->peek() && queue->metadata.active) { while(video_packets->peek()) { auto packet = video_packets->pop(); - push_packet(memory,packet->data(),packet->data_size(),Metadata{ - packet->is_idr(),QueueType::Video0 - }); + push_packet(queue,packet->data(),packet->data_size(),PacketMetadata{ packet->is_idr() }); } while(audio_packets->peek()) { auto packet = audio_packets->pop(); - push_packet(memory,packet->second.begin(),packet->second.size(),Metadata{ - 0,QueueType::Audio - }); + push_packet(queue,packet->second.begin(),packet->second.size(),PacketMetadata{ 0 }); } - if(peek_event(memory,EventType::CHANGE_BITRATE)) - bitrate->raise(pop_event(memory,EventType::CHANGE_BITRATE).value_number); - if(peek_event(memory,EventType::CHANGE_FRAMERATE)) - framerate->raise(pop_event(memory,EventType::CHANGE_FRAMERATE).value_number); - if(peek_event(memory,EventType::POINTER_VISIBLE)) - display_cursor = pop_event(memory,EventType::POINTER_VISIBLE).value_number; - if(peek_event(memory,EventType::IDR_FRAME)) - idr->raise(pop_event(memory,EventType::IDR_FRAME).value_number > 0); + if(peek_event(queue,EventType::Bitrate)) + bitrate->raise(pop_event(queue,EventType::Bitrate).value_number); + if(peek_event(queue,EventType::Framerate)) + framerate->raise(pop_event(queue,EventType::Framerate).value_number); + if(peek_event(queue,EventType::Pointer)) + display_cursor = pop_event(queue,EventType::Pointer).value_number; + if(peek_event(queue,EventType::Idr)) + idr->raise(pop_event(queue,EventType::Idr).value_number > 0); std::this_thread::sleep_for(1ms); } - }}; - while (!shutdown_event->peek()) - std::this_thread::sleep_for(1s); + if (!local_shutdown->peek()) + local_shutdown->raise(true); + + queue->metadata.running = 0; + }; + + std::vector actives; + while (!process_shutdown_event->peek()) { + for (int i = 0; i < QueueType::QueueMax; i++) { + auto queue = &memory->queues[i]; + if (!queue->metadata.active || queue->metadata.running) + continue; + + queue->metadata.running = 1; + if (i == QueueType::Video0 || i == QueueType::Video1) { + auto mail = std::make_shared(); + auto capture = std::thread{video_capture,mail,queue->metadata.display}; + auto forward = std::thread{push,mail,queue}; + capture.detach(); + forward.detach(); + } else if (i == QueueType::Audio) { + auto mail = std::make_shared(); + auto capture = std::thread{audio_capture,mail}; + auto forward = std::thread{push,mail,queue}; + capture.detach(); + forward.detach(); + } + } + + + + std::this_thread::sleep_for(100ms); + } - push.join(); - audio_capture.join(); - video_capture.join(); task_pool.stop(); task_pool.join(); diff --git a/src/video.cpp b/src/video.cpp index 1c330fa1..03cdb2e8 100644 --- a/src/video.cpp +++ b/src/video.cpp @@ -1781,7 +1781,7 @@ namespace video { BOOST_LOG(info) << "framerate "sv << config->framerate; BOOST_LOG(info) << "bitrate "sv << config->bitrate; - auto packets = mail::man->queue(mail::video_packets); + auto packets = mail->queue(mail::video_packets); auto idr_events = mail->event(mail::idr); auto invalidate_ref_frames_events = mail->event>(mail::invalidate_ref_frames); @@ -2243,7 +2243,7 @@ namespace video { ref->encode_session_ctx_queue.raise(sync_session_ctx_t { &join_event, mail->event(mail::shutdown), - mail::man->queue(mail::video_packets), + mail->queue(mail::video_packets), std::move(idr_events), mail->event(mail::hdr), mail->event(mail::touch_port),