/** * @file src/stream.cpp * @brief Definitions for the streaming protocols. */ // standard includes #include #include #include // lib includes #include #include extern "C" { // clang-format off #include #include "rswrapper.h" // clang-format on } // local includes #include "config.h" #include "display_device.h" #include "globals.h" #include "input.h" #include "logging.h" #include "network.h" #include "platform/common.h" #include "process.h" #include "stream.h" #include "sync.h" #include "system_tray.h" #include "thread_safe.h" #include "utility.h" #define IDX_START_A 0 #define IDX_START_B 1 #define IDX_INVALIDATE_REF_FRAMES 2 #define IDX_LOSS_STATS 3 #define IDX_INPUT_DATA 5 #define IDX_RUMBLE_DATA 6 #define IDX_TERMINATION 7 #define IDX_PERIODIC_PING 8 #define IDX_REQUEST_IDR_FRAME 9 #define IDX_ENCRYPTED 10 #define IDX_HDR_MODE 11 #define IDX_RUMBLE_TRIGGER_DATA 12 #define IDX_SET_MOTION_EVENT 13 #define IDX_SET_RGB_LED 14 static const short packetTypes[] = { 0x0305, // Start A 0x0307, // Start B 0x0301, // Invalidate reference frames 0x0201, // Loss Stats 0x0204, // Frame Stats (unused) 0x0206, // Input data 0x010b, // Rumble data 0x0109, // Termination 0x0200, // Periodic Ping 0x0302, // IDR frame 0x0001, // fully encrypted 0x010e, // HDR mode 0x5500, // Rumble triggers (Sunshine protocol extension) 0x5501, // Set motion event (Sunshine protocol extension) 0x5502, // Set RGB LED (Sunshine protocol extension) }; namespace asio = boost::asio; namespace sys = boost::system; using asio::ip::tcp; using asio::ip::udp; using namespace std::literals; namespace stream { enum class socket_e : int { video, ///< Video audio ///< Audio }; #pragma pack(push, 1) struct video_short_frame_header_t { uint8_t *payload() { return (uint8_t *) (this + 1); } std::uint8_t headerType; // Always 0x01 for short headers // Sunshine extension // Frame processing latency, in 1/10 ms units // zero when the frame is repeated or there is no backend implementation 
// Frame processing latency, in 1/10 ms units (Sunshine extension); see comment above.
boost::endian::little_uint16_at frame_processing_latency;

  // Currently known values:
  // 1 = Normal P-frame
  // 2 = IDR-frame
  // 4 = P-frame with intra-refresh blocks
  // 5 = P-frame after reference frame invalidation
  std::uint8_t frameType;

  // Length of the final packet payload for codecs that cannot handle
  // zero padding, such as AV1 (Sunshine extension).
  boost::endian::little_uint16_at lastPayloadLen;

  std::uint8_t unknown[2];
};

static_assert(
  sizeof(video_short_frame_header_t) == 8,
  "Short frame header must be 8 bytes"
);

// One RTP video packet on the wire: RTP header, 4 reserved bytes, then
// NVIDIA's per-packet video header; payload() points past this struct.
struct video_packet_raw_t {
  uint8_t *payload() {
    return (uint8_t *) (this + 1);
  }

  RTP_PACKET rtp;
  char reserved[4];
  NV_VIDEO_PACKET packet;
};

// Prefix prepended to each encrypted video packet.
struct video_packet_enc_prefix_t {
  std::uint8_t iv[12];  // 12-byte IV is ideal for AES-GCM
  std::uint32_t frameNumber;
  std::uint8_t tag[16];
};

struct audio_packet_t {
  RTP_PACKET rtp;
};

// Common header for control stream messages: type followed by payload length.
struct control_header_v2 {
  std::uint16_t type;
  std::uint16_t payloadLength;

  uint8_t *payload() {
    return (uint8_t *) (this + 1);
  }
};

// Session termination message carrying the reason code.
struct control_terminate_t {
  control_header_v2 header;
  std::uint32_t ec;
};

// Gamepad rumble message.
struct control_rumble_t {
  control_header_v2 header;
  std::uint32_t useless;

  std::uint16_t id;
  std::uint16_t lowfreq;
  std::uint16_t highfreq;
};

// Gamepad trigger rumble message (Sunshine protocol extension).
struct control_rumble_triggers_t {
  control_header_v2 header;

  std::uint16_t id;
  std::uint16_t left;
  std::uint16_t right;
};

// Enables/disables motion event reporting for a gamepad (Sunshine protocol extension).
struct control_set_motion_event_t {
  control_header_v2 header;

  std::uint16_t id;
  std::uint16_t reportrate;
  std::uint8_t type;
};

// Sets a gamepad's RGB LED color (Sunshine protocol extension).
struct control_set_rgb_led_t {
  control_header_v2 header;

  std::uint16_t id;
  std::uint8_t r;
  std::uint8_t g;
  std::uint8_t b;
};

// Notifies the client of an HDR mode change.
struct control_hdr_mode_t {
  control_header_v2 header;

  std::uint8_t enabled;

  // Sunshine protocol extension
  SS_HDR_METADATA metadata;
};

