single sunshine decode all

This commit is contained in:
pigeatgarlic 2024-04-05 05:38:17 -07:00
parent 25a8eae190
commit 520215a2d6
9 changed files with 162 additions and 232 deletions

View File

@ -9,8 +9,6 @@
#include <sstream>
#include <vector>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
@ -24,25 +22,6 @@ using namespace std::literals;
#endif
typedef struct {
Queue queues[QueueType::Max];
Event events[EVENT_TYPE_MAX];
interprocess_mutex lock;
}SharedMemoryInternal;
EXPORTS(void)
lock_shared_memory(SharedMemory* memory){
SharedMemoryInternal* internal = (SharedMemoryInternal*) memory;
internal->lock.lock();
}
EXPORTS(void)
unlock_shared_memory(SharedMemory* memory){
SharedMemoryInternal* internal = (SharedMemoryInternal*) memory;
internal->lock.unlock();
}
std::string gen_random(const int len) {
static const char alphanum[] =
@ -63,18 +42,18 @@ std::string gen_random(const int len) {
}
std::string randkey = gen_random(20);
managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMemoryInternal));
managed_shared_memory segment(create_only, randkey.c_str(), 2 * sizeof(SharedMemory));
void
init_shared_memory(SharedMemory* memory){
for (int i = 0; i < QUEUE_SIZE; i++)
for (int j = 0; j < QueueType::Max; j++)
memory->queues[j].order[i] = -1;
for (int j = 0; j < QueueType::QueueMax; j++) {
for (int k = 0; k < EventType::EventMax; k++)
memory->queues[j].events[k].read = 1;
for (int i = 0; i < EventType::EVENT_TYPE_MAX; i++)
memory->events[i].read = 1;
memory->queues[j].index = QUEUE_SIZE - 1;
}
}
@ -115,7 +94,7 @@ EXPORTS(SharedMemory*)
allocate_shared_memory(char* rand) {
//Allocate a portion of the segment (raw memory)
std::size_t free_memory = segment.get_free_memory();
SharedMemory* memory = (SharedMemory*)segment.allocate(sizeof(SharedMemoryInternal));
SharedMemory* memory = (SharedMemory*)segment.allocate(sizeof(SharedMemory));
init_shared_memory(memory);
//Check invariant

View File

@ -17,8 +17,6 @@ extern "C" {
EXPORT(SharedMemory*) allocate_shared_memory(char* rand) ;
EXPORT(SharedMemory*) obtain_shared_memory(char* rand) ;
EXPORT(void) lock_shared_memory(SharedMemory* memory);
EXPORT(void) unlock_shared_memory(SharedMemory* memory);
EXPORT(void) free_shared_memory(SharedMemory* buffer);
EXPORT(void) deinit_shared_memory();
}

View File

@ -20,23 +20,6 @@ import (
"golang.org/x/sys/windows"
)
const (
audio = 1
video = 2
)
type DataType int
func peek(memory *C.SharedMemory, media DataType) bool {
if media == video {
return memory.queues[C.Video0].order[0] != -1
} else if media == audio {
return memory.queues[C.Audio].order[0] != -1
}
panic(fmt.Errorf("unknown data type"))
}
func byteSliceToString(s []byte) string {
n := bytes.IndexByte(s, 0)
if n >= 0 {
@ -90,14 +73,6 @@ func main() {
if err != nil {
panic(err)
}
lock, err := mod.FindProc("lock_shared_memory")
if err != nil {
panic(err)
}
unlock, err := mod.FindProc("unlock_shared_memory")
if err != nil {
panic(err)
}
buffer := make([]byte, 128)
_, _, err = proc.Call(
@ -115,44 +90,42 @@ func main() {
}
memory := (*C.SharedMemory)(unsafe.Pointer(pointer))
handle_video := func() {
lock.Call(pointer)
defer unlock.Call(pointer)
block := memory.queues[C.Video0].array[memory.queues[C.Video0].order[0]]
fmt.Printf("video buffer %d\n", block.size)
for i := 0; i < C.QUEUE_SIZE-1; i++ {
memory.queues[C.Video0].order[i] = memory.queues[C.Video0].order[i+1]
}
memory.queues[C.Video0].order[C.QUEUE_SIZE-1] = -1
}
handle_audio := func() {
lock.Call(pointer)
defer unlock.Call(pointer)
block := memory.queues[C.Audio].array[memory.queues[C.Audio].order[0]]
fmt.Printf("audio buffer %d\n", block.size)
for i := 0; i < C.QUEUE_SIZE-1; i++ {
memory.queues[C.Audio].order[i] = memory.queues[C.Audio].order[i+1]
}
memory.queues[C.Audio].order[C.QUEUE_SIZE-1] = -1
}
go func() {
for {
for peek(memory, video) {
handle_video()
}
for peek(memory, audio) {
handle_audio()
}
memory.queues[C.Video0].metadata.active = C.int(0)
memory.queues[C.Video1].metadata.active = C.int(0)
memory.queues[C.Audio].metadata.active = C.int(0)
time.Sleep(time.Millisecond)
time.Sleep(time.Second * 10)
memory.queues[C.Video0].metadata.active = C.int(1)
memory.queues[C.Video1].metadata.active = C.int(1)
memory.queues[C.Audio].metadata.active = C.int(1)
time.Sleep(time.Second * 10)
}
}()
go func() {
indexes := make([]int,C.QueueMax)
for i,_ := range indexes {
indexes[i] = int(memory.queues[i].index)
}
for {
for queue_type,_ := range indexes {
if memory.queues[queue_type].metadata.running != 1 {
continue
}
for int(memory.queues[queue_type].index) > indexes[queue_type] {
new_index := indexes[queue_type] + 1
real_index := new_index % C.QUEUE_SIZE
block := memory.queues[queue_type].array[real_index]
fmt.Printf("Queue type %d, downstream index %d, upstream index %d, receive size %d\n",queue_type,new_index, memory.queues[queue_type].index,block.size)
indexes[queue_type] = new_index;
}
}
time.Sleep(time.Microsecond * 100)
}
}()

View File

@ -1,4 +1,4 @@
#define QUEUE_SIZE 16
#define QUEUE_SIZE 256
#define PACKET_SIZE 32 * 1024
enum QueueType {
@ -6,52 +6,57 @@ enum QueueType {
Video1,
Audio,
Microphone,
Max
QueueMax
};
typedef enum _EventType {
POINTER_VISIBLE,
CHANGE_BITRATE,
CHANGE_FRAMERATE,
IDR_FRAME,
STOP,
HDR_CALLBACK,
EVENT_TYPE_MAX
Pointer,
Bitrate,
Framerate,
Idr,
Hdr,
Stop,
EventMax
} EventType;
typedef struct {
int is_idr;
enum QueueType type;
}Metadata;
}PacketMetadata;
typedef struct {
int active;
int running;
char display[64];
}QueueMetadata;
typedef struct {
int size;
Metadata metadata;
PacketMetadata metadata;
char data[PACKET_SIZE];
} Packet;
typedef enum _DataType {
HDR_INFO,
NUMBER,
STRING,
} DataType;
typedef struct {
int read;
DataType type;
int data_size;
int value_number;
char value_raw[PACKET_SIZE];
int data_size;
DataType type;
int read;
} Event;
typedef struct _Queue{
int index;
QueueMetadata metadata;
Event events[EventMax];
Packet array[QUEUE_SIZE];
int order[QUEUE_SIZE];
}Queue;
typedef struct {
Queue queues[Max];
Event events[EVENT_TYPE_MAX];
Queue queues[QueueMax];
}SharedMemory;

View File

@ -96,8 +96,8 @@ namespace audio {
auto control_shared = safe::make_shared<audio_ctx_t>(start_audio_control, stop_audio_control);
void
encodeThread(sample_queue_t samples, config_t config, void *channel_data) {
auto packets = mail::man->queue<packet_t>(mail::audio_packets);
encodeThread(safe::mail_t mail,sample_queue_t samples, config_t config, void *channel_data) {
auto packets = mail->queue<packet_t>(mail::audio_packets);
auto stream = &stream_configs[map_stream(config.channels, config.flags[config_t::HIGH_QUALITY])];
// Encoding takes place on this thread
@ -204,7 +204,7 @@ namespace audio {
platf::adjust_thread_priority(platf::thread_priority_e::critical);
auto samples = std::make_shared<sample_queue_t::element_type>(30);
std::thread thread { encodeThread, samples, config, channel_data };
std::thread thread { encodeThread, mail, samples, config, channel_data };
auto fg = util::fail_guard([&]() {
samples->stop();

View File

@ -10,16 +10,14 @@
#include <iostream>
#include <sstream>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
using namespace boost::interprocess;
using namespace std::literals;
SharedMemoryInternal* obtain_shared_memory(char* rand) {
SharedMemory* obtain_shared_memory(char* rand) {
std::vector<std::string> strings;
std::istringstream f(rand);
@ -40,83 +38,43 @@ SharedMemoryInternal* obtain_shared_memory(char* rand) {
static managed_shared_memory segment(open_only, key.c_str());
//Get buffer local address from handle
SharedMemoryInternal* memory = (SharedMemoryInternal*)segment.get_address_from_handle(handle);
SharedMemory* memory = (SharedMemory*)segment.get_address_from_handle(handle);
return memory;
}
int queue_size(int* queue) {
int i = 0;
while (*queue != -1 && i != QUEUE_SIZE){ // wait while queue is full
queue++;
i++;
}
return i;
}
int find_available_slot(int* orders) {
int available = -1;
// find available packet slot
for (int k = 0; k < QUEUE_SIZE; k++) {
int fnd = 0;
int* copy = orders;
int j = 0;
while ( *copy != -1 && j != QUEUE_SIZE) {
if (*copy == k) {
fnd = 1;
break;
}
j++;
copy++;
}
if (!fnd) {
available = k;
break;
}
}
return available;
}
void
push_packet(SharedMemoryInternal* memory,
push_packet(Queue* queue,
void* data,
int size,
Metadata metadata){
PacketMetadata metadata){
// wait while queue is full
while (queue_size(memory->queues[metadata.type].order) == QUEUE_SIZE)
std::this_thread::sleep_for(1ms);
auto new_index = queue->index + 1;
scoped_lock<interprocess_mutex> lock(memory->lock);
int available = find_available_slot(memory->queues[metadata.type].order);
memory->queues[metadata.type].order[queue_size(memory->queues[metadata.type].order)] = available;
Packet* block = &memory->queues[metadata.type].array[available];
auto real_index = new_index % QUEUE_SIZE;
Packet* block = &queue->array[real_index];
memcpy(block->data,data,size);
block->size = size;
block->metadata = metadata;
//always update index after write data
queue->index = new_index;
}
void
raise_event(SharedMemoryInternal* memory, EventType type, Event event){
raise_event(Queue* queue, EventType type, Event event){
event.read = false;
memcpy(&memory->events[type],&event,sizeof(Event));
memcpy(&queue->events[type],&event,sizeof(Event));
}
int
peek_event(SharedMemoryInternal* memory, EventType type){
peek_event(Queue* memory, EventType type){
return !memory->events[type].read;
}
Event
pop_event(SharedMemoryInternal* memory, EventType type){
memory->events[type].read = true;
return memory->events[type];
pop_event(Queue* queue, EventType type){
queue->events[type].read = true;
return queue->events[type];
}

View File

@ -7,28 +7,19 @@
#include "thread_pool.h"
#include "thread_safe.h"
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <smemory.h>
typedef struct {
Queue queues[QueueType::Max];
Event events[EVENT_TYPE_MAX];
boost::interprocess::interprocess_mutex lock;
}SharedMemoryInternal;
SharedMemoryInternal*
SharedMemory*
obtain_shared_memory(char* key);
void
push_packet(SharedMemoryInternal* memory, void* data, int size, Metadata metadata);
push_packet(Queue* memory, void* data, int size, PacketMetadata metadata);
void
raise_event(SharedMemoryInternal* memory, EventType type, Event event);
raise_event(Queue* memory, EventType type, Event event);
int
peek_event(SharedMemoryInternal* memory, EventType type);
peek_event(Queue* memory, EventType type);
Event
pop_event(SharedMemoryInternal* memory, EventType type);
pop_event(Queue* memory, EventType type);

View File

@ -107,8 +107,8 @@ main(int argc, char *argv[]) {
task_pool.start(1);
// Create signal handler after logging has been initialized
auto shutdown_event = mail::man->event<bool>(mail::shutdown);
on_signal(SIGINT, [&force_shutdown, shutdown_event]() {
auto process_shutdown_event = mail::man->event<bool>(mail::shutdown);
on_signal(SIGINT, [&force_shutdown, process_shutdown_event]() {
BOOST_LOG(info) << "Interrupt handler called"sv;
auto task = []() {
@ -117,10 +117,10 @@ main(int argc, char *argv[]) {
};
force_shutdown = task_pool.pushDelayed(task, 10s).task_id;
shutdown_event->raise(true);
process_shutdown_event->raise(true);
});
on_signal(SIGTERM, [&force_shutdown, shutdown_event]() {
on_signal(SIGTERM, [&force_shutdown, process_shutdown_event]() {
BOOST_LOG(info) << "Terminate handler called"sv;
auto task = []() {
@ -129,7 +129,7 @@ main(int argc, char *argv[]) {
};
force_shutdown = task_pool.pushDelayed(task, 10s).task_id;
shutdown_event->raise(true);
process_shutdown_event->raise(true);
});
// If any of the following fail, we log an error and continue event though sunshine will not function correctly.
@ -146,64 +146,90 @@ main(int argc, char *argv[]) {
}
//Get buffer local address from handle
SharedMemoryInternal* memory = obtain_shared_memory(argv[1]);
SharedMemory* memory = obtain_shared_memory(argv[1]);
auto video_capture = std::thread{[&](){
std::optional<std::string> display = std::string("123");
if (argc > 2)
display = std::string(argv[2]);
auto video_capture = [&](safe::mail_t mail, char* displayin){
std::optional<std::string> display = std::nullopt;
if (strlen(displayin) > 0)
display = std::string(displayin);
video::capture(mail::man,video::config_t{
video::capture(mail,video::config_t{
display, 1920, 1080, 60, 6000, 1, 0, 1, 0, 0
},NULL);
}};
auto audio_capture = std::thread{[&](){
audio::capture(mail::man,audio::config_t{
};
auto audio_capture = [&](safe::mail_t mail){
audio::capture(mail,audio::config_t{
10,2,3,0
},NULL);
}};
};
auto video_packets = mail::man->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
auto bitrate = mail::man->event<int>(mail::bitrate);
auto framerate = mail::man->event<int>(mail::framerate);
auto idr = mail::man->event<bool>(mail::idr);
auto input = input::alloc(mail::man);
auto push = std::thread{[&](){
while (!shutdown_event->peek()) {
auto push = [process_shutdown_event](safe::mail_t mail, Queue* queue){
auto video_packets = mail->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail->queue<audio::packet_t>(mail::audio_packets);
auto bitrate = mail->event<int>(mail::bitrate);
auto framerate = mail->event<int>(mail::framerate);
auto idr = mail->event<bool>(mail::idr);
auto local_shutdown= mail->event<bool>(mail::shutdown);
auto input = input::alloc(mail);
while (!process_shutdown_event->peek() && !local_shutdown->peek() && queue->metadata.active) {
while(video_packets->peek()) {
auto packet = video_packets->pop();
push_packet(memory,packet->data(),packet->data_size(),Metadata{
packet->is_idr(),QueueType::Video0
});
push_packet(queue,packet->data(),packet->data_size(),PacketMetadata{ packet->is_idr() });
}
while(audio_packets->peek()) {
auto packet = audio_packets->pop();
push_packet(memory,packet->second.begin(),packet->second.size(),Metadata{
0,QueueType::Audio
});
push_packet(queue,packet->second.begin(),packet->second.size(),PacketMetadata{ 0 });
}
if(peek_event(memory,EventType::CHANGE_BITRATE))
bitrate->raise(pop_event(memory,EventType::CHANGE_BITRATE).value_number);
if(peek_event(memory,EventType::CHANGE_FRAMERATE))
framerate->raise(pop_event(memory,EventType::CHANGE_FRAMERATE).value_number);
if(peek_event(memory,EventType::POINTER_VISIBLE))
display_cursor = pop_event(memory,EventType::POINTER_VISIBLE).value_number;
if(peek_event(memory,EventType::IDR_FRAME))
idr->raise(pop_event(memory,EventType::IDR_FRAME).value_number > 0);
if(peek_event(queue,EventType::Bitrate))
bitrate->raise(pop_event(queue,EventType::Bitrate).value_number);
if(peek_event(queue,EventType::Framerate))
framerate->raise(pop_event(queue,EventType::Framerate).value_number);
if(peek_event(queue,EventType::Pointer))
display_cursor = pop_event(queue,EventType::Pointer).value_number;
if(peek_event(queue,EventType::Idr))
idr->raise(pop_event(queue,EventType::Idr).value_number > 0);
std::this_thread::sleep_for(1ms);
}
}};
while (!shutdown_event->peek())
std::this_thread::sleep_for(1s);
if (!local_shutdown->peek())
local_shutdown->raise(true);
queue->metadata.running = 0;
};
std::vector<QueueType> actives;
while (!process_shutdown_event->peek()) {
for (int i = 0; i < QueueType::QueueMax; i++) {
auto queue = &memory->queues[i];
if (!queue->metadata.active || queue->metadata.running)
continue;
queue->metadata.running = 1;
if (i == QueueType::Video0 || i == QueueType::Video1) {
auto mail = std::make_shared<safe::mail_raw_t>();
auto capture = std::thread{video_capture,mail,queue->metadata.display};
auto forward = std::thread{push,mail,queue};
capture.detach();
forward.detach();
} else if (i == QueueType::Audio) {
auto mail = std::make_shared<safe::mail_raw_t>();
auto capture = std::thread{audio_capture,mail};
auto forward = std::thread{push,mail,queue};
capture.detach();
forward.detach();
}
}
std::this_thread::sleep_for(100ms);
}
push.join();
audio_capture.join();
video_capture.join();
task_pool.stop();
task_pool.join();

View File

@ -1781,7 +1781,7 @@ namespace video {
BOOST_LOG(info) << "framerate "sv << config->framerate;
BOOST_LOG(info) << "bitrate "sv << config->bitrate;
auto packets = mail::man->queue<packet_t>(mail::video_packets);
auto packets = mail->queue<packet_t>(mail::video_packets);
auto idr_events = mail->event<bool>(mail::idr);
auto invalidate_ref_frames_events = mail->event<std::pair<int64_t, int64_t>>(mail::invalidate_ref_frames);
@ -2243,7 +2243,7 @@ namespace video {
ref->encode_session_ctx_queue.raise(sync_session_ctx_t {
&join_event,
mail->event<bool>(mail::shutdown),
mail::man->queue<packet_t>(mail::video_packets),
mail->queue<packet_t>(mail::video_packets),
std::move(idr_events),
mail->event<hdr_info_t>(mail::hdr),
mail->event<input::touch_port_t>(mail::touch_port),