From a9b9d1bd09e9541d9d484ae48aeead8dec2ebae6 Mon Sep 17 00:00:00 2001 From: Mathias Tillman Date: Thu, 7 Apr 2022 08:20:47 +0200 Subject: [PATCH] Properly catch exceptions in stream broadcast handlers to prevent unhandled exception crash/termination. --- sunshine/stream.cpp | 142 ++++++++++++++++++++++++-------------------- 1 file changed, 77 insertions(+), 65 deletions(-) diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index dd14e829..10e215b6 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -963,63 +963,69 @@ void videoBroadcastThread(udp::socket &sock) { fec_blocks[0] = payload; } - auto blockIndex = 0; - std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view ¤t_payload) { - auto packets = (current_payload.size() + (blocksize - 1)) / blocksize; + try { + auto blockIndex = 0; + std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view ¤t_payload) { + auto packets = (current_payload.size() + (blocksize - 1)) / blocksize; - for(int x = 0; x < packets; ++x) { - auto *inspect = (video_packet_raw_t *)¤t_payload[x * blocksize]; + for(int x = 0; x < packets; ++x) { + auto *inspect = (video_packet_raw_t *)¤t_payload[x * blocksize]; - inspect->packet.frameIndex = packet->pts; - inspect->packet.streamPacketIndex = ((uint32_t)lowseq + x) << 8; + inspect->packet.frameIndex = packet->pts; + inspect->packet.streamPacketIndex = ((uint32_t)lowseq + x) << 8; - // Match multiFecFlags with Moonlight - inspect->packet.multiFecFlags = 0x10; - inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex; + // Match multiFecFlags with Moonlight + inspect->packet.multiFecFlags = 0x10; + inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex; - if(x == 0) { - inspect->packet.flags |= FLAG_SOF; + if(x == 0) { + inspect->packet.flags |= FLAG_SOF; + } + + if(x == packets - 1) { + inspect->packet.flags |= FLAG_EOF; + } } - if(x == packets - 1) { - inspect->packet.flags |= FLAG_EOF; + auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets); + + // set FEC info now that we know for sure what our percentage will be for this frame + for(auto x = 0; x < shards.size(); ++x) { + auto *inspect = (video_packet_raw_t *)shards.data(x); + + inspect->packet.fecInfo = + (x << 12 | + shards.data_shards << 22 | + shards.percentage << 4); + + inspect->rtp.header = 0x80 | FLAG_EXTENSION; + inspect->rtp.sequenceNumber = util::endian::big(lowseq + x); + + inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex; + inspect->packet.frameIndex = packet->pts; } - } - auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets); + for(auto x = 0; x < shards.size(); ++x) { + sock.send_to(asio::buffer(shards[x]), session->video.peer); + } - // set FEC info now that we know for sure what our percentage will be for this frame - for(auto x = 0; x < shards.size(); ++x) { - auto *inspect = (video_packet_raw_t *)shards.data(x); + if(packet->flags & AV_PKT_FLAG_KEY) { + BOOST_LOG(verbose) << "Key Frame ["sv << packet->pts << "] :: send ["sv << shards.size() << "] shards..."sv; + } + else { + BOOST_LOG(verbose) << "Frame ["sv << packet->pts << "] :: send ["sv << shards.size() << "] shards..."sv << std::endl; + } - inspect->packet.fecInfo = - (x << 12 | - shards.data_shards << 22 | - shards.percentage << 4); + ++blockIndex; + lowseq += shards.size(); + }); - inspect->rtp.header = 0x80 | FLAG_EXTENSION; - inspect->rtp.sequenceNumber = util::endian::big(lowseq + x); - - inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex; - inspect->packet.frameIndex = packet->pts; - } - - for(auto x = 0; x < shards.size(); ++x) { - sock.send_to(asio::buffer(shards[x]), session->video.peer); - } - - if(packet->flags & AV_PKT_FLAG_KEY) { - BOOST_LOG(verbose) << "Key Frame ["sv << packet->pts << "] :: send ["sv << shards.size() << "] shards..."sv; - } - else { - BOOST_LOG(verbose) << "Frame ["sv << packet->pts << "] :: send ["sv << shards.size() << "] shards..."sv << std::endl; - } - - ++blockIndex; - lowseq += shards.size(); - }); - - session->video.lowseq = lowseq; + session->video.lowseq = lowseq; + } + catch(const std::exception &e) { + BOOST_LOG(error) << "Broadcast video failed "sv << e.what(); + std::this_thread::sleep_for(100ms); + } } shutdown_event->raise(true); @@ -1077,29 +1083,35 @@ void audioBroadcastThread(udp::socket &sock) { auto &shards_p = session->audio.shards_p; std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]); - sock.send_to(asio::buffer((char *)audio_packet.get(), sizeof(audio_packet_raw_t) + bytes), session->audio.peer); + try { + sock.send_to(asio::buffer((char *)audio_packet.get(), sizeof(audio_packet_raw_t) + bytes), session->audio.peer); - BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv; + BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv; - auto &fec_packet = session->audio.fec_packet; - // initialize the FEC header at the beginning of the FEC block - if(sequenceNumber % RTPA_DATA_SHARDS == 0) { - fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); - fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp); - } - - // generate parity shards at the end of the FEC block - if((sequenceNumber + 1) % RTPA_DATA_SHARDS == 0) { - reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes); - - for(auto x = 0; x < RTPA_FEC_SHARDS; ++x) { - fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); - fec_packet->fecHeader.fecShardIndex = x; - memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); - sock.send_to(asio::buffer((char *)fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer); - BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv; + auto &fec_packet = session->audio.fec_packet; + // initialize the FEC header at the beginning of the FEC block + if(sequenceNumber % RTPA_DATA_SHARDS == 0) { + fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); + fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp); } + + // generate parity shards at the end of the FEC block + if((sequenceNumber + 1) % RTPA_DATA_SHARDS == 0) { + reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes); + + for(auto x = 0; x < RTPA_FEC_SHARDS; ++x) { + fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); + fec_packet->fecHeader.fecShardIndex = x; + memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); + sock.send_to(asio::buffer((char *)fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer); + BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv; + } + } + } + catch(const std::exception &e) { + BOOST_LOG(error) << "Broadcast audio failed "sv << e.what(); + std::this_thread::sleep_for(100ms); } }