multi channel shared memory

This commit is contained in:
pigeatgarlic 2024-04-04 16:51:36 -07:00
parent 53f16cc51a
commit 801fb18c32
10 changed files with 148 additions and 178 deletions

View File

@ -69,10 +69,9 @@ managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMem
void
init_shared_memory(SharedMemory* memory){
for (int i = 0; i < QUEUE_SIZE; i++) {
memory->queues[QueueType::Audio].order[i] = -1;
memory->queues[QueueType::Video].order[i] = -1;
}
for (int i = 0; i < QUEUE_SIZE; i++)
for (int j = 0; j < QueueType::Max; j++)
memory->queues[j].order[i] = -1;
for (int i = 0; i < EventType::EVENT_TYPE_MAX; i++)
memory->events[i].read = 1;

View File

@ -14,13 +14,22 @@ extern "C" {
#define QUEUE_SIZE 16
#define PACKET_SIZE 32 * 1024
enum QueueType {
Video0,
Video1,
Audio,
Microphone,
Max
};
typedef struct {
int is_idr;
}VideoMetadata;
QueueType type;
}Metadata;
typedef struct {
int size;
VideoMetadata metadata;
Metadata metadata;
char data[PACKET_SIZE];
} Packet;
@ -28,7 +37,6 @@ typedef enum _EventType {
POINTER_VISIBLE,
CHANGE_BITRATE,
CHANGE_FRAMERATE,
CHANGE_DISPLAY,
IDR_FRAME,
STOP,
@ -50,12 +58,6 @@ typedef struct {
int read;
} Event;
enum QueueType {
Video,
Audio,
Microphone,
Max
};
typedef struct _Queue{
Packet array[QUEUE_SIZE];

View File

@ -4,13 +4,22 @@ package main
#define QUEUE_SIZE 16
#define PACKET_SIZE 32 * 1024
enum QueueType {
Video0,
Video1,
Audio,
Microphone,
Max
};
typedef struct {
int is_idr;
}VideoMetadata;
enum QueueType type;
}Metadata;
typedef struct {
int size;
VideoMetadata metadata;
Metadata metadata;
char data[PACKET_SIZE];
} Packet;
@ -18,7 +27,6 @@ typedef enum _EventType {
POINTER_VISIBLE,
CHANGE_BITRATE,
CHANGE_FRAMERATE,
CHANGE_DISPLAY,
IDR_FRAME,
STOP,
@ -40,12 +48,6 @@ typedef struct {
int read;
} Event;
enum QueueType {
Video,
Audio,
Microphone,
Max
};
typedef struct _Queue{
Packet array[QUEUE_SIZE];
@ -57,9 +59,6 @@ typedef struct {
Event events[EVENT_TYPE_MAX];
}SharedMemory;
*/
import "C"
import (
@ -87,7 +86,7 @@ type DataType int
func peek(memory *C.SharedMemory, media DataType) bool {
if media == video {
return memory.queues[C.Video].order[0] != -1
return memory.queues[C.Video0].order[0] != -1
} else if media == audio {
return memory.queues[C.Audio].order[0] != -1
}
@ -177,14 +176,14 @@ func main() {
lock.Call(pointer)
defer unlock.Call(pointer)
block := memory.queues[C.Video].array[memory.queues[C.Video].order[0]]
block := memory.queues[C.Video0].array[memory.queues[C.Video0].order[0]]
fmt.Printf("video buffer %d\n", block.size)
for i := 0; i < C.QUEUE_SIZE-1; i++ {
memory.queues[C.Video].order[i] = memory.queues[C.Video].order[i+1]
memory.queues[C.Video0].order[i] = memory.queues[C.Video0].order[i+1]
}
memory.queues[C.Video].order[C.QUEUE_SIZE-1] = -1
memory.queues[C.Video0].order[C.QUEUE_SIZE-1] = -1
}
handle_audio := func() {

View File

@ -28,7 +28,8 @@ namespace mail {
MAIL(shutdown);
MAIL(video_packets);
MAIL(audio_packets);
MAIL(switch_display);
MAIL(bitrate);
MAIL(framerate);
// Local mail
MAIL(touch_port);

View File

@ -204,11 +204,6 @@ namespace input {
BOOST_LOG(debug) << "Apply Shortcut: 0x"sv << util::hex((std::uint8_t) keyCode).to_string_view();
if (keyCode >= VK_F1 && keyCode <= VK_F13) {
mail::man->event<int>(mail::switch_display)->raise(keyCode - VK_F1);
return 1;
}
switch (keyCode) {
case 0x4E /* VKEY_N */:
display_cursor = !display_cursor;

View File

@ -83,96 +83,26 @@ int find_available_slot(int* orders) {
}
void
push_audio_packet(SharedMemory* memory, void* data, int size){
// wait while queue is full
while (queue_size(memory->queues[QueueType::Audio].order) == QUEUE_SIZE)
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
int available = find_available_slot(memory->queues[QueueType::Audio].order);
memory->queues[QueueType::Audio].order[queue_size(memory->queues[QueueType::Audio].order)] = available;
Packet* block = &memory->queues[QueueType::Audio].array[available];
memcpy(block->data,data,size);
block->size = size;
}
void
push_video_packet(SharedMemory* memory,
push_packet(SharedMemory* memory,
void* data,
int size,
VideoMetadata metadata){
Metadata metadata){
// wait while queue is full
while (queue_size(memory->queues[QueueType::Video].order) == QUEUE_SIZE)
while (queue_size(memory->queues[metadata.type].order) == QUEUE_SIZE)
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
int available = find_available_slot(memory->queues[QueueType::Video].order);
memory->queues[QueueType::Video].order[queue_size(memory->queues[QueueType::Video].order)] = available;
Packet* block = &memory->queues[QueueType::Video].array[available];
int available = find_available_slot(memory->queues[metadata.type].order);
memory->queues[metadata.type].order[queue_size(memory->queues[metadata.type].order)] = available;
Packet* block = &memory->queues[metadata.type].array[available];
memcpy(block->data,data,size);
block->size = size;
block->metadata = metadata;
}
int
peek_video_packet(SharedMemory* memory){
return memory->queues[QueueType::Video].order[0] != -1;
}
int
peek_audio_packet(SharedMemory* memory){
return memory->queues[QueueType::Audio].order[0] != -1;
}
void
pop_audio_packet(SharedMemory* memory, void* data, int* size){
while (!peek_audio_packet(memory))
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
// std::cout << "Audio buffer size : " << queue_size(memory->queues[QueueType::Audio].order) << "\n";
int pop = memory->queues[QueueType::Audio].order[0];
Packet *block = &memory->queues[QueueType::Audio].array[pop];
memcpy(data,block->data,block->size);
*size = block->size;
// reorder
for (int i = 0; i < QUEUE_SIZE - 1; i++)
memory->queues[QueueType::Audio].order[i] = memory->queues[QueueType::Audio].order[i+1];
memory->queues[QueueType::Audio].order[QUEUE_SIZE - 1] = -1;
}
VideoMetadata
pop_video_packet(SharedMemory* memory, void* data, int* size){
while (!peek_video_packet(memory))
std::this_thread::sleep_for(1ms);
scoped_lock<interprocess_mutex> lock(memory->lock);
// std::cout << "Video buffer size : " << queue_size(memory->queues[QueueType::Video].order) << "\n";
int pop = memory->queues[QueueType::Video].order[0];
Packet *block = &memory->queues[QueueType::Video].array[pop];
memcpy(data,block->data,block->size);
*size = block->size;
auto copy = block->metadata;
// reorder
for (int i = 0; i < QUEUE_SIZE - 1; i++)
memory->queues[QueueType::Video].order[i] = memory->queues[QueueType::Video].order[i+1];
memory->queues[QueueType::Video].order[QUEUE_SIZE - 1] = -1;
return copy;
}
void
raise_event(SharedMemory* memory, EventType type, Event event){
@ -189,10 +119,4 @@ Event
pop_event(SharedMemory* memory, EventType type){
memory->events[type].read = true;
return memory->events[type];
}
void
wait_event(SharedMemory* memory, EventType type){
while(memory->events[type].read)
std::this_thread::sleep_for(1ms);
}

View File

@ -14,15 +14,22 @@
#define QUEUE_SIZE 16
#define PACKET_SIZE 32 * 1024
enum QueueType {
Video0,
Video1,
Audio,
Microphone,
Max
};
typedef struct {
int is_idr;
}VideoMetadata;
QueueType type;
}Metadata;
typedef struct {
int size;
VideoMetadata metadata;
Metadata metadata;
char data[PACKET_SIZE];
} Packet;
@ -30,7 +37,6 @@ typedef enum _EventType {
POINTER_VISIBLE,
CHANGE_BITRATE,
CHANGE_FRAMERATE,
CHANGE_DISPLAY,
IDR_FRAME,
STOP,
@ -52,12 +58,6 @@ typedef struct {
int read;
} Event;
enum QueueType {
Video,
Audio,
Microphone,
Max
};
typedef struct _Queue{
Packet array[QUEUE_SIZE];
@ -74,22 +74,7 @@ SharedMemory*
obtain_shared_memory(char* key);
void
push_audio_packet(SharedMemory* memory, void* data, int size);
void
push_video_packet(SharedMemory* memory, void* data, int size, VideoMetadata metadata);
int
peek_video_packet(SharedMemory* memory);
int
peek_audio_packet(SharedMemory* memory);
void
pop_audio_packet(SharedMemory* memory, void* data, int* size);
VideoMetadata
pop_video_packet(SharedMemory* memory, void* data, int* size);
push_packet(SharedMemory* memory, void* data, int size, Metadata metadata);
void
raise_event(SharedMemory* memory, EventType type, Event event);
@ -98,7 +83,4 @@ int
peek_event(SharedMemory* memory, EventType type);
Event
pop_event(SharedMemory* memory, EventType type);
void
wait_event(SharedMemory* memory, EventType type);
pop_event(SharedMemory* memory, EventType type);

View File

@ -149,8 +149,12 @@ main(int argc, char *argv[]) {
SharedMemory* memory = obtain_shared_memory(argv[1]);
auto video_capture = std::thread{[&](){
std::optional<std::string> display = std::string("123");
if (argc > 2)
display = std::string(argv[2]);
video::capture(mail::man,video::config_t{
1920, 1080, 60, 6000, 1, 0, 1, 0, 0
display, 1920, 1080, 60, 6000, 1, 0, 1, 0, 0
},NULL);
}};
auto audio_capture = std::thread{[&](){
@ -161,19 +165,35 @@ main(int argc, char *argv[]) {
auto video_packets = mail::man->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
auto bitrate = mail::man->event<int>(mail::bitrate);
auto framerate = mail::man->event<int>(mail::framerate);
auto idr = mail::man->event<bool>(mail::idr);
auto input = input::alloc(mail::man);
auto push = std::thread{[&](){
while (!shutdown_event->peek()) {
while(video_packets->peek()) {
auto packet = video_packets->pop();
push_video_packet(memory,packet->data(),packet->data_size(),VideoMetadata{
packet->is_idr()
push_packet(memory,packet->data(),packet->data_size(),Metadata{
packet->is_idr(),QueueType::Video0
});
}
while(audio_packets->peek()) {
auto packet = audio_packets->pop();
push_audio_packet(memory,packet->second.begin(),packet->second.size());
push_packet(memory,packet->second.begin(),packet->second.size(),Metadata{
0,QueueType::Audio
});
}
if(peek_event(memory,EventType::CHANGE_BITRATE))
bitrate->raise(pop_event(memory,EventType::CHANGE_BITRATE).value_number);
if(peek_event(memory,EventType::CHANGE_FRAMERATE))
framerate->raise(pop_event(memory,EventType::CHANGE_FRAMERATE).value_number);
if(peek_event(memory,EventType::POINTER_VISIBLE))
display_cursor = pop_event(memory,EventType::POINTER_VISIBLE).value_number;
if(peek_event(memory,EventType::IDR_FRAME))
idr->raise(pop_event(memory,EventType::IDR_FRAME).value_number > 0);
std::this_thread::sleep_for(1ms);
}
}};

View File

@ -1029,7 +1029,6 @@ namespace video {
}
});
auto switch_display_event = mail::man->event<int>(mail::switch_display);
// Wait for the initial capture context or a request to stop the queue
auto initial_capture_ctx = capture_ctx_queue->pop();
@ -1163,6 +1162,12 @@ namespace video {
capture_ctx->images->raise(img);
}
if (capture_ctx->config.display.has_value() &&
capture_ctx->config.display.value() != display_names[display_p]) {
artificial_reinit = true;
return false;
}
++capture_ctx;
})
@ -1174,10 +1179,6 @@ namespace video {
capture_ctxs.emplace_back(std::move(*capture_ctx_queue->pop()));
}
if (switch_display_event->peek()) {
artificial_reinit = true;
return false;
}
return true;
};
@ -1232,9 +1233,20 @@ namespace video {
refresh_displays(encoder.platform_formats->dev_type, display_names, display_p);
// Process any pending display switch with the new list of displays
if (switch_display_event->peek()) {
display_p = std::clamp(*switch_display_event->pop(), 0, (int) display_names.size() - 1);
if (capture_ctxs.front().config.display.has_value() &&
capture_ctxs.front().config.display.value() != display_names[display_p]) {
for (int i = 0; i < display_names.size();i ++) {
if (capture_ctxs.front().config.display.value() == display_names[i]) {
BOOST_LOG(info) << "Switched to display " << display_names[i];
display_p = i;
goto next;
}
}
BOOST_LOG(info) << "Display " << capture_ctxs.front().config.display.value() << " not found";
capture_ctxs.front().config.display = std::nullopt;
}
next:
// reset_display() will sleep between retries
reset_display(disp, encoder.platform_formats->dev_type, display_names[display_p], capture_ctxs.front().config);
@ -1751,18 +1763,24 @@ namespace video {
int &frame_nr, // Store progress of the frame number
safe::mail_t mail,
img_event_t images,
config_t config,
config_t* config,
std::shared_ptr<platf::display_t> disp,
std::unique_ptr<platf::encode_device_t> encode_device,
safe::signal_t &reinit_event,
const encoder_t &encoder,
void *channel_data) {
auto session = make_encode_session(disp.get(), encoder, config, disp->width, disp->height, std::move(encode_device));
auto session = make_encode_session(disp.get(), encoder, *config, disp->width, disp->height, std::move(encode_device));
if (!session) {
return;
}
auto shutdown_event = mail->event<bool>(mail::shutdown);
auto bitrate_events = mail->event<int>(mail::bitrate);
auto framerate_events = mail->event<int>(mail::framerate);
BOOST_LOG(info) << "framerate "sv << config->framerate;
BOOST_LOG(info) << "bitrate "sv << config->bitrate;
auto packets = mail::man->queue<packet_t>(mail::video_packets);
auto idr_events = mail->event<bool>(mail::idr);
auto invalidate_ref_frames_events = mail->event<std::pair<int64_t, int64_t>>(mail::invalidate_ref_frames);
@ -1778,6 +1796,11 @@ namespace video {
}
}
std::optional<std::chrono::steady_clock::time_point> frame_timestamp;
auto timestamp = std::chrono::steady_clock::now();
auto now = std::chrono::steady_clock::now();
auto cycle = std::chrono::nanoseconds(1s) / config->framerate;
auto sleep_period = 0ns;
while (true) {
if (shutdown_event->peek() || reinit_event.peek() || !images->running()) {
break;
@ -1791,6 +1814,16 @@ namespace video {
}
}
if (bitrate_events->peek()) {
config->bitrate = bitrate_events->pop().value();
BOOST_LOG(info) << "bitrate changed to "sv << config->bitrate;
break;
} else if (framerate_events->peek()) {
config->framerate = framerate_events->pop().value();
BOOST_LOG(info) << "framerate changed to "sv << config->framerate;
break;
}
if (idr_events->peek()) {
requested_idr_frame = true;
idr_events->pop();
@ -1800,11 +1833,8 @@ namespace video {
session->request_idr_frame();
}
std::optional<std::chrono::steady_clock::time_point> frame_timestamp;
// Encode at a minimum of 10 FPS to avoid image quality issues with static content
if (!requested_idr_frame || images->peek()) {
if (auto img = images->pop(100ms)) {
if (auto img = images->pop()) {
frame_timestamp = img->frame_timestamp;
if (session->convert(*img)) {
BOOST_LOG(error) << "Could not convert image"sv;
@ -1816,11 +1846,18 @@ namespace video {
}
}
sleep_period = 1s / config->framerate - cycle;
if(sleep_period > 0s)
std::this_thread::sleep_for(sleep_period);
if (encode(frame_nr++, *session, packets, channel_data, frame_timestamp)) {
BOOST_LOG(error) << "Could not encode video packet"sv;
return;
}
now = std::chrono::steady_clock::now();
cycle = now - timestamp;
timestamp = now;
session->request_normal_frame();
}
}
@ -1929,8 +1966,6 @@ namespace video {
std::shared_ptr<platf::display_t> disp;
auto switch_display_event = mail::man->event<int>(mail::switch_display);
if (synced_session_ctxs.empty()) {
auto ctx = encode_session_ctx_queue.pop();
if (!ctx) {
@ -1943,11 +1978,20 @@ namespace video {
while (encode_session_ctx_queue.running()) {
// Refresh display names since a display removal might have caused the reinitialization
refresh_displays(encoder.platform_formats->dev_type, display_names, display_p);
// Process any pending display switch with the new list of displays
if (switch_display_event->peek()) {
display_p = std::clamp(*switch_display_event->pop(), 0, (int) display_names.size() - 1);
if (synced_session_ctxs.front()->config.display.has_value() &&
synced_session_ctxs.front()->config.display.value() != display_names[display_p]) {
for (int i = 0; i < display_names.size();i ++) {
if (synced_session_ctxs.front()->config.display.value() == display_names[i]) {
BOOST_LOG(info) << "Switched to display " << display_names[i];
display_p = i;
goto next;
}
}
BOOST_LOG(info) << "Display " << synced_session_ctxs.front()->config.display.value() << " not found";
synced_session_ctxs.front()->config.display = std::nullopt;
}
next:
// reset_display() will sleep between retries
reset_display(disp, encoder.platform_formats->dev_type, display_names[display_p], synced_session_ctxs.front()->config);
@ -2013,6 +2057,12 @@ namespace video {
continue;
}
if (ctx->config.display.has_value() &&
ctx->config.display.value() != display_names[display_p]) {
ec = platf::capture_e::reinit;
return false;
}
if (ctx->idr_events->peek()) {
pos->session->request_idr_frame();
ctx->idr_events->pop();
@ -2042,10 +2092,6 @@ namespace video {
++pos;
})
if (switch_display_event->peek()) {
ec = platf::capture_e::reinit;
return false;
}
return true;
};
@ -2173,7 +2219,7 @@ namespace video {
encode_run(
frame_nr,
mail, images,
config, display,
&config, display,
std::move(encode_device),
ref->reinit_event, *ref->encoder_p,
channel_data);
@ -2286,8 +2332,8 @@ namespace video {
encoder.av1.capabilities.set();
// First, test encoder viability
config_t config_max_ref_frames { 1920, 1080, 60, 1000, 1, 1, 1, 0, 0 };
config_t config_autoselect { 1920, 1080, 60, 1000, 1, 0, 1, 0, 0 };
config_t config_max_ref_frames { std::nullopt, 1920, 1080, 60, 1000, 1, 1, 1, 0, 0 };
config_t config_autoselect { std::nullopt, 1920, 1080, 60, 1000, 1, 0, 1, 0, 0 };
// If the encoder isn't supported at all (not even H.264), bail early
reset_display(disp, encoder.platform_formats->dev_type, config::video.output_name, config_autoselect);
@ -2407,7 +2453,7 @@ namespace video {
}
std::vector<std::pair<encoder_t::flag_e, config_t>> configs {
{ encoder_t::DYNAMIC_RANGE, { 1920, 1080, 60, 1000, 1, 0, 3, 1, 1 } },
{ encoder_t::DYNAMIC_RANGE, { std::nullopt, 1920, 1080, 60, 1000, 1, 0, 3, 1, 1 } },
};
for (auto &[flag, config] : configs) {

View File

@ -295,6 +295,8 @@ namespace video {
/* Encoding configuration requested by remote client */
struct config_t {
std::optional<std::string> display;
int width; // Video width in pixels
int height; // Video height in pixels
int framerate; // Requested framerate, used in individual frame bitrate budget calculation