From 801fb18c32d735526a435f8dfe65c99c2c0fb3bd Mon Sep 17 00:00:00 2001 From: pigeatgarlic Date: Thu, 4 Apr 2024 16:51:36 -0700 Subject: [PATCH] multi channel shared memory --- lib/interprocess.cpp | 7 ++-- lib/interprocess.h | 20 +++++---- parent.go | 31 +++++++------- src/globals.h | 3 +- src/input.cpp | 5 --- src/interprocess.cpp | 88 +++------------------------------------ src/interprocess.h | 44 ++++++-------------- src/main.cpp | 28 +++++++++++-- src/video.cpp | 98 ++++++++++++++++++++++++++++++++------------ src/video.h | 2 + 10 files changed, 148 insertions(+), 178 deletions(-) diff --git a/lib/interprocess.cpp b/lib/interprocess.cpp index dcbc8d94..e3f27400 100644 --- a/lib/interprocess.cpp +++ b/lib/interprocess.cpp @@ -69,10 +69,9 @@ managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMem void init_shared_memory(SharedMemory* memory){ - for (int i = 0; i < QUEUE_SIZE; i++) { - memory->queues[QueueType::Audio].order[i] = -1; - memory->queues[QueueType::Video].order[i] = -1; - } + for (int i = 0; i < QUEUE_SIZE; i++) + for (int j = 0; j < QueueType::Max; j++) + memory->queues[j].order[i] = -1; for (int i = 0; i < EventType::EVENT_TYPE_MAX; i++) memory->events[i].read = 1; diff --git a/lib/interprocess.h b/lib/interprocess.h index fd9b71cd..2b22776a 100644 --- a/lib/interprocess.h +++ b/lib/interprocess.h @@ -14,13 +14,22 @@ extern "C" { #define QUEUE_SIZE 16 #define PACKET_SIZE 32 * 1024 +enum QueueType { + Video0, + Video1, + Audio, + Microphone, + Max +}; + typedef struct { int is_idr; -}VideoMetadata; + QueueType type; +}Metadata; typedef struct { int size; - VideoMetadata metadata; + Metadata metadata; char data[PACKET_SIZE]; } Packet; @@ -28,7 +37,6 @@ typedef enum _EventType { POINTER_VISIBLE, CHANGE_BITRATE, CHANGE_FRAMERATE, - CHANGE_DISPLAY, IDR_FRAME, STOP, @@ -50,12 +58,6 @@ typedef struct { int read; } Event; -enum QueueType { - Video, - Audio, - Microphone, - Max -}; typedef struct _Queue{ Packet array[QUEUE_SIZE]; diff --git a/parent.go b/parent.go index a5de357d..41803a46 100644 --- a/parent.go +++ b/parent.go @@ -4,13 +4,22 @@ package main #define QUEUE_SIZE 16 #define PACKET_SIZE 32 * 1024 +enum QueueType { + Video0, + Video1, + Audio, + Microphone, + Max +}; + typedef struct { int is_idr; -}VideoMetadata; + enum QueueType type; +}Metadata; typedef struct { int size; - VideoMetadata metadata; + Metadata metadata; char data[PACKET_SIZE]; } Packet; @@ -18,7 +27,6 @@ typedef enum _EventType { POINTER_VISIBLE, CHANGE_BITRATE, CHANGE_FRAMERATE, - CHANGE_DISPLAY, IDR_FRAME, STOP, @@ -40,12 +48,6 @@ typedef struct { int read; } Event; -enum QueueType { - Video, - Audio, - Microphone, - Max -}; typedef struct _Queue{ Packet array[QUEUE_SIZE]; @@ -57,9 +59,6 @@ typedef struct { Event events[EVENT_TYPE_MAX]; }SharedMemory; - - - */ import "C" import ( @@ -87,7 +86,7 @@ type DataType int func peek(memory *C.SharedMemory, media DataType) bool { if media == video { - return memory.queues[C.Video].order[0] != -1 + return memory.queues[C.Video0].order[0] != -1 } else if media == audio { return memory.queues[C.Audio].order[0] != -1 } @@ -177,14 +176,14 @@ func main() { lock.Call(pointer) defer unlock.Call(pointer) - block := memory.queues[C.Video].array[memory.queues[C.Video].order[0]] + block := memory.queues[C.Video0].array[memory.queues[C.Video0].order[0]] fmt.Printf("video buffer %d\n", block.size) for i := 0; i < C.QUEUE_SIZE-1; i++ { - memory.queues[C.Video].order[i] = memory.queues[C.Video].order[i+1] + memory.queues[C.Video0].order[i] = memory.queues[C.Video0].order[i+1] } - memory.queues[C.Video].order[C.QUEUE_SIZE-1] = -1 + memory.queues[C.Video0].order[C.QUEUE_SIZE-1] = -1 } handle_audio := func() { diff --git a/src/globals.h b/src/globals.h index ded8427c..19b38b9e 100644 --- a/src/globals.h +++ b/src/globals.h @@ -28,7 +28,8 @@ namespace mail { MAIL(shutdown); MAIL(video_packets); MAIL(audio_packets); - MAIL(switch_display); + MAIL(bitrate); + MAIL(framerate); // Local mail MAIL(touch_port); diff --git a/src/input.cpp b/src/input.cpp index 89f7291f..c3445755 100644 --- a/src/input.cpp +++ b/src/input.cpp @@ -204,11 +204,6 @@ namespace input { BOOST_LOG(debug) << "Apply Shortcut: 0x"sv << util::hex((std::uint8_t) keyCode).to_string_view(); - if (keyCode >= VK_F1 && keyCode <= VK_F13) { - mail::man->event(mail::switch_display)->raise(keyCode - VK_F1); - return 1; - } - switch (keyCode) { case 0x4E /* VKEY_N */: display_cursor = !display_cursor; diff --git a/src/interprocess.cpp b/src/interprocess.cpp index effd47e1..be886236 100644 --- a/src/interprocess.cpp +++ b/src/interprocess.cpp @@ -83,96 +83,26 @@ int find_available_slot(int* orders) { } -void -push_audio_packet(SharedMemory* memory, void* data, int size){ - // wait while queue is full - while (queue_size(memory->queues[QueueType::Audio].order) == QUEUE_SIZE) - std::this_thread::sleep_for(1ms); - - scoped_lock lock(memory->lock); - - - int available = find_available_slot(memory->queues[QueueType::Audio].order); - memory->queues[QueueType::Audio].order[queue_size(memory->queues[QueueType::Audio].order)] = available; - Packet* block = &memory->queues[QueueType::Audio].array[available]; - memcpy(block->data,data,size); - block->size = size; - -} void -push_video_packet(SharedMemory* memory, +push_packet(SharedMemory* memory, void* data, int size, - VideoMetadata metadata){ + Metadata metadata){ // wait while queue is full - while (queue_size(memory->queues[QueueType::Video].order) == QUEUE_SIZE) + while (queue_size(memory->queues[metadata.type].order) == QUEUE_SIZE) std::this_thread::sleep_for(1ms); scoped_lock lock(memory->lock); - int available = find_available_slot(memory->queues[QueueType::Video].order); - memory->queues[QueueType::Video].order[queue_size(memory->queues[QueueType::Video].order)] = available; - Packet* block = &memory->queues[QueueType::Video].array[available]; + int available = find_available_slot(memory->queues[metadata.type].order); + memory->queues[metadata.type].order[queue_size(memory->queues[metadata.type].order)] = available; + Packet* block = &memory->queues[metadata.type].array[available]; memcpy(block->data,data,size); block->size = size; block->metadata = metadata; } -int -peek_video_packet(SharedMemory* memory){ - return memory->queues[QueueType::Video].order[0] != -1; -} - -int -peek_audio_packet(SharedMemory* memory){ - return memory->queues[QueueType::Audio].order[0] != -1; -} - -void -pop_audio_packet(SharedMemory* memory, void* data, int* size){ - while (!peek_audio_packet(memory)) - std::this_thread::sleep_for(1ms); - - scoped_lock lock(memory->lock); - // std::cout << "Audio buffer size : " << queue_size(memory->queues[QueueType::Audio].order) << "\n"; - - int pop = memory->queues[QueueType::Audio].order[0]; - Packet *block = &memory->queues[QueueType::Audio].array[pop]; - memcpy(data,block->data,block->size); - *size = block->size; - - // reorder - for (int i = 0; i < QUEUE_SIZE - 1; i++) - memory->queues[QueueType::Audio].order[i] = memory->queues[QueueType::Audio].order[i+1]; - - memory->queues[QueueType::Audio].order[QUEUE_SIZE - 1] = -1; - -} - -VideoMetadata -pop_video_packet(SharedMemory* memory, void* data, int* size){ - while (!peek_video_packet(memory)) - std::this_thread::sleep_for(1ms); - - scoped_lock lock(memory->lock); - // std::cout << "Video buffer size : " << queue_size(memory->queues[QueueType::Video].order) << "\n"; - - int pop = memory->queues[QueueType::Video].order[0]; - Packet *block = &memory->queues[QueueType::Video].array[pop]; - memcpy(data,block->data,block->size); - *size = block->size; - auto copy = block->metadata; - - // reorder - for (int i = 0; i < QUEUE_SIZE - 1; i++) - memory->queues[QueueType::Video].order[i] = memory->queues[QueueType::Video].order[i+1]; - - memory->queues[QueueType::Video].order[QUEUE_SIZE - 1] = -1; - - - return copy; -} void raise_event(SharedMemory* memory, EventType type, Event event){ @@ -189,10 +119,4 @@ Event pop_event(SharedMemory* memory, EventType type){ memory->events[type].read = true; return memory->events[type]; -} - -void -wait_event(SharedMemory* memory, EventType type){ - while(memory->events[type].read) - std::this_thread::sleep_for(1ms); } \ No newline at end of file diff --git a/src/interprocess.h b/src/interprocess.h index 5dae6c24..54a8b30e 100644 --- a/src/interprocess.h +++ b/src/interprocess.h @@ -14,15 +14,22 @@ #define QUEUE_SIZE 16 #define PACKET_SIZE 32 * 1024 +enum QueueType { + Video0, + Video1, + Audio, + Microphone, + Max +}; + typedef struct { int is_idr; - - -}VideoMetadata; + QueueType type; +}Metadata; typedef struct { int size; - VideoMetadata metadata; + Metadata metadata; char data[PACKET_SIZE]; } Packet; @@ -30,7 +37,6 @@ typedef enum _EventType { POINTER_VISIBLE, CHANGE_BITRATE, CHANGE_FRAMERATE, - CHANGE_DISPLAY, IDR_FRAME, STOP, @@ -52,12 +58,6 @@ typedef struct { int read; } Event; -enum QueueType { - Video, - Audio, - Microphone, - Max -}; typedef struct _Queue{ Packet array[QUEUE_SIZE]; @@ -74,22 +74,7 @@ SharedMemory* obtain_shared_memory(char* key); void -push_audio_packet(SharedMemory* memory, void* data, int size); - -void -push_video_packet(SharedMemory* memory, void* data, int size, VideoMetadata metadata); - -int -peek_video_packet(SharedMemory* memory); - -int -peek_audio_packet(SharedMemory* memory); - -void -pop_audio_packet(SharedMemory* memory, void* data, int* size); - -VideoMetadata -pop_video_packet(SharedMemory* memory, void* data, int* size); +push_packet(SharedMemory* memory, void* data, int size, Metadata metadata); void raise_event(SharedMemory* memory, EventType type, Event event); @@ -98,7 +83,4 @@ int peek_event(SharedMemory* memory, EventType type); Event -pop_event(SharedMemory* memory, EventType type); - -void -wait_event(SharedMemory* memory, EventType type); +pop_event(SharedMemory* memory, EventType type); \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 81e8ae94..a5bd63ac 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -149,8 +149,12 @@ main(int argc, char *argv[]) { SharedMemory* memory = obtain_shared_memory(argv[1]); auto video_capture = std::thread{[&](){ + std::optional display = std::string("123"); + if (argc > 2) + display = std::string(argv[2]); + video::capture(mail::man,video::config_t{ - 1920, 1080, 60, 6000, 1, 0, 1, 0, 0 + display, 1920, 1080, 60, 6000, 1, 0, 1, 0, 0 },NULL); }}; auto audio_capture = std::thread{[&](){ @@ -161,19 +165,35 @@ main(int argc, char *argv[]) { auto video_packets = mail::man->queue(mail::video_packets); auto audio_packets = mail::man->queue(mail::audio_packets); + auto bitrate = mail::man->event(mail::bitrate); + auto framerate = mail::man->event(mail::framerate); + auto idr = mail::man->event(mail::idr); + auto input = input::alloc(mail::man); + auto push = std::thread{[&](){ while (!shutdown_event->peek()) { while(video_packets->peek()) { auto packet = video_packets->pop(); - push_video_packet(memory,packet->data(),packet->data_size(),VideoMetadata{ - packet->is_idr() + push_packet(memory,packet->data(),packet->data_size(),Metadata{ + packet->is_idr(),QueueType::Video0 }); } while(audio_packets->peek()) { auto packet = audio_packets->pop(); - push_audio_packet(memory,packet->second.begin(),packet->second.size()); + push_packet(memory,packet->second.begin(),packet->second.size(),Metadata{ + 0,QueueType::Audio + }); } + if(peek_event(memory,EventType::CHANGE_BITRATE)) + bitrate->raise(pop_event(memory,EventType::CHANGE_BITRATE).value_number); + if(peek_event(memory,EventType::CHANGE_FRAMERATE)) + framerate->raise(pop_event(memory,EventType::CHANGE_FRAMERATE).value_number); + if(peek_event(memory,EventType::POINTER_VISIBLE)) + display_cursor = pop_event(memory,EventType::POINTER_VISIBLE).value_number; + if(peek_event(memory,EventType::IDR_FRAME)) + idr->raise(pop_event(memory,EventType::IDR_FRAME).value_number > 0); + std::this_thread::sleep_for(1ms); } }}; diff --git a/src/video.cpp b/src/video.cpp index 6c1938c2..1c330fa1 100644 --- a/src/video.cpp +++ b/src/video.cpp @@ -1029,7 +1029,6 @@ namespace video { } }); - auto switch_display_event = mail::man->event(mail::switch_display); // Wait for the initial capture context or a request to stop the queue auto initial_capture_ctx = capture_ctx_queue->pop(); @@ -1163,6 +1162,12 @@ namespace video { capture_ctx->images->raise(img); } + if (capture_ctx->config.display.has_value() && + capture_ctx->config.display.value() != display_names[display_p]) { + artificial_reinit = true; + return false; + } + ++capture_ctx; }) @@ -1174,10 +1179,6 @@ namespace video { capture_ctxs.emplace_back(std::move(*capture_ctx_queue->pop())); } - if (switch_display_event->peek()) { - artificial_reinit = true; - return false; - } return true; }; @@ -1232,9 +1233,20 @@ namespace video { refresh_displays(encoder.platform_formats->dev_type, display_names, display_p); // Process any pending display switch with the new list of displays - if (switch_display_event->peek()) { - display_p = std::clamp(*switch_display_event->pop(), 0, (int) display_names.size() - 1); + if (capture_ctxs.front().config.display.has_value() && + capture_ctxs.front().config.display.value() != display_names[display_p]) { + for (int i = 0; i < display_names.size();i ++) { + if (capture_ctxs.front().config.display.value() == display_names[i]) { + BOOST_LOG(info) << "Switched to display " << display_names[i]; + display_p = i; + goto next; + } + } + + BOOST_LOG(info) << "Display " << capture_ctxs.front().config.display.value() << " not found"; + capture_ctxs.front().config.display = std::nullopt; } + next: // reset_display() will sleep between retries reset_display(disp, encoder.platform_formats->dev_type, display_names[display_p], capture_ctxs.front().config); @@ -1751,18 +1763,24 @@ namespace video { int &frame_nr, // Store progress of the frame number safe::mail_t mail, img_event_t images, - config_t config, + config_t* config, std::shared_ptr disp, std::unique_ptr encode_device, safe::signal_t &reinit_event, const encoder_t &encoder, void *channel_data) { - auto session = make_encode_session(disp.get(), encoder, config, disp->width, disp->height, std::move(encode_device)); + auto session = make_encode_session(disp.get(), encoder, *config, disp->width, disp->height, std::move(encode_device)); if (!session) { return; } auto shutdown_event = mail->event(mail::shutdown); + auto bitrate_events = mail->event(mail::bitrate); + auto framerate_events = mail->event(mail::framerate); + + BOOST_LOG(info) << "framerate "sv << config->framerate; + BOOST_LOG(info) << "bitrate "sv << config->bitrate; + auto packets = mail::man->queue(mail::video_packets); auto idr_events = mail->event(mail::idr); auto invalidate_ref_frames_events = mail->event>(mail::invalidate_ref_frames); @@ -1778,6 +1796,11 @@ namespace video { } } + std::optional frame_timestamp; + auto timestamp = std::chrono::steady_clock::now(); + auto now = std::chrono::steady_clock::now(); + auto cycle = std::chrono::nanoseconds(1s) / config->framerate; + auto sleep_period = 0ns; while (true) { if (shutdown_event->peek() || reinit_event.peek() || !images->running()) { break; @@ -1791,6 +1814,16 @@ namespace video { } } + if (bitrate_events->peek()) { + config->bitrate = bitrate_events->pop().value(); + BOOST_LOG(info) << "bitrate changed to "sv << config->bitrate; + break; + } else if (framerate_events->peek()) { + config->framerate = framerate_events->pop().value(); + BOOST_LOG(info) << "framerate changed to "sv << config->framerate; + break; + } + if (idr_events->peek()) { requested_idr_frame = true; idr_events->pop(); @@ -1800,11 +1833,8 @@ namespace video { session->request_idr_frame(); } - std::optional frame_timestamp; - - // Encode at a minimum of 10 FPS to avoid image quality issues with static content if (!requested_idr_frame || images->peek()) { - if (auto img = images->pop(100ms)) { + if (auto img = images->pop()) { frame_timestamp = img->frame_timestamp; if (session->convert(*img)) { BOOST_LOG(error) << "Could not convert image"sv; @@ -1816,11 +1846,18 @@ namespace video { } } + sleep_period = 1s / config->framerate - cycle; + if(sleep_period > 0s) + std::this_thread::sleep_for(sleep_period); + if (encode(frame_nr++, *session, packets, channel_data, frame_timestamp)) { BOOST_LOG(error) << "Could not encode video packet"sv; return; } + now = std::chrono::steady_clock::now(); + cycle = now - timestamp; + timestamp = now; session->request_normal_frame(); } } @@ -1929,8 +1966,6 @@ namespace video { std::shared_ptr disp; - auto switch_display_event = mail::man->event(mail::switch_display); - if (synced_session_ctxs.empty()) { auto ctx = encode_session_ctx_queue.pop(); if (!ctx) { @@ -1943,11 +1978,20 @@ namespace video { while (encode_session_ctx_queue.running()) { // Refresh display names since a display removal might have caused the reinitialization refresh_displays(encoder.platform_formats->dev_type, display_names, display_p); - // Process any pending display switch with the new list of displays - if (switch_display_event->peek()) { - display_p = std::clamp(*switch_display_event->pop(), 0, (int) display_names.size() - 1); + if (synced_session_ctxs.front()->config.display.has_value() && + synced_session_ctxs.front()->config.display.value() != display_names[display_p]) { + for (int i = 0; i < display_names.size();i ++) { + if (synced_session_ctxs.front()->config.display.value() == display_names[i]) { + BOOST_LOG(info) << "Switched to display " << display_names[i]; + display_p = i; + goto next; + } + } + BOOST_LOG(info) << "Display " << synced_session_ctxs.front()->config.display.value() << " not found"; + synced_session_ctxs.front()->config.display = std::nullopt; } + next: // reset_display() will sleep between retries reset_display(disp, encoder.platform_formats->dev_type, display_names[display_p], synced_session_ctxs.front()->config); @@ -2013,6 +2057,12 @@ namespace video { continue; } + if (ctx->config.display.has_value() && + ctx->config.display.value() != display_names[display_p]) { + ec = platf::capture_e::reinit; + return false; + } + if (ctx->idr_events->peek()) { pos->session->request_idr_frame(); ctx->idr_events->pop(); @@ -2042,10 +2092,6 @@ namespace video { ++pos; }) - if (switch_display_event->peek()) { - ec = platf::capture_e::reinit; - return false; - } return true; }; @@ -2173,7 +2219,7 @@ namespace video { encode_run( frame_nr, mail, images, - config, display, + &config, display, std::move(encode_device), ref->reinit_event, *ref->encoder_p, channel_data); @@ -2286,8 +2332,8 @@ namespace video { encoder.av1.capabilities.set(); // First, test encoder viability - config_t config_max_ref_frames { 1920, 1080, 60, 1000, 1, 1, 1, 0, 0 }; - config_t config_autoselect { 1920, 1080, 60, 1000, 1, 0, 1, 0, 0 }; + config_t config_max_ref_frames { std::nullopt, 1920, 1080, 60, 1000, 1, 1, 1, 0, 0 }; + config_t config_autoselect { std::nullopt, 1920, 1080, 60, 1000, 1, 0, 1, 0, 0 }; // If the encoder isn't supported at all (not even H.264), bail early reset_display(disp, encoder.platform_formats->dev_type, config::video.output_name, config_autoselect); @@ -2407,7 +2453,7 @@ namespace video { } std::vector> configs { - { encoder_t::DYNAMIC_RANGE, { 1920, 1080, 60, 1000, 1, 0, 3, 1, 1 } }, + { encoder_t::DYNAMIC_RANGE, { std::nullopt, 1920, 1080, 60, 1000, 1, 0, 3, 1, 1 } }, }; for (auto &[flag, config] : configs) { diff --git a/src/video.h b/src/video.h index eb8eabc3..98ab78e3 100644 --- a/src/video.h +++ b/src/video.h @@ -295,6 +295,8 @@ namespace video { /* Encoding configuration requested by remote client */ struct config_t { + std::optional display; + int width; // Video width in pixels int height; // Video height in pixels int framerate; // Requested framerate, used in individual frame bitrate budget calculation