// Wire framing for fully-encrypted control messages; the encrypted
// control_header_v2 and payload data follow this struct.
typedef struct control_encrypted_t {
  std::uint16_t encryptedHeaderType;  // Always LE 0x0001
  std::uint16_t length;  // sizeof(seq) + 16 byte tag + secondary header and data

  // seq is accepted as an arbitrary value in Moonlight
  std::uint32_t seq;  // Monotonically increasing
sequence number (used as IV for AES-GCM) uint8_t *payload() { return (uint8_t *) (this + 1); } // encrypted control_header_v2 and payload data follow } *control_encrypted_p; struct audio_fec_packet_t { RTP_PACKET rtp; AUDIO_FEC_HEADER fecHeader; }; #pragma pack(pop) constexpr std::size_t round_to_pkcs7_padded(std::size_t size) { return ((size + 15) / 16) * 16; } constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400; using audio_aes_t = std::array; using av_session_id_t = std::variant; // IP address or SS-Ping-Payload from RTSP handshake using message_queue_t = std::shared_ptr>>; using message_queue_queue_t = std::shared_ptr>>; // return bytes written on success // return -1 on error static inline int encode_audio(bool encrypted, const audio::buffer_t &plaintext, uint8_t *destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) { // If encryption isn't enabled if (!encrypted) { std::copy(std::begin(plaintext), std::end(plaintext), destination); return plaintext.size(); } return cbc.encrypt(std::string_view {(char *) std::begin(plaintext), plaintext.size()}, destination, &iv); } static inline void while_starting_do_nothing(std::atomic &state) { while (state.load(std::memory_order_acquire) == session::state_e::STARTING) { std::this_thread::sleep_for(1ms); } } class control_server_t { public: int bind(net::af_e address_family, std::uint16_t port) { _host = net::host_create(address_family, _addr, port); return !(bool) _host; } // Get session associated with address. // If none are found, try to find a session not yet claimed. 
(It will be marked by a port of value 0 // If none of those are found, return nullptr session_t *get_session(const net::peer_t peer, uint32_t connect_data); // Circular dependency: // iterate refers to session // session refers to broadcast_ctx_t // broadcast_ctx_t refers to control_server_t // Therefore, iterate is implemented further down the source file void iterate(std::chrono::milliseconds timeout); /** * @brief Call the handler for a given control stream message. * @param type The message type. * @param session The session the message was received on. * @param payload The payload of the message. * @param reinjected `true` if this message is being reprocessed after decryption. */ void call(std::uint16_t type, session_t *session, const std::string_view &payload, bool reinjected); void map(uint16_t type, std::function cb) { _map_type_cb.emplace(type, std::move(cb)); } int send(const std::string_view &payload, net::peer_t peer) { auto packet = enet_packet_create(payload.data(), payload.size(), ENET_PACKET_FLAG_RELIABLE); if (enet_peer_send(peer, 0, packet)) { enet_packet_destroy(packet); return -1; } return 0; } void flush() { enet_host_flush(_host.get()); } // Callbacks std::unordered_map> _map_type_cb; // All active sessions (including those still waiting for a peer to connect) sync_util::sync_t> _sessions; // ENet peer to session mapping for sessions with a peer connected sync_util::sync_t> _peer_to_session; ENetAddress _addr; net::host_t _host; }; struct broadcast_ctx_t { message_queue_queue_t message_queue_queue; std::thread recv_thread; std::thread video_thread; std::thread audio_thread; std::thread control_thread; asio::io_context io_context; udp::socket video_sock {io_context}; udp::socket audio_sock {io_context}; control_server_t control_server; }; struct session_t { config_t config; safe::mail_t mail; std::shared_ptr input; std::thread audioThread; std::thread videoThread; std::chrono::steady_clock::time_point pingTimeout; safe::shared_t::ptr_t 
broadcast_ref; boost::asio::ip::address localAddress; struct { std::string ping_payload; int lowseq; udp::endpoint peer; std::optional cipher; std::uint64_t gcm_iv_counter; safe::mail_raw_t::event_t idr_events; safe::mail_raw_t::event_t> invalidate_ref_frames_events; std::unique_ptr qos; } video; struct { crypto::cipher::cbc_t cipher; std::string ping_payload; std::uint16_t sequenceNumber; // avRiKeyId == util::endian::big(First (sizeof(avRiKeyId)) bytes of launch_session->iv) std::uint32_t avRiKeyId; std::uint32_t timestamp; udp::endpoint peer; util::buffer_t shards; util::buffer_t shards_p; audio_fec_packet_t fec_packet; std::unique_ptr qos; } audio; struct { crypto::cipher::gcm_t cipher; crypto::aes_t legacy_input_enc_iv; // Only used when the client doesn't support full control stream encryption crypto::aes_t incoming_iv; crypto::aes_t outgoing_iv; std::uint32_t connect_data; // Used for new clients with ML_FF_SESSION_ID_V1 std::string expected_peer_address; // Only used for legacy clients without ML_FF_SESSION_ID_V1 net::peer_t peer; std::uint32_t seq; platf::feedback_queue_t feedback_queue; safe::mail_raw_t::event_t hdr_queue; } control; std::uint32_t launch_session_id; safe::mail_raw_t::event_t shutdown_event; safe::signal_t controlEnd; std::atomic state; }; /** * First part of cipher must be struct of type control_encrypted_t * * returns empty string_view on failure * returns string_view pointing to payload data */ template static inline std::string_view encode_control(session_t *session, const std::string_view &plaintext, std::array &tagged_cipher) { static_assert( max_payload_size >= sizeof(control_encrypted_t) + sizeof(crypto::cipher::tag_size), "max_payload_size >= sizeof(control_encrypted_t) + sizeof(crypto::cipher::tag_size)" ); if (session->config.controlProtocolType != 13) { return plaintext; } auto seq = session->control.seq++; auto &iv = session->control.outgoing_iv; if (session->config.encryptionFlagsEnabled & SS_ENC_CONTROL_V2) { // We use the 
deterministic IV construction algorithm specified in NIST SP 800-38D // Section 8.2.1. The sequence number is our "invocation" field and the 'CH' in the // high bytes is the "fixed" field. Because each client provides their own unique // key, our values in the fixed field need only uniquely identify each independent // use of the client's key with AES-GCM in our code. // // The sequence number is 32 bits long which allows for 2^32 control stream messages // to be sent to each client before the IV repeats. iv.resize(12); std::copy_n((uint8_t *) &seq, sizeof(seq), std::begin(iv)); iv[10] = 'H'; // Host originated iv[11] = 'C'; // Control stream } else { // Nvidia's old style encryption uses a 16-byte IV iv.resize(16); iv[0] = (std::uint8_t) seq; } auto packet = (control_encrypted_p) tagged_cipher.data(); auto bytes = session->control.cipher.encrypt(plaintext, packet->payload(), &iv); if (bytes <= 0) { BOOST_LOG(error) << "Couldn't encrypt control data"sv; return {}; } std::uint16_t packet_length = bytes + crypto::cipher::tag_size + sizeof(control_encrypted_t::seq); packet->encryptedHeaderType = util::endian::little(0x0001); packet->length = util::endian::little(packet_length); packet->seq = util::endian::little(seq); return std::string_view {(char *) tagged_cipher.data(), packet_length + sizeof(control_encrypted_t) - sizeof(control_encrypted_t::seq)}; } int start_broadcast(broadcast_ctx_t &ctx); void end_broadcast(broadcast_ctx_t &ctx); static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); session_t *control_server_t::get_session(const net::peer_t peer, uint32_t connect_data) { { // Fast path - look up existing session by peer auto lg = _peer_to_session.lock(); auto it = _peer_to_session->find(peer); if (it != _peer_to_session->end()) { return it->second; } } // Slow path - process new session TUPLE_2D(peer_port, peer_addr, platf::from_sockaddr_ex((sockaddr *) &peer->address.address)); auto lg = _sessions.lock(); for (auto pos = 
std::begin(*_sessions); pos != std::end(*_sessions); ++pos) { auto session_p = *pos; // Skip sessions that are already established if (session_p->control.peer) { continue; } // Identify the connection by the unique connect data if the client supports it. // Only fall back to IP address matching for clients without session ID support. if (session_p->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) { if (session_p->control.connect_data != connect_data) { continue; } else { BOOST_LOG(debug) << "Initialized new control stream session by connect data match [v2]"sv; } } else { if (session_p->control.expected_peer_address != peer_addr) { continue; } else { BOOST_LOG(debug) << "Initialized new control stream session by IP address match [v1]"sv; } } // Once the control stream connection is established, RTSP session state can be torn down rtsp_stream::launch_session_clear(session_p->launch_session_id); session_p->control.peer = peer; // Use the local address from the control connection as the source address // for other communications to the client. This is necessary to ensure // proper routing on multi-homed hosts. auto local_address = platf::from_sockaddr((sockaddr *) &peer->localAddress.address); session_p->localAddress = boost::asio::ip::make_address(local_address); BOOST_LOG(debug) << "Control local address ["sv << local_address << ']'; BOOST_LOG(debug) << "Control peer address ["sv << peer_addr << ':' << peer_port << ']'; // Insert this into the map for O(1) lookups in the future auto ptslg = _peer_to_session.lock(); _peer_to_session->emplace(peer, session_p); return session_p; } return nullptr; } /** * @brief Call the handler for a given control stream message. * @param type The message type. * @param session The session the message was received on. * @param payload The payload of the message. * @param reinjected `true` if this message is being reprocessed after decryption. 
*/
void control_server_t::call(std::uint16_t type, session_t *session, const std::string_view &payload, bool reinjected) {
  // If we are using the encrypted control stream protocol, drop any messages that come off the wire unencrypted
  if (session->config.controlProtocolType == 13 && !reinjected && type != packetTypes[IDX_ENCRYPTED]) {
    BOOST_LOG(error) << "Dropping unencrypted message on encrypted control stream: "sv << util::hex(type).to_string_view();
    return;
  }

  // Dispatch to the handler registered via map(); unknown types get a hex dump at debug level.
  auto cb = _map_type_cb.find(type);
  if (cb == std::end(_map_type_cb)) {
    BOOST_LOG(debug)
      << "type [Unknown] { "sv << util::hex(type).to_string_view() << " }"sv << std::endl
      << "---data---"sv << std::endl
      << util::hex_vec(payload) << std::endl
      << "---end data---"sv;
  } else {
    cb->second(session, payload);
  }
}

// Service the ENet host once: match the event's peer to a session, refresh
// that session's ping timeout, and dispatch receive/connect/disconnect events.
void control_server_t::iterate(std::chrono::milliseconds timeout) {
  ENetEvent event;
  auto res = enet_host_service(_host.get(), &event, timeout.count());

  if (res > 0) {
    auto session = get_session(event.peer, event.data);
    if (!session) {
      // No session matches this peer by connect data or by address.
      BOOST_LOG(warning) << "Rejected connection from ["sv << platf::from_sockaddr((sockaddr *) &event.peer->address.address) << "]: it's not properly set up"sv;
      enet_peer_disconnect_now(event.peer, 0);

      return;
    }

    // Any control stream activity counts as a keepalive.
    session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;

    switch (event.type) {
      case ENET_EVENT_TYPE_RECEIVE:
        {
          net::packet_t packet {event.packet};

          // First 16 bits are the message type; the remainder is the payload.
          auto type = *(std::uint16_t *) packet->data;
          std::string_view payload {(char *) packet->data + sizeof(type), packet->dataLength - sizeof(type)};

          call(type, session, payload, false);
        }
        break;
      case ENET_EVENT_TYPE_CONNECT:
        BOOST_LOG(info) << "CLIENT CONNECTED"sv;
        break;
      case ENET_EVENT_TYPE_DISCONNECT:
        BOOST_LOG(info) << "CLIENT DISCONNECTED"sv;
        // No more clients to send video data to ^_^
        if (session->state == session::state_e::RUNNING) {
          session::stop(*session);
        }
        break;
      case ENET_EVENT_TYPE_NONE:
        break;
    }
  }
}

namespace fec {
  // NOTE(review): template arguments appear to have been stripped by extraction;
  // presumably util::safe_ptr wraps reed_solomon with reed_solomon_release — confirm.
  using rs_t = util::safe_ptr;

  // Result of FEC-encoding one frame: data shards followed by parity shards.
  struct fec_t {
    size_t data_shards;
    size_t
nr_shards; size_t percentage; size_t blocksize; size_t prefixsize; util::buffer_t shards; util::buffer_t headers; util::buffer_t shards_p; std::vector payload_buffers; char *data(size_t el) { return (char *) shards_p[el]; } char *prefix(size_t el) { return prefixsize ? &headers[el * prefixsize] : nullptr; } size_t size() const { return nr_shards; } }; static fec_t encode(const std::string_view &payload, size_t blocksize, size_t fecpercentage, size_t minparityshards, size_t prefixsize) { auto payload_size = payload.size(); auto pad = payload_size % blocksize != 0; auto aligned_data_shards = payload_size / blocksize; auto data_shards = aligned_data_shards + (pad ? 1 : 0); auto parity_shards = (data_shards * fecpercentage + 99) / 100; // increase the FEC percentage for this frame if the parity shard minimum is not met if (parity_shards < minparityshards && fecpercentage != 0) { parity_shards = minparityshards; fecpercentage = (100 * parity_shards) / data_shards; BOOST_LOG(verbose) << "Increasing FEC percentage to "sv << fecpercentage << " to meet parity shard minimum"sv << std::endl; } auto nr_shards = data_shards + parity_shards; // If we need to store a zero-padded data shard, allocate that first to // to keep the shards in order and reduce buffer fragmentation auto parity_shard_offset = pad ? 
1 : 0; util::buffer_t shards {(parity_shard_offset + parity_shards) * blocksize}; util::buffer_t shards_p {nr_shards}; std::vector payload_buffers; payload_buffers.reserve(2); // Point into the payload buffer for all except the final padded data shard auto next = std::begin(payload); for (auto x = 0; x < aligned_data_shards; ++x) { shards_p[x] = (uint8_t *) next; next += blocksize; } payload_buffers.emplace_back(std::begin(payload), aligned_data_shards * blocksize); // If the last data shard needs to be zero-padded, we must use the shards buffer if (pad) { shards_p[aligned_data_shards] = (uint8_t *) &shards[0]; // GCC doesn't figure out that std::copy_n() can be replaced with memcpy() here // and ends up compiling a horribly slow element-by-element copy loop, so we // help it by using memcpy()/memset() directly. auto copy_len = std::min(blocksize, std::end(payload) - next); std::memcpy(shards_p[aligned_data_shards], next, copy_len); if (copy_len < blocksize) { // Zero any additional space after the end of the payload std::memset(shards_p[aligned_data_shards] + copy_len, 0, blocksize - copy_len); } } // Add a payload buffer describing the shard buffer payload_buffers.emplace_back(std::begin(shards), shards.size()); if (fecpercentage != 0) { // Point into our allocated buffer for the parity shards for (auto x = 0; x < parity_shards; ++x) { shards_p[data_shards + x] = (uint8_t *) &shards[(parity_shard_offset + x) * blocksize]; } // packets = parity_shards + data_shards rs_t rs {reed_solomon_new(data_shards, parity_shards)}; reed_solomon_encode(rs.get(), shards_p.begin(), nr_shards, blocksize); } return { data_shards, nr_shards, fecpercentage, blocksize, prefixsize, std::move(shards), util::buffer_t {nr_shards * prefixsize}, std::move(shards_p), std::move(payload_buffers), }; } } // namespace fec /** * @brief Combines two buffers and inserts new buffers at each slice boundary of the result. * @param insert_size The number of bytes to insert. 
* @param slice_size The number of bytes between insertions. * @param data1 The first data buffer. * @param data2 The second data buffer. */ std::vector concat_and_insert(uint64_t insert_size, uint64_t slice_size, const std::string_view &data1, const std::string_view &data2) { auto data_size = data1.size() + data2.size(); auto pad = data_size % slice_size != 0; auto elements = data_size / slice_size + (pad ? 1 : 0); std::vector result; result.resize(elements * insert_size + data_size); auto next = std::begin(data1); auto end = std::end(data1); for (auto x = 0; x < elements; ++x) { void *p = &result[x * (insert_size + slice_size)]; // For the last iteration, only copy to the end of the data if (x == elements - 1) { slice_size = data_size - (x * slice_size); } // Test if this slice will extend into the next buffer if (next + slice_size > end) { // Copy the first portion from the first buffer auto copy_len = end - next; std::copy(next, end, (char *) p + insert_size); // Copy the remaining portion from the second buffer next = std::begin(data2); end = std::end(data2); std::copy(next, next + (slice_size - copy_len), (char *) p + copy_len + insert_size); next += slice_size - copy_len; } else { std::copy(next, next + slice_size, (char *) p + insert_size); next += slice_size; } } return result; } std::vector replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) { std::vector replaced; replaced.reserve(original.size() + _new.size() - old.size()); auto begin = std::begin(original); auto end = std::end(original); auto next = std::search(begin, end, std::begin(old), std::end(old)); std::copy(begin, next, std::back_inserter(replaced)); if (next != end) { std::copy(std::begin(_new), std::end(_new), std::back_inserter(replaced)); std::copy(next + old.size(), end, std::back_inserter(replaced)); } return replaced; } /** * @brief Pass gamepad feedback data back to the client. * @param session The session object. 
* @param msg The message to pass. * @return 0 on success. */ int send_feedback_msg(session_t *session, platf::gamepad_feedback_msg_t &msg) { if (!session->control.peer) { BOOST_LOG(warning) << "Couldn't send gamepad feedback data, still waiting for PING from Moonlight"sv; // Still waiting for PING from Moonlight return -1; } std::string payload; if (msg.type == platf::gamepad_feedback_e::rumble) { control_rumble_t plaintext; plaintext.header.type = packetTypes[IDX_RUMBLE_DATA]; plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2); auto &data = msg.data.rumble; plaintext.useless = 0xC0FFEE; plaintext.id = util::endian::little(msg.id); plaintext.lowfreq = util::endian::little(data.lowfreq); plaintext.highfreq = util::endian::little(data.highfreq); BOOST_LOG(verbose) << "Rumble: "sv << msg.id << " :: "sv << util::hex(data.lowfreq).to_string_view() << " :: "sv << util::hex(data.highfreq).to_string_view(); std::array encrypted_payload; payload = encode_control(session, util::view(plaintext), encrypted_payload); } else if (msg.type == platf::gamepad_feedback_e::rumble_triggers) { control_rumble_triggers_t plaintext; plaintext.header.type = packetTypes[IDX_RUMBLE_TRIGGER_DATA]; plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2); auto &data = msg.data.rumble_triggers; plaintext.id = util::endian::little(msg.id); plaintext.left = util::endian::little(data.left_trigger); plaintext.right = util::endian::little(data.right_trigger); BOOST_LOG(verbose) << "Rumble triggers: "sv << msg.id << " :: "sv << util::hex(data.left_trigger).to_string_view() << " :: "sv << util::hex(data.right_trigger).to_string_view(); std::array encrypted_payload; payload = encode_control(session, util::view(plaintext), encrypted_payload); } else if (msg.type == platf::gamepad_feedback_e::set_motion_event_state) { control_set_motion_event_t plaintext; plaintext.header.type = packetTypes[IDX_SET_MOTION_EVENT]; plaintext.header.payloadLength = 
sizeof(plaintext) - sizeof(control_header_v2); auto &data = msg.data.motion_event_state; plaintext.id = util::endian::little(msg.id); plaintext.reportrate = util::endian::little(data.report_rate); plaintext.type = data.motion_type; BOOST_LOG(verbose) << "Motion event state: "sv << msg.id << " :: "sv << util::hex(data.report_rate).to_string_view() << " :: "sv << util::hex(data.motion_type).to_string_view(); std::array encrypted_payload; payload = encode_control(session, util::view(plaintext), encrypted_payload); } else if (msg.type == platf::gamepad_feedback_e::set_rgb_led) { control_set_rgb_led_t plaintext; plaintext.header.type = packetTypes[IDX_SET_RGB_LED]; plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2); auto &data = msg.data.rgb_led; plaintext.id = util::endian::little(msg.id); plaintext.r = data.r; plaintext.g = data.g; plaintext.b = data.b; BOOST_LOG(verbose) << "RGB: "sv << msg.id << " :: "sv << util::hex(data.r).to_string_view() << util::hex(data.g).to_string_view() << util::hex(data.b).to_string_view(); std::array encrypted_payload; payload = encode_control(session, util::view(plaintext), encrypted_payload); } else { BOOST_LOG(error) << "Unknown gamepad feedback message type"sv; return -1; } if (session->broadcast_ref->control_server.send(payload, session->control.peer)) { TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address)); BOOST_LOG(warning) << "Couldn't send gamepad feedback to ["sv << addr << ':' << port << ']'; return -1; } return 0; } int send_hdr_mode(session_t *session, video::hdr_info_t hdr_info) { if (!session->control.peer) { BOOST_LOG(warning) << "Couldn't send HDR mode, still waiting for PING from Moonlight"sv; // Still waiting for PING from Moonlight return -1; } control_hdr_mode_t plaintext {}; plaintext.header.type = packetTypes[IDX_HDR_MODE]; plaintext.header.payloadLength = sizeof(control_hdr_mode_t) - sizeof(control_header_v2); plaintext.enabled = 
hdr_info->enabled; plaintext.metadata = hdr_info->metadata; std::array encrypted_payload; auto payload = encode_control(session, util::view(plaintext), encrypted_payload); if (session->broadcast_ref->control_server.send(payload, session->control.peer)) { TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address)); BOOST_LOG(warning) << "Couldn't send HDR mode to ["sv << addr << ':' << port << ']'; return -1; } BOOST_LOG(debug) << "Sent HDR mode: " << hdr_info->enabled; return 0; } void controlBroadcastThread(control_server_t *server) { server->map(packetTypes[IDX_PERIODIC_PING], [](session_t *session, const std::string_view &payload) { BOOST_LOG(verbose) << "type [IDX_PERIODIC_PING]"sv; }); server->map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_START_A]"sv; }); server->map(packetTypes[IDX_START_B], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_START_B]"sv; }); server->map(packetTypes[IDX_LOSS_STATS], [&](session_t *session, const std::string_view &payload) { int32_t *stats = (int32_t *) payload.data(); auto count = stats[0]; std::chrono::milliseconds t {stats[1]}; auto lastGoodFrame = stats[3]; BOOST_LOG(verbose) << "type [IDX_LOSS_STATS]"sv << std::endl << "---begin stats---" << std::endl << "loss count since last report [" << count << ']' << std::endl << "time in milli since last report [" << t.count() << ']' << std::endl << "last good frame [" << lastGoodFrame << ']' << std::endl << "---end stats---"; }); server->map(packetTypes[IDX_REQUEST_IDR_FRAME], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_REQUEST_IDR_FRAME]"sv; session->video.idr_events->raise(true); }); server->map(packetTypes[IDX_INVALIDATE_REF_FRAMES], [&](session_t *session, const std::string_view &payload) { auto frames = (std::int64_t *) payload.data(); auto firstFrame = frames[0]; auto lastFrame = 
frames[1]; BOOST_LOG(debug) << "type [IDX_INVALIDATE_REF_FRAMES]"sv << std::endl << "firstFrame [" << firstFrame << ']' << std::endl << "lastFrame [" << lastFrame << ']'; session->video.invalidate_ref_frames_events->raise(std::make_pair(firstFrame, lastFrame)); }); server->map(packetTypes[IDX_INPUT_DATA], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_INPUT_DATA]"sv; auto tagged_cipher_length = util::endian::big(*(int32_t *) payload.data()); std::string_view tagged_cipher {payload.data() + sizeof(tagged_cipher_length), (size_t) tagged_cipher_length}; std::vector plaintext; auto &cipher = session->control.cipher; auto &iv = session->control.legacy_input_enc_iv; if (cipher.decrypt(tagged_cipher, plaintext, &iv)) { // something went wrong :( BOOST_LOG(error) << "Failed to verify tag"sv; session::stop(*session); return; } if (tagged_cipher_length >= 16 + iv.size()) { std::copy(payload.end() - 16, payload.end(), std::begin(iv)); } input::passthrough(session->input, std::move(plaintext)); }); server->map(packetTypes[IDX_ENCRYPTED], [server](session_t *session, const std::string_view &payload) { BOOST_LOG(verbose) << "type [IDX_ENCRYPTED]"sv; auto header = (control_encrypted_p) (payload.data() - 2); auto length = util::endian::little(header->length); auto seq = util::endian::little(header->seq); if (length < (16 + 4 + 4)) { BOOST_LOG(warning) << "Control: Runt packet"sv; return; } auto tagged_cipher_length = length - 4; std::string_view tagged_cipher {(char *) header->payload(), (size_t) tagged_cipher_length}; auto &cipher = session->control.cipher; auto &iv = session->control.incoming_iv; if (session->config.encryptionFlagsEnabled & SS_ENC_CONTROL_V2) { // We use the deterministic IV construction algorithm specified in NIST SP 800-38D // Section 8.2.1. The sequence number is our "invocation" field and the 'CC' in the // high bytes is the "fixed" field. 
Because each client provides their own unique // key, our values in the fixed field need only uniquely identify each independent // use of the client's key with AES-GCM in our code. // // The sequence number is 32 bits long which allows for 2^32 control stream messages // to be received from each client before the IV repeats. iv.resize(12); std::copy_n((uint8_t *) &seq, sizeof(seq), std::begin(iv)); iv[10] = 'C'; // Client originated iv[11] = 'C'; // Control stream } else { // Nvidia's old style encryption uses a 16-byte IV iv.resize(16); iv[0] = (std::uint8_t) seq; } std::vector plaintext; if (cipher.decrypt(tagged_cipher, plaintext, &iv)) { // something went wrong :( BOOST_LOG(error) << "Failed to verify tag"sv; session::stop(*session); return; } auto type = *(std::uint16_t *) plaintext.data(); std::string_view next_payload {(char *) plaintext.data() + 4, plaintext.size() - 4}; if (type == packetTypes[IDX_ENCRYPTED]) { BOOST_LOG(error) << "Bad packet type [IDX_ENCRYPTED] found"sv; session::stop(*session); return; } // IDX_INPUT_DATA callback will attempt to decrypt unencrypted data, therefore we need pass it directly if (type == packetTypes[IDX_INPUT_DATA]) { plaintext.erase(std::begin(plaintext), std::begin(plaintext) + 4); input::passthrough(session->input, std::move(plaintext)); } else { server->call(type, session, next_payload, true); } }); // This thread handles latency-sensitive control messages platf::adjust_thread_priority(platf::thread_priority_e::critical); // Check for both the full shutdown event and the shutdown event for this // broadcast to ensure we can inform connected clients of our graceful // termination when we shut down. 
auto shutdown_event = mail::man->event(mail::shutdown); auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); while (!shutdown_event->peek() && !broadcast_shutdown_event->peek()) { bool has_session_awaiting_peer = false; { auto lg = server->_sessions.lock(); auto now = std::chrono::steady_clock::now(); KITTY_WHILE_LOOP(auto pos = std::begin(*server->_sessions), pos != std::end(*server->_sessions), { // Don't perform additional session processing if we're shutting down if (shutdown_event->peek() || broadcast_shutdown_event->peek()) { break; } auto session = *pos; if (now > session->pingTimeout) { auto address = session->control.peer ? platf::from_sockaddr((sockaddr *) &session->control.peer->address.address) : session->control.expected_peer_address; BOOST_LOG(info) << address << ": Ping Timeout"sv; session::stop(*session); } if (session->state.load(std::memory_order_acquire) == session::state_e::STOPPING) { pos = server->_sessions->erase(pos); if (session->control.peer) { { auto ptslg = server->_peer_to_session.lock(); server->_peer_to_session->erase(session->control.peer); } enet_peer_disconnect_now(session->control.peer, 0); } session->controlEnd.raise(true); continue; } // Remember if we have a session that's waiting for a peer to connect to the // control stream. This ensures the clients are properly notified even when // the app terminates before they finish connecting. 
if (!session->control.peer) {
  // No ENet connection yet for this session; keep the server loop alive until it connects or expires.
  has_session_awaiting_peer = true;
} else {
  // Drain any queued gamepad feedback (rumble/LED/motion) to the connected peer.
  auto &feedback_queue = session->control.feedback_queue;
  while (feedback_queue->peek()) {
    auto feedback_msg = feedback_queue->pop();
    send_feedback_msg(session, *feedback_msg);
  }
  // Drain pending HDR mode changes; re-check the peer each iteration since sending may drop it.
  auto &hdr_queue = session->control.hdr_queue;
  while (session->control.peer && hdr_queue->peek()) {
    auto hdr_info = hdr_queue->pop();
    send_hdr_mode(session, std::move(hdr_info));
  }
}
++pos;
})
}

// Don't break until any pending sessions either expire or connect
if (proc::proc.running() == 0 && !has_session_awaiting_peer) {
  BOOST_LOG(info) << "Process terminated"sv;
  break;
}

server->iterate(150ms);
}

