diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 48a78c17..5b60abc0 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -110,46 +110,10 @@ public: _map_addr_session->emplace(addr, std::make_pair(0u, &session)); } - void erase_session(session_t &session) { - auto lg = _map_addr_session.lock(); - - auto pos = std::find_if(std::begin(_map_addr_session.raw), std::end(_map_addr_session.raw), [session_p=&session](auto ¤t_port_and_session) { - return session_p == current_port_and_session.second.second; - }); - - _map_addr_session->erase(pos); - } - // Get session associated with address. // If none are found, try to find a session not yet claimed. (It will be marked by a port of value 0 // If none of those are found, return nullptr - session_t *get_session(const ENetAddress &address) { - TUPLE_2D(port, addr_string, platf::from_sockaddr_ex((sockaddr*)&address.address)); - - auto lg = _map_addr_session.lock(); - TUPLE_2D(begin, end, _map_addr_session->equal_range(addr_string)); - - auto it = std::end(_map_addr_session.raw); - for(auto pos = begin; pos != end; ++pos) { - TUPLE_2D_REF(session_port, session_p, pos->second); - - if(port == session_port) { - return session_p; - } - else if(session_port == 0) { - it = pos; - } - } - - if(it != std::end(_map_addr_session.raw)) { - TUPLE_2D_REF(session_port, session_p, it->second); - session_port = port; - - return session_p; - } - - return nullptr; - } + session_t *get_session(const net::peer_t peer); // Circular dependency: // iterate refers to session @@ -158,11 +122,6 @@ public: // Therefore, iterate is implemented further down the source file void iterate(std::chrono::milliseconds timeout); - template - void iterate(std::chrono::duration timeout) { - iterate(std::chrono::floor(timeout)); - } - void map(uint16_t type, std::function cb) { _map_type_cb.emplace(type, std::move(cb)); } @@ -227,10 +186,16 @@ struct session_t { udp::endpoint peer; } audio; + struct { + net::peer_t peer; + } control; + crypto::aes_t gcm_key; crypto::aes_t iv; safe::signal_t shutdown_event; + safe::signal_t controlEnd; + std::atomic state; }; @@ -242,12 +207,42 @@ std::shared_ptr input; static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); safe::signal_t broadcast_shutdown_event; +session_t *control_server_t::get_session(const net::peer_t peer) { + TUPLE_2D(port, addr_string, platf::from_sockaddr_ex((sockaddr*)&peer->address.address)); + + auto lg = _map_addr_session.lock(); + TUPLE_2D(begin, end, _map_addr_session->equal_range(addr_string)); + + auto it = std::end(_map_addr_session.raw); + for(auto pos = begin; pos != end; ++pos) { + TUPLE_2D_REF(session_port, session_p, pos->second); + + if(port == session_port) { + return session_p; + } + else if(session_port == 0) { + it = pos; + } + } + + if(it != std::end(_map_addr_session.raw)) { + TUPLE_2D_REF(session_port, session_p, it->second); + + session_p->control.peer = peer; + session_port = port; + + return session_p; + } + + return nullptr; +} + void control_server_t::iterate(std::chrono::milliseconds timeout) { ENetEvent event; auto res = enet_host_service(_host.get(), &event, timeout.count()); if(res > 0) { - auto session = get_session(event.peer->address); + auto session = get_session(event.peer); if(!session) { BOOST_LOG(warning) << "Rejected connection from ["sv << platf::from_sockaddr((sockaddr*)&event.peer->address.address) << "]: it's not properly set up"sv; enet_peer_disconnect_now(event.peer, 0); @@ -466,13 +461,26 @@ void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *se auto lg = server->_map_addr_session.lock(); auto now = std::chrono::steady_clock::now(); - for(auto &[addr,port_session] : server->_map_addr_session.raw) { + + KITTY_WHILE_LOOP(auto pos = std::begin(*server->_map_addr_session), pos != std::end(*server->_map_addr_session), { + TUPLE_2D_REF(addr, port_session, *pos); auto session = port_session.second; + if(now > session->pingTimeout) { BOOST_LOG(info) << addr << ": Ping Timeout"sv; session::stop(*session); } - } + + if(session->state.load(std::memory_order_acquire) == session::state_e::STOPPING) { + pos = server->_map_addr_session->erase(pos); + + enet_peer_disconnect_now(session->control.peer, 0); + session->controlEnd.raise(true); + continue; + } + + ++pos; + }) } if(proc::proc.running() == -1) { @@ -833,7 +841,6 @@ void stop(session_t &session) { return; } - session.broadcast_ref->control_server.erase_session(session); session.shutdown_event.raise(true); } @@ -842,6 +849,8 @@ void join(session_t &session) { session.videoThread.join(); BOOST_LOG(debug) << "Waiting for audio to end..."sv; session.audioThread.join(); + BOOST_LOG(debug) << "Waiting for control to end..."sv; + session.controlEnd.view(); BOOST_LOG(debug) << "Session ended"sv; } @@ -869,6 +878,7 @@ std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypt session->audio.frame = 1; + session->control.peer = nullptr; session->state.store(state_e::STOPPED, std::memory_order_relaxed); return session; diff --git a/sunshine/sync.h b/sunshine/sync.h index c8b21f0a..a0613980 100644 --- a/sunshine/sync.h +++ b/sunshine/sync.h @@ -11,99 +11,72 @@ namespace util { -template +template class sync_t { public: - static_assert(N > 0, "sync_t should have more than zero mutexes"); - using value_type = T; + using value_t = T; + using mutex_t = M; - template - std::lock_guard lock() { - return std::lock_guard { std::get(_lock) }; + std::lock_guard lock() { + return std::lock_guard { _lock }; } template sync_t(Args&&... args) : raw {std::forward(args)... } {} sync_t &operator=(sync_t &&other) noexcept { - for(auto &l : _lock) { - l.lock(); - } - - for(auto &l : other._lock) { - l.lock(); - } + std::lock(_lock, other._lock); raw = std::move(other.raw); - for(auto &l : _lock) { - l.unlock(); - } - - for(auto &l : other._lock) { - l.unlock(); - } + _lock.unlock(); + other._lock.unlock(); return *this; } sync_t &operator=(sync_t &other) noexcept { - for(auto &l : _lock) { - l.lock(); - } - - for(auto &l : other._lock) { - l.lock(); - } + std::lock(_lock, other._lock); raw = other.raw; - for(auto &l : _lock) { - l.unlock(); - } - - for(auto &l : other._lock) { - l.unlock(); - } + _lock.unlock(); + other._lock.unlock(); return *this; } - sync_t &operator=(const value_type &val) noexcept { - for(auto &l : _lock) { - l.lock(); - } + sync_t &operator=(const value_t &val) noexcept { + auto lg = lock(); raw = val; - for(auto &l : _lock) { - l.unlock(); - } - return *this; } - sync_t &operator=(value_type &&val) noexcept { - for(auto &l : _lock) { - l.lock(); - } + sync_t &operator=(value_t &&val) noexcept { + auto lg = lock(); raw = std::move(val); - for(auto &l : _lock) { - l.unlock(); - } - return *this; } - value_type *operator->() { + value_t *operator->() { return &raw; } - value_type raw; + value_t &operator*() { + return raw; + } + + const value_t &operator*() const { + return raw; + } + + value_t raw; private: - std::array _lock; + mutex_t _lock; }; } diff --git a/sunshine/video.cpp b/sunshine/video.cpp index 72738c66..077cad9a 100644 --- a/sunshine/video.cpp +++ b/sunshine/video.cpp @@ -101,7 +101,6 @@ void captureThread(std::shared_ptr> capture_ctx_que for(auto &capture_ctx : capture_ctx_queue->unsafe()) { capture_ctx.images->stop(); } - }); auto disp = platf::display();