From 856e352cc9216c85fb19c000809043e6695ac21e Mon Sep 17 00:00:00 2001 From: pigeatgarlic <64737125+pigeatgarlic@users.noreply.github.com> Date: Thu, 9 Jan 2025 13:09:44 +0000 Subject: [PATCH] update udp client --- server/server.go | 12 +++--- src/main.cpp | 95 +++++++++++++++++++++++++++++++----------------- 2 files changed, 68 insertions(+), 39 deletions(-) diff --git a/server/server.go b/server/server.go index dc6e5019..ae90d35b 100644 --- a/server/server.go +++ b/server/server.go @@ -8,7 +8,7 @@ import ( func main() { // listen to incoming udp packets - pc, err := net.ListenPacket("udp", ":63400") + pc, err := net.ListenPacket("udp", ":32521") if err != nil { log.Fatal(err) } @@ -21,11 +21,13 @@ func main() { if err != nil { fmt.Printf("error serving %s\n",err.Error()) } - go serve(pc, addr, buf[:n]) + + fmt.Printf("udp packet size %d from %s\n",n,addr.String()) + _,err = pc.WriteTo([]byte("hi"),addr) + if err != nil { + panic(err) + } } } -func serve(pc net.PacketConn, addr net.Addr, buf []byte) { - fmt.Printf("udp packet size %d from %s\n",len(buf),addr.String()) -} diff --git a/src/main.cpp b/src/main.cpp index 67b03026..44db15fa 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -55,38 +55,56 @@ on_signal(int sig, FN &&fn) { class Client { - boost::asio::io_context io_service; - udp::socket socket{io_service}; - boost::array recv_buffer; - udp::endpoint remote_endpoint; - safe::mail_t mail; - public: Client(safe::mail_t m) { mail = m; } - void Receiver(udp::endpoint remote_endpoint) - { - socket.open(udp::v4()); - socket.bind(remote_endpoint); - wait(); - io_service.run(); + void Bind(udp::endpoint _remote_endpoint) { + if(io_service != nullptr) + io_service->stop(); + if (socket != nullptr) + socket->close(); + + io_service = new boost::asio::io_context(); + socket = new udp::socket(*io_service); + socket->open(udp::v4()); + socket->bind(_remote_endpoint); + remote_endpoint = remote_endpoint; + } + void Receiver() { + wait(); + io_service->run(); + } + + uintptr_t GetHandle() { + return (uintptr_t)socket->native_handle(); } private: - void handle_receive(const boost::system::error_code& error, size_t bytes_transferred) { - if (error) { - return; + boost::asio::io_context* io_service = nullptr; + udp::socket* socket = nullptr; + boost::array recv_buffer; + udp::endpoint remote_endpoint; + safe::mail_t mail; + void handle_receive(const boost::system::error_code& err, size_t bytes_transferred) { + if (!err) { + char* data = recv_buffer.c_array(); + BOOST_LOG(error) << "Receive " << std::string(data,data+bytes_transferred); + wait(); } - - wait(); } void wait() { - socket.async_receive_from(boost::asio::buffer(recv_buffer), - remote_endpoint, - boost::bind(&Client::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + socket->async_receive_from( + boost::asio::buffer(recv_buffer), + remote_endpoint, + boost::bind( + &Client::handle_receive, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + )); } }; @@ -227,15 +245,28 @@ main(int argc, char *argv[]) { int port; std::string address; - std::stringstream ss; ss << argv[3]; ss >> address; - std::stringstream ss2; ss2 << argv[4]; ss2 >> port; + std::stringstream ss; ss << argv[4]; ss >> address; + std::stringstream ss2; ss2 << argv[5]; ss2 >> port; udp::endpoint remote_endpoint = udp::endpoint(make_address(address), port); + int lport; std::string laddress; std::stringstream ss3; ss3 << argv[2]; ss3 >> laddress; + std::stringstream ss4; ss4 << argv[3]; ss4 >> lport; + udp::endpoint local_endpoint = udp::endpoint(make_address(laddress), lport); - auto push = [process_shutdown_event,remote_endpoint,laddress](safe::mail_t mail, Queue* queue, QueueType queue_type){ + auto mail = std::make_shared(); + auto client = new Client(mail); + + client->Bind(local_endpoint); + std::thread recv([client,local_endpoint] { while(true) { + client->Receiver(); + client->Bind(local_endpoint); + std::this_thread::sleep_for(1s); + }}); + + auto push = [client,process_shutdown_event,remote_endpoint,local_endpoint](safe::mail_t mail, Queue* queue, QueueType queue_type){ auto video_packets = mail->queue(mail::video_packets); auto audio_packets = mail->queue(mail::audio_packets); auto bitrate = mail->event(mail::bitrate); @@ -244,6 +275,7 @@ main(int argc, char *argv[]) { auto local_shutdown= mail->event(mail::shutdown); auto touch_port = mail->event(mail::touch_port); + #ifdef _WIN32 if (queue_type == QueueType::Video0 || queue_type == QueueType::Video1) platf::adjust_thread_priority(platf::thread_priority_e::critical); @@ -253,13 +285,11 @@ main(int argc, char *argv[]) { auto last_timestamp = std::chrono::high_resolution_clock::now().time_since_epoch().count(); bool first_video_packet = true; - auto localAddr = make_address(laddress); auto rAddr = remote_endpoint.address(); auto rPort = remote_endpoint.port(); + auto lAddr = local_endpoint.address(); + auto lPort = local_endpoint.port(); - boost::asio::io_context io_service; - udp::socket socket(io_service); - socket.open(udp::v4()); while (!process_shutdown_event->peek() && !local_shutdown->peek()) { if (queue_type == QueueType::Video0 || queue_type == QueueType::Video1) { do { @@ -274,8 +304,8 @@ main(int argc, char *argv[]) { size_t size = packet->data_size(); platf::batched_send_info_t send_info { ptr, size, 1, - (uintptr_t) socket.native_handle(), - rAddr, rPort, localAddr, + client->GetHandle(), + rAddr, rPort, lAddr }; platf::send_batch(send_info); @@ -288,8 +318,8 @@ main(int argc, char *argv[]) { size_t size = packet->second.size(); platf::batched_send_info_t send_info { ptr, size, 1, - (uintptr_t) socket.native_handle(), - rAddr, rPort, localAddr, + client->GetHandle(), + rAddr, rPort, lAddr }; platf::send_batch(send_info); @@ -330,9 +360,6 @@ main(int argc, char *argv[]) { }; - auto mail = std::make_shared(); - auto client = new Client(mail); - std::thread recv([&] { client->Receiver(remote_endpoint); }); auto queue = &memory->queues[queuetype]; BOOST_LOG(info) << "Starting capture on channel " << queuetype; if (queuetype == QueueType::Video0 || queuetype == QueueType::Video1) {