// Let all remaining connections know the server is shutting down
// reason: graceful termination
std::uint32_t reason = 0x80030023;

control_terminate_t plaintext;
plaintext.header.type = packetTypes[IDX_TERMINATION];
plaintext.header.payloadLength = sizeof(plaintext.ec);
plaintext.ec = util::endian::big(reason);

std::array encrypted_payload;

auto lg = server->_sessions.lock();
for (auto pos = std::begin(*server->_sessions); pos != std::end(*server->_sessions); ++pos) {
  auto session = *pos;

  // We may not have gotten far enough to have an ENet connection yet
  if (session->control.peer) {
    auto payload = encode_control(session, util::view(plaintext), encrypted_payload);

    // send() returns nonzero on failure; log but continue shutting down the rest.
    if (server->send(payload, session->control.peer)) {
      TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address));
      BOOST_LOG(warning) << "Couldn't send termination code to ["sv << addr << ':' << port << ']';
    }
  }

  session->shutdown_event->raise(true);
  session->controlEnd.raise(true);
}

server->flush();
}

/**
 * @brief Receive loop for the video and audio UDP sockets.
 *
 * Listens on both A/V sockets for client PING packets and routes each one to
 * the message queue registered for the matching session (by peer address for
 * legacy 4-byte pings, or by ping payload for the newer identified pings).
 * Runs until the broadcast shutdown event is raised.
 */
void recvThread(broadcast_ctx_t &ctx) {
  // Session lookup tables, one per socket type. Entries are added/removed via
  // messages posted on ctx.message_queue_queue (see populate_peer_to_session).
  std::map peer_to_video_session;
  std::map peer_to_audio_session;

  auto &video_sock = ctx.video_sock;
  auto &audio_sock = ctx.audio_sock;

  auto &message_queue_queue = ctx.message_queue_queue;
  auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown);

  auto &io = ctx.io_context;

  udp::endpoint peer;

  // Index 0 = video, index 1 = audio (see recv_func_init calls below).
  std::array buf[2];
  std::function recv_func[2];

  // Apply any pending registration/unregistration messages to the lookup maps.
  // A null message queue means "remove this session".
  auto populate_peer_to_session = [&]() {
    while (message_queue_queue->peek()) {
      auto message_queue_opt = message_queue_queue->pop();
      TUPLE_3D_REF(socket_type, session_id, message_queue, *message_queue_opt);

      switch (socket_type) {
        case socket_e::video:
          if (message_queue) {
            peer_to_video_session.emplace(session_id, message_queue);
          } else {
            peer_to_video_session.erase(session_id);
          }
          break;
        case socket_e::audio:
          if (message_queue) {
            peer_to_audio_session.emplace(session_id, message_queue);
          } else {
            peer_to_audio_session.erase(session_id);
          }
          break;
      }
    }
  };

  // Build the async receive handler for one socket. The handler re-arms itself
  // via the fail_guard so the socket keeps receiving even on early returns.
  auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map &peer_to_session) {
    recv_func[buf_elem] = [&, buf_elem](const boost::system::error_code &ec, size_t bytes) {
      auto fg = util::fail_guard([&]() {
        sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func[buf_elem]);
      });

      auto type_str = buf_elem ? "AUDIO"sv : "VIDEO"sv;
      BOOST_LOG(verbose) << "Recv: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;

      populate_peer_to_session();

      // No data, yet no error
      if (ec == boost::system::errc::connection_refused || ec == boost::system::errc::connection_reset) {
        return;
      }

      if (ec || !bytes) {
        BOOST_LOG(error) << "Couldn't receive data from udp socket: "sv << ec.message();
        return;
      }

      if (bytes == 4) {
        // For legacy PING packets, find the matching session by address.
        auto it = peer_to_session.find(peer.address());
        if (it != std::end(peer_to_session)) {
          BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
          it->second->raise(peer, std::string {buf[buf_elem].data(), bytes});
        }
      } else if (bytes >= sizeof(SS_PING)) {
        auto ping = (PSS_PING) buf[buf_elem].data();

        // For new PING packets that include a client identifier, search by payload.
        auto it = peer_to_session.find(std::string {ping->payload, sizeof(ping->payload)});
        if (it != std::end(peer_to_session)) {
          BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
          it->second->raise(peer, std::string {buf[buf_elem].data(), bytes});
        }
      }
    };
  };

  recv_func_init(video_sock, 0, peer_to_video_session);
  recv_func_init(audio_sock, 1, peer_to_audio_session);

  video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]);
  audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]);

  // Keep restarting io.run() until shutdown is signaled (run() can return when
  // it has no outstanding work).
  while (!broadcast_shutdown_event->peek()) {
    io.run();
  }
}

/**
 * @brief Video broadcast loop: packetizes, FEC-encodes, optionally encrypts,
 *        and paces outgoing video frames over the shared video UDP socket.
 *
 * Pops encoded frames from the video packet queue until the queue is stopped
 * or the broadcast shutdown event is raised.
 */
void videoBroadcastThread(udp::socket &sock) {
  auto shutdown_event = mail::man->event(mail::broadcast_shutdown);
  auto packets = mail::man->queue(mail::video_packets);
  auto timebase = boost::posix_time::microsec_clock::universal_time();

  // Video traffic is sent on this thread
  platf::adjust_thread_priority(platf::thread_priority_e::high);

  logging::min_max_avg_periodic_logger frame_processing_latency_logger(debug, "Frame processing latency", "ms");
  logging::time_delta_periodic_logger frame_send_batch_latency_logger(debug, "Network: each send_batch() latency");
  logging::time_delta_periodic_logger frame_fec_latency_logger(debug, "Network: each FEC block latency");
  logging::time_delta_periodic_logger frame_network_latency_logger(debug, "Network: frame's overall network latency");

  crypto::aes_t iv(12);

  auto timer = platf::create_high_precision_timer();
  if (!timer || !*timer) {
    BOOST_LOG(error) << "Failed to create timer, aborting video broadcast thread";
    return;
  }

  auto ratecontrol_next_frame_start = std::chrono::steady_clock::now();

  while (auto packet = packets->pop()) {
    if (shutdown_event->peek()) {
      break;
    }

    frame_network_latency_logger.first_point_now();

    auto session = (session_t *) packet->channel_data;
    auto lowseq = session->video.lowseq;

    std::string_view payload {(char *) packet->data(), packet->data_size()};
    std::vector payload_with_replacements;

    // Apply replacements on the packet payload before performing any other operations.
    // We need to know the final frame size to calculate the last packet size, and we
    // must avoid matching replacements against the frame header or any other non-video
    // part of the payload.
    if (packet->is_idr() && packet->replacements) {
      for (auto &replacement : *packet->replacements) {
        auto frame_old = replacement.old;
        auto frame_new = replacement._new;

        payload_with_replacements = replace(payload, frame_old, frame_new);
        payload = {(char *) payload_with_replacements.data(), payload_with_replacements.size()};
      }
    }

    video_short_frame_header_t frame_header = {};
    frame_header.headerType = 0x01;  // Short header type
    frame_header.frameType = packet->is_idr() ? 2 : packet->after_ref_frame_invalidation ? 5 : 1;

    // Size of the final (possibly partial) packet's payload; codecs like AV1
    // can't tolerate zero padding, so the receiver needs the exact length.
    frame_header.lastPayloadLen = (payload.size() + sizeof(frame_header)) % (session->config.packetsize - sizeof(NV_VIDEO_PACKET));
    if (frame_header.lastPayloadLen == 0) {
      frame_header.lastPayloadLen = session->config.packetsize - sizeof(NV_VIDEO_PACKET);
    }

    if (packet->frame_timestamp) {
      // Convert capture-to-now latency into 1/10 ms units for the short header,
      // rounding to nearest and clamping to the field's range.
      auto duration_to_latency = [](const std::chrono::steady_clock::duration &duration) {
        const auto duration_us = std::chrono::duration_cast(duration).count();
        return (uint16_t) std::clamp((duration_us + 50) / 100, 0, std::numeric_limits::max());
      };

      uint16_t latency = duration_to_latency(std::chrono::steady_clock::now() - *packet->frame_timestamp);
      frame_header.frame_processing_latency = latency;
      frame_processing_latency_logger.collect_and_log(latency / 10.);
    } else {
      frame_header.frame_processing_latency = 0;
    }

    auto fecPercentage = config::stream.fec_percentage;

    // Insert space for packet headers
    auto blocksize = session->config.packetsize + MAX_RTP_HEADER_SIZE;
    auto payload_blocksize = blocksize - sizeof(video_packet_raw_t);

    auto payload_new = concat_and_insert(sizeof(video_packet_raw_t), payload_blocksize, std::string_view {(char *) &frame_header, sizeof(frame_header)}, payload);

    payload = std::string_view {(char *) payload_new.data(), payload_new.size()};

    // There are 2 bits for FEC block count for a maximum of 4 FEC blocks
    constexpr auto MAX_FEC_BLOCKS = 4;

    // The max number of data shards per block is found by solving this system of equations for D:
    // D = 255 - P
    // P = D * F
    // which results in the solution:
    // D = 255 / (1 + F)
    // multiplied by 100 since F is the percentage as an integer:
    // D = (255 * 100) / (100 + F)
    auto max_data_shards_per_fec_block = (DATA_SHARDS_MAX * 100) / (100 + fecPercentage);

    // Compute the number of FEC blocks needed for this frame using the block size and max shards
    auto max_data_per_fec_block = max_data_shards_per_fec_block * blocksize;
    auto fec_blocks_needed = (payload.size() + (max_data_per_fec_block - 1)) / max_data_per_fec_block;

    // If the number of FEC blocks needed exceeds the protocol limit, turn off FEC for this frame.
    // For normal FEC percentages, this should only happen for enormous frames (over 800 packets at 20%).
    if (fec_blocks_needed > MAX_FEC_BLOCKS) {
      BOOST_LOG(warning) << "Skipping FEC for abnormally large encoded frame (needed "sv << fec_blocks_needed << " FEC blocks)"sv;
      fecPercentage = 0;
      fec_blocks_needed = MAX_FEC_BLOCKS;
    }

    std::array fec_blocks;
    decltype(fec_blocks)::iterator fec_blocks_begin = std::begin(fec_blocks), fec_blocks_end = std::begin(fec_blocks) + fec_blocks_needed;

    BOOST_LOG(verbose) << "Generating "sv << fec_blocks_needed << " FEC blocks"sv;

    // Align individual FEC blocks to blocksize
    auto unaligned_size = payload.size() / fec_blocks_needed;
    auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize;

    // If we exceed the 10-bit FEC packet index (which means our frame exceeded 4096 packets),
    // the frame will be unrecoverable. Log an error for this case.
    if (aligned_size / blocksize >= 1024) {
      BOOST_LOG(error) << "Encoder produced a frame too large to send! Is the encoder broken? (needed "sv << (aligned_size / blocksize) << " packets)"sv;
    }

    // Split the data into aligned FEC blocks
    for (int x = 0; x < fec_blocks_needed; ++x) {
      if (x == fec_blocks_needed - 1) {
        // The last block must extend to the end of the payload
        fec_blocks[x] = payload.substr(x * aligned_size);
      } else {
        // Earlier blocks just extend to the next block offset
        fec_blocks[x] = payload.substr(x * aligned_size, aligned_size);
      }
    }

    try {
      // Use around 80% of 1Gbps
      //                                    1Gbps        percent    ms    packet     byte
      size_t ratecontrol_packets_in_1ms = std::giga::num * 80 / 100 / 1000 / blocksize / 8;

      // Send less than 64K in a single batch.
      // On Windows, batches above 64K seem to bypass SO_SNDBUF regardless of its size,
      // appear in "Other I/O" and begin waiting for interrupts.
      // This gives inconsistent performance so we'd rather avoid it.
      size_t send_batch_size = 64 * 1024 / blocksize;

      // Also don't exceed 64 packets, which can happen when Moonlight requests
      // unusually small packet size.
      // Generic Segmentation Offload on Linux can't do more than 64.
      send_batch_size = std::min(64, send_batch_size);

      // Don't ignore the last ratecontrol group of the previous frame
      auto ratecontrol_frame_start = std::max(ratecontrol_next_frame_start, std::chrono::steady_clock::now());
      size_t ratecontrol_frame_packets_sent = 0;
      size_t ratecontrol_group_packets_sent = 0;

      auto blockIndex = 0;
      std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view &current_payload) {
        auto packets = (current_payload.size() + (blocksize - 1)) / blocksize;

        // Fill in the NV_VIDEO_PACKET header fields that must be present
        // before FEC encoding runs over the data shards.
        for (int x = 0; x < packets; ++x) {
          auto *inspect = (video_packet_raw_t *) &current_payload[x * blocksize];

          inspect->packet.frameIndex = packet->frame_index();
          inspect->packet.streamPacketIndex = ((uint32_t) lowseq + x) << 8;

          // Match multiFecFlags with Moonlight
          inspect->packet.multiFecFlags = 0x10;
          inspect->packet.multiFecBlocks = (blockIndex << 4) | ((fec_blocks_needed - 1) << 6);

          inspect->packet.flags = FLAG_CONTAINS_PIC_DATA;
          if (x == 0) {
            inspect->packet.flags |= FLAG_SOF;
          }
          if (x == packets - 1) {
            inspect->packet.flags |= FLAG_EOF;
          }
        }

        frame_fec_latency_logger.first_point_now();

        // If video encryption is enabled, we allocate space for the encryption header before each shard
        auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets, session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0);

        frame_fec_latency_logger.second_point_now_and_log();

        auto peer_address = session->video.peer.address();
        auto batch_info = platf::batched_send_info_t {
          shards.headers.begin(),
          shards.prefixsize,
          shards.payload_buffers,
          shards.blocksize,
          0,
          0,
          (uintptr_t) sock.native_handle(),
          peer_address,
          session->video.peer.port(),
          session->localAddress,
        };

        size_t next_shard_to_send = 0;

        // set FEC info now that we know for sure what our percentage will be for this frame
        for (auto x = 0; x < shards.size(); ++x) {
          auto *inspect = (video_packet_raw_t *) shards.data(x);

          // RTP video timestamps use a 90 KHz clock
          auto now = boost::posix_time::microsec_clock::universal_time();
          auto timestamp = (now - timebase).total_microseconds() / (1000 / 90);

          inspect->packet.fecInfo = (x << 12 | shards.data_shards << 22 | shards.percentage << 4);

          inspect->rtp.header = 0x80 | FLAG_EXTENSION;
          inspect->rtp.sequenceNumber = util::endian::big(lowseq + x);
          inspect->rtp.timestamp = util::endian::big(timestamp);

          inspect->packet.multiFecBlocks = (blockIndex << 4) | ((fec_blocks_needed - 1) << 6);
          inspect->packet.frameIndex = packet->frame_index();

          // Encrypt this shard if video encryption is enabled
          if (session->video.cipher) {
            // We use the deterministic IV construction algorithm specified in NIST SP 800-38D
            // Section 8.2.1. The sequence number is our "invocation" field and the 'V' in the
            // high bytes is the "fixed" field. Because each client provides their own unique
            // key, our values in the fixed field need only uniquely identify each independent
            // use of the client's key with AES-GCM in our code.
            //
            // The IV counter is 64 bits long which allows for 2^64 encrypted video packets
            // to be sent to each client before the IV repeats.
            std::copy_n((uint8_t *) &session->video.gcm_iv_counter, sizeof(session->video.gcm_iv_counter), std::begin(iv));
            iv[11] = 'V';  // Video stream
            session->video.gcm_iv_counter++;

            // Encrypt the target buffer in place
            auto *prefix = (video_packet_enc_prefix_t *) shards.prefix(x);
            prefix->frameNumber = packet->frame_index();
            std::copy(std::begin(iv), std::end(iv), prefix->iv);
            session->video.cipher->encrypt(std::string_view {(char *) inspect, (size_t) blocksize}, prefix->tag, (uint8_t *) inspect, &iv);
          }

          // Flush a batch once we've accumulated send_batch_size shards or
          // reached the final shard of this FEC block.
          if (x - next_shard_to_send + 1 >= send_batch_size || x + 1 == shards.size()) {
            // Do pacing within the frame.
            // Also trigger pacing before the first send_batch() of the frame
            // to account for the last send_batch() of the previous frame.
            if (ratecontrol_group_packets_sent >= ratecontrol_packets_in_1ms || ratecontrol_frame_packets_sent == 0) {
              auto due = ratecontrol_frame_start + std::chrono::duration_cast(1ms) * ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;

              auto now = std::chrono::steady_clock::now();
              if (now < due) {
                timer->sleep_for(due - now);
              }

              ratecontrol_group_packets_sent = 0;
            }

            size_t current_batch_size = x - next_shard_to_send + 1;
            batch_info.block_offset = next_shard_to_send;
            batch_info.block_count = current_batch_size;

            frame_send_batch_latency_logger.first_point_now();

            // Use a batched send if it's supported on this platform
            if (!platf::send_batch(batch_info)) {
              // Batched send is not available, so send each packet individually
              BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
              for (auto y = 0; y < current_batch_size; y++) {
                auto send_info = platf::send_info_t {
                  shards.prefix(next_shard_to_send + y),
                  shards.prefixsize,
                  shards.data(next_shard_to_send + y),
                  shards.blocksize,
                  (uintptr_t) sock.native_handle(),
                  peer_address,
                  session->video.peer.port(),
                  session->localAddress,
                };

                platf::send(send_info);
              }
            }

            frame_send_batch_latency_logger.second_point_now_and_log();

            ratecontrol_group_packets_sent += current_batch_size;
            ratecontrol_frame_packets_sent += current_batch_size;
            next_shard_to_send = x + 1;
          }
        }

        // remember this in case the next frame comes immediately
        ratecontrol_next_frame_start = ratecontrol_frame_start + std::chrono::duration_cast(1ms) * ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;

        frame_network_latency_logger.second_point_now_and_log();

        if (packet->is_idr()) {
          BOOST_LOG(verbose) << "Key Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv;
        } else {
          BOOST_LOG(verbose) << "Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv << std::endl;
        }

        ++blockIndex;
        lowseq += shards.size();
      });

      session->video.lowseq = lowseq;
    } catch (const std::exception &e) {
      BOOST_LOG(error) << "Broadcast video failed "sv << e.what();
      std::this_thread::sleep_for(100ms);
    }
  }

  shutdown_event->raise(true);
}

/**
 * @brief Audio broadcast loop: encodes (and optionally encrypts) audio
 *        packets, emits Reed-Solomon FEC parity shards once per FEC block,
 *        and sends everything over the shared audio UDP socket.
 */
void audioBroadcastThread(udp::socket &sock) {
  auto shutdown_event = mail::man->event(mail::broadcast_shutdown);
  auto packets = mail::man->queue(mail::audio_packets);

  audio_packet_t audio_packet;
  fec::rs_t rs {reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS)};
  crypto::aes_t iv(16);

  // For unknown reasons, the RS parity matrix computed by our RS implementation
  // doesn't match the one Nvidia uses for audio data. I'm not exactly sure why,
  // but we can simply replace it with the matrix generated by OpenFEC which
  // works correctly. This is possible because the data and FEC shard count is
  // constant and known in advance.
  const unsigned char parity[] = {0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c};
  memcpy(rs.get()->p, parity, sizeof(parity));

  audio_packet.rtp.header = 0x80;
  audio_packet.rtp.packetType = 97;
  audio_packet.rtp.ssrc = 0;

  // Audio traffic is sent on this thread
  platf::adjust_thread_priority(platf::thread_priority_e::high);

  while (auto packet = packets->pop()) {
    if (shutdown_event->peek()) {
      break;
    }

    TUPLE_2D_REF(channel_data, packet_data, *packet);
    auto session = (session_t *) channel_data;

    auto sequenceNumber = session->audio.sequenceNumber;
    auto timestamp = session->audio.timestamp;

    // Per-packet IV: big-endian (avRiKeyId + sequenceNumber) in the first 4 bytes.
    *(std::uint32_t *) iv.data() = util::endian::big(session->audio.avRiKeyId + sequenceNumber);

    auto &shards_p = session->audio.shards_p;

    auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, shards_p[sequenceNumber % RTPA_DATA_SHARDS], iv, session->audio.cipher);
    if (bytes < 0) {
      BOOST_LOG(error) << "Couldn't encode audio packet"sv;
      break;
    }

    audio_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber);
    audio_packet.rtp.timestamp = util::endian::big(timestamp);

    session->audio.sequenceNumber++;
    session->audio.timestamp += session->config.audio.packetDuration;

    auto peer_address = session->audio.peer.address();
    try {
      auto send_info = platf::send_info_t {
        (const char *) &audio_packet,
        sizeof(audio_packet),
        (const char *) shards_p[sequenceNumber % RTPA_DATA_SHARDS],
        (size_t) bytes,
        (uintptr_t) sock.native_handle(),
        peer_address,
        session->audio.peer.port(),
        session->localAddress,
      };

      platf::send(send_info);

      BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv;

      auto &fec_packet = session->audio.fec_packet;

      // initialize the FEC header at the beginning of the FEC block
      if (sequenceNumber % RTPA_DATA_SHARDS == 0) {
        fec_packet.fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
        fec_packet.fecHeader.baseTimestamp = util::endian::big(timestamp);
      }

      // generate parity shards at the end of the FEC block
      if ((sequenceNumber + 1) % RTPA_DATA_SHARDS == 0) {
        reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes);

        for (auto x = 0; x < RTPA_FEC_SHARDS; ++x) {
          fec_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1);
          fec_packet.fecHeader.fecShardIndex = x;

          auto send_info = platf::send_info_t {
            (const char *) &fec_packet,
            sizeof(fec_packet),
            (const char *) shards_p[RTPA_DATA_SHARDS + x],
            (size_t) bytes,
            (uintptr_t) sock.native_handle(),
            peer_address,
            session->audio.peer.port(),
            session->localAddress,
          };

          platf::send(send_info);

          BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv;
        }
      }
    } catch (const std::exception &e) {
      BOOST_LOG(error) << "Broadcast audio failed "sv << e.what();
      std::this_thread::sleep_for(100ms);
    }
  }

  shutdown_event->raise(true);
}

/**
 * @brief Bind the control/video/audio servers and launch all broadcast threads.
 * @param ctx Shared broadcast context that owns the sockets, queues and threads.
 * @return 0 on success, -1 if any bind/open step fails.
 */
int start_broadcast(broadcast_ctx_t &ctx) {
  auto address_family = net::af_from_enum_string(config::sunshine.address_family);
  auto protocol = address_family == net::IPV4 ? udp::v4() : udp::v6();
  auto control_port = net::map_port(CONTROL_PORT);
  auto video_port = net::map_port(VIDEO_STREAM_PORT);
  auto audio_port = net::map_port(AUDIO_STREAM_PORT);

  if (ctx.control_server.bind(address_family, control_port)) {
    BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << control_port << "], likely another process already bound to the port"sv;
    return -1;
  }

  boost::system::error_code ec;
  ctx.video_sock.open(protocol, ec);
  if (ec) {
    BOOST_LOG(fatal) << "Couldn't open socket for Video server: "sv << ec.message();
    return -1;
  }

  // Set video socket send buffer size (SO_SENDBUF) to 1MB
  try {
    ctx.video_sock.set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
  } catch (...) {
    // Best-effort: a failed buffer-size option is not fatal to the stream.
    BOOST_LOG(error) << "Failed to set video socket send buffer size (SO_SENDBUF)";
  }

  ctx.video_sock.bind(udp::endpoint(protocol, video_port), ec);
  if (ec) {
    BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << video_port << "]: "sv << ec.message();
    return -1;
  }

  ctx.audio_sock.open(protocol, ec);
  if (ec) {
    BOOST_LOG(fatal) << "Couldn't open socket for Audio server: "sv << ec.message();
    return -1;
  }

  ctx.audio_sock.bind(udp::endpoint(protocol, audio_port), ec);
  if (ec) {
    BOOST_LOG(fatal) << "Couldn't bind Audio server to port ["sv << audio_port << "]: "sv << ec.message();
    return -1;
  }

  ctx.message_queue_queue = std::make_shared(30);

  ctx.video_thread = std::thread {videoBroadcastThread, std::ref(ctx.video_sock)};
  ctx.audio_thread = std::thread {audioBroadcastThread, std::ref(ctx.audio_sock)};
  ctx.control_thread = std::thread {controlBroadcastThread, &ctx.control_server};

  ctx.recv_thread = std::thread {recvThread, std::ref(ctx)};

  return 0;
}

/**
 * @brief Signal shutdown, stop all queues/sockets, and join every broadcast thread.
 * @param ctx The broadcast context started by start_broadcast().
 */
void end_broadcast(broadcast_ctx_t &ctx) {
  auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown);

  broadcast_shutdown_event->raise(true);

  auto video_packets = mail::man->queue(mail::video_packets);
  auto audio_packets = mail::man->queue(mail::audio_packets);

  // Minimize delay stopping video/audio threads
  video_packets->stop();
  audio_packets->stop();

  ctx.message_queue_queue->stop();
  ctx.io_context.stop();

  ctx.video_sock.close();
  ctx.audio_sock.close();

  video_packets.reset();
  audio_packets.reset();

  BOOST_LOG(debug) << "Waiting for main listening thread to end..."sv;
  ctx.recv_thread.join();
  BOOST_LOG(debug) << "Waiting for main video thread to end..."sv;
  ctx.video_thread.join();
  BOOST_LOG(debug) << "Waiting for main audio thread to end..."sv;
  ctx.audio_thread.join();
  BOOST_LOG(debug) << "Waiting for main control thread to end..."sv;
  ctx.control_thread.join();

  BOOST_LOG(debug) << "All broadcasting threads ended"sv;

  broadcast_shutdown_event->reset();
}

/**
 * @brief Wait for the initial client PING on a video or audio socket.
 *
 * Registers a temporary message queue keyed by session id (and by peer address
 * for legacy clients without ML_FF_SESSION_ID_V1), then waits for a matching
 * ping. On success, @p peer is updated to the client's actual endpoint.
 *
 * @return 0 when a valid ping is received, -1 on timeout.
 *
 * NOTE(review): the 'timeout' parameter is not referenced in the body — the
 * wait loop uses config::stream.ping_timeout instead. Confirm whether the
 * parameter should be honored.
 */
int recv_ping(session_t *session, decltype(broadcast)::ptr_t ref, socket_e type, std::string_view expected_payload, udp::endpoint &peer, std::chrono::milliseconds timeout) {
  auto messages = std::make_shared(30);
  av_session_id_t session_id = std::string {expected_payload};

  // Only allow matches on the peer address for legacy clients
  if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
    ref->message_queue_queue->raise(type, peer.address(), messages);
  }
  ref->message_queue_queue->raise(type, session_id, messages);

  auto fg = util::fail_guard([&]() {
    messages->stop();

    // remove message queue from session
    if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
      ref->message_queue_queue->raise(type, peer.address(), nullptr);
    }
    ref->message_queue_queue->raise(type, session_id, nullptr);
  });

  auto start_time = std::chrono::steady_clock::now();
  auto current_time = start_time;

  while (current_time - start_time < config::stream.ping_timeout) {
    auto delta_time = current_time - start_time;

    auto msg_opt = messages->pop(config::stream.ping_timeout - delta_time);
    if (!msg_opt) {
      break;
    }

    TUPLE_2D_REF(recv_peer, msg, *msg_opt);
    if (msg.find(expected_payload) != std::string::npos) {
      // Match the new PING payload format
      BOOST_LOG(debug) << "Received ping [v2] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
    } else if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) && msg == "PING"sv) {
      // Match the legacy fixed PING payload only if the new type is not supported
      BOOST_LOG(debug) << "Received ping [v1] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
    } else {
      BOOST_LOG(debug) << "Received non-ping from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
      current_time = std::chrono::steady_clock::now();
      continue;
    }

    // Update connection details.
    peer = recv_peer;
    return 0;
  }

  BOOST_LOG(error) << "Initial Ping Timeout"sv;
  return -1;
}

/**
 * @brief Per-session video thread: waits for the client ping, enables QoS,
 *        then runs the video capture loop until the session ends.
 */
void videoThread(session_t *session) {
  auto fg = util::fail_guard([&]() {
    session::stop(*session);
  });

  while_starting_do_nothing(session->state);

  auto ref = broadcast.ref();
  auto error = recv_ping(session, ref, socket_e::video, session->video.ping_payload, session->video.peer, config::stream.ping_timeout);
  if (error < 0) {
    return;
  }

  // Enable local prioritization and QoS tagging on video traffic if requested by the client
  auto address = session->video.peer.address();
  session->video.qos = platf::enable_socket_qos(ref->video_sock.native_handle(), address, session->video.peer.port(), platf::qos_data_type_e::video, session->config.videoQosType != 0);

  BOOST_LOG(debug) << "Start capturing Video"sv;
  video::capture(session->mail, session->config.monitor, session);
}

/**
 * @brief Per-session audio thread: waits for the client ping, enables QoS,
 *        then runs the audio capture loop until the session ends.
 */
void audioThread(session_t *session) {
  auto fg = util::fail_guard([&]() {
    session::stop(*session);
  });

  while_starting_do_nothing(session->state);

  auto ref = broadcast.ref();
  auto error = recv_ping(session, ref, socket_e::audio, session->audio.ping_payload, session->audio.peer, config::stream.ping_timeout);
  if (error < 0) {
    return;
  }

  // Enable local prioritization and QoS tagging on audio traffic if requested by the client
  auto address = session->audio.peer.address();
  session->audio.qos = platf::enable_socket_qos(ref->audio_sock.native_handle(), address, session->audio.peer.port(), platf::qos_data_type_e::audio, session->config.audioQosType != 0);

  BOOST_LOG(debug) << "Start capturing Audio"sv;
  audio::capture(session->mail, session->config.audio, session);
}

namespace session {
  // Count of currently active sessions; drives the first/last-session platform callbacks.
  std::atomic_uint running_sessions;

  /**
   * @brief Get the current state of a session.
   */
  state_e state(session_t &session) {
    return session.state.load(std::memory_order_relaxed);
  }

  /**
   * @brief Request a session stop; idempotent if a stop is already in progress.
   */
  void stop(session_t &session) {
    while_starting_do_nothing(session.state);

    auto expected = state_e::RUNNING;
    // CAS so only the first caller transitions RUNNING -> STOPPING and raises shutdown.
    auto already_stopping = !session.state.compare_exchange_strong(expected, state_e::STOPPING);
    if (already_stopping) {
      return;
    }

    session.shutdown_event->raise(true);
  }

  /**
   * @brief Join all session threads and run end-of-session cleanup.
   *
   * Arms a 10-second watchdog that traps the process if the threads never
   * terminate (see comment below about the NVENC driver bug).
   */
  void join(session_t &session) {
    // Current Nvidia drivers have a bug where NVENC can deadlock the encoder thread with hardware-accelerated
    // GPU scheduling enabled. If this happens, we will terminate ourselves and the service can restart.
    // The alternative is that Sunshine can never start another session until it's manually restarted.
    auto task = []() {
      BOOST_LOG(fatal) << "Hang detected! Session failed to terminate in 10 seconds."sv;
      logging::log_flush();
      lifetime::debug_trap();
    };
    auto force_kill = task_pool.pushDelayed(task, 10s).task_id;
    auto fg = util::fail_guard([&force_kill]() {
      // Cancel the kill task if we manage to return from this function
      task_pool.cancel(force_kill);
    });

    BOOST_LOG(debug) << "Waiting for video to end..."sv;
    session.videoThread.join();
    BOOST_LOG(debug) << "Waiting for audio to end..."sv;
    session.audioThread.join();
    BOOST_LOG(debug) << "Waiting for control to end..."sv;
    session.controlEnd.view();

    // Reset input on session stop to avoid stuck repeated keys
    BOOST_LOG(debug) << "Resetting Input..."sv;
    input::reset(session.input);

    // If this is the last session, invoke the platform callbacks
    if (--running_sessions == 0) {
      bool revert_display_config {config::video.dd.config_revert_on_disconnect};
      if (proc::proc.running()) {
#if defined SUNSHINE_TRAY && SUNSHINE_TRAY >= 1
        system_tray::update_tray_pausing(proc::proc.get_last_run_app_name());
#endif
      } else {
        // We have no app running and also no clients anymore.
        revert_display_config = true;
      }

      if (revert_display_config) {
        display_device::revert_configuration();
      }

      platf::streaming_will_stop();
    }

    BOOST_LOG(debug) << "Session ended"sv;
  }

  /**
   * @brief Start a session: allocate input, register with the control server,
   *        prime the A/V peer endpoints, and launch the A/V threads.
   * @param addr_string The client address the session expects connections from.
   * @return 0 on success, -1 if the broadcast context is unavailable.
   */
  int start(session_t &session, const std::string &addr_string) {
    session.input = input::alloc(session.mail);

    session.broadcast_ref = broadcast.ref();
    if (!session.broadcast_ref) {
      return -1;
    }

    session.control.expected_peer_address = addr_string;
    BOOST_LOG(debug) << "Expecting incoming session connections from "sv << addr_string;

    // Insert this session into the session list
    {
      auto lg = session.broadcast_ref->control_server._sessions.lock();
      session.broadcast_ref->control_server._sessions->push_back(&session);
    }

    // Pre-fill the A/V peer endpoints with the expected address; the real
    // ports are learned from the initial pings in recv_ping().
    auto addr = boost::asio::ip::make_address(addr_string);
    session.video.peer.address(addr);
    session.video.peer.port(0);

    session.audio.peer.address(addr);
    session.audio.peer.port(0);

    session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;

    session.audioThread = std::thread {audioThread, &session};
    session.videoThread = std::thread {videoThread, &session};

    session.state.store(state_e::RUNNING, std::memory_order_relaxed);

    // If this is the first session, invoke the platform callbacks
    if (++running_sessions == 1) {
      platf::streaming_will_start();
#if defined SUNSHINE_TRAY && SUNSHINE_TRAY >= 1
      system_tray::update_tray_playing(proc::proc.get_last_run_app_name());
#endif
    }

    return 0;
  }

  /**
   * @brief Allocate and initialize a new session from a negotiated config and
   *        RTSP launch parameters (keys, IV, ping payload, queues, ciphers).
   * @return The newly constructed session in the STOPPED state.
   */
  std::shared_ptr alloc(config_t &config, rtsp_stream::launch_session_t &launch_session) {
    auto session = std::make_shared();

    auto mail = std::make_shared();

    session->shutdown_event = mail->event(mail::shutdown);
    session->launch_session_id = launch_session.id;

    session->config = config;

    session->control.connect_data = launch_session.control_connect_data;
    session->control.feedback_queue = mail->queue(mail::gamepad_feedback);
    session->control.hdr_queue = mail->event(mail::hdr);
    session->control.legacy_input_enc_iv = launch_session.iv;
    session->control.cipher = crypto::cipher::gcm_t {
      launch_session.gcm_key, false
    };

    session->video.idr_events = mail->event(mail::idr);
    session->video.invalidate_ref_frames_events = mail->event>(mail::invalidate_ref_frames);
    session->video.lowseq = 0;
    session->video.ping_payload = launch_session.av_ping_payload;
    if (config.encryptionFlagsEnabled & SS_ENC_VIDEO) {
      BOOST_LOG(info) << "Video encryption enabled"sv;
      session->video.cipher = crypto::cipher::gcm_t {
        launch_session.gcm_key, false
      };
      session->video.gcm_iv_counter = 0;
    }

    constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);

    // One contiguous backing buffer carved into RTPA_TOTAL_SHARDS fixed-size
    // shards; shards_p holds a pointer to the start of each shard.
    util::buffer_t shards {RTPA_TOTAL_SHARDS * max_block_size};
    util::buffer_t shards_p {RTPA_TOTAL_SHARDS};

    for (auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) {
      shards_p[x] = (uint8_t *) &shards[x * max_block_size];
    }

    // Audio FEC spans multiple audio packets,
    // therefore it's session specific
    session->audio.shards = std::move(shards);
    session->audio.shards_p = std::move(shards_p);

    session->audio.fec_packet.rtp.header = 0x80;
    session->audio.fec_packet.rtp.packetType = 127;
    session->audio.fec_packet.rtp.timestamp = 0;
    session->audio.fec_packet.rtp.ssrc = 0;

    session->audio.fec_packet.fecHeader.payloadType = 97;
    session->audio.fec_packet.fecHeader.ssrc = 0;

    session->audio.cipher = crypto::cipher::cbc_t {
      launch_session.gcm_key, true
    };

    session->audio.ping_payload = launch_session.av_ping_payload;
    session->audio.avRiKeyId = util::endian::big(*(std::uint32_t *) launch_session.iv.data());
    session->audio.sequenceNumber = 0;
    session->audio.timestamp = 0;

    session->control.peer = nullptr;
    session->state.store(state_e::STOPPED, std::memory_order_relaxed);

    session->mail = std::move(mail);

    return session;
  }
}  // namespace session
}  // namespace stream