update udp client

This commit is contained in:
pigeatgarlic 2025-01-09 13:09:44 +00:00
parent c5d4d6fc2a
commit 856e352cc9
2 changed files with 68 additions and 39 deletions

View File

@ -8,7 +8,7 @@ import (
func main() {
// listen to incoming udp packets
pc, err := net.ListenPacket("udp", ":63400")
pc, err := net.ListenPacket("udp", ":32521")
if err != nil {
log.Fatal(err)
}
@ -21,11 +21,13 @@ func main() {
if err != nil {
fmt.Printf("error serving %s\n",err.Error())
}
go serve(pc, addr, buf[:n])
fmt.Printf("udp packet size %d from %s\n",n,addr.String())
_,err = pc.WriteTo([]byte("hi"),addr)
if err != nil {
panic(err)
}
}
}
func serve(pc net.PacketConn, addr net.Addr, buf []byte) {
fmt.Printf("udp packet size %d from %s\n",len(buf),addr.String())
}

View File

@ -55,38 +55,56 @@ on_signal(int sig, FN &&fn) {
class Client {
boost::asio::io_context io_service;
udp::socket socket{io_service};
boost::array<char, 16 * 1024> recv_buffer;
udp::endpoint remote_endpoint;
safe::mail_t mail;
public:
Client(safe::mail_t m) {
mail = m;
}
void Receiver(udp::endpoint remote_endpoint)
{
socket.open(udp::v4());
socket.bind(remote_endpoint);
wait();
io_service.run();
void Bind(udp::endpoint _remote_endpoint) {
if(io_service != nullptr)
io_service->stop();
if (socket != nullptr)
socket->close();
io_service = new boost::asio::io_context();
socket = new udp::socket(*io_service);
socket->open(udp::v4());
socket->bind(_remote_endpoint);
remote_endpoint = remote_endpoint;
}
void Receiver() {
wait();
io_service->run();
}
uintptr_t GetHandle() {
return (uintptr_t)socket->native_handle();
}
private:
void handle_receive(const boost::system::error_code& error, size_t bytes_transferred) {
if (error) {
return;
boost::asio::io_context* io_service = nullptr;
udp::socket* socket = nullptr;
boost::array<char, 16 * 1024> recv_buffer;
udp::endpoint remote_endpoint;
safe::mail_t mail;
void handle_receive(const boost::system::error_code& err, size_t bytes_transferred) {
if (!err) {
char* data = recv_buffer.c_array();
BOOST_LOG(error) << "Receive " << std::string(data,data+bytes_transferred);
wait();
}
wait();
}
void wait() {
socket.async_receive_from(boost::asio::buffer(recv_buffer),
remote_endpoint,
boost::bind(&Client::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
socket->async_receive_from(
boost::asio::buffer(recv_buffer),
remote_endpoint,
boost::bind(
&Client::handle_receive,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
};
@ -227,15 +245,28 @@ main(int argc, char *argv[]) {
int port;
std::string address;
std::stringstream ss; ss << argv[3]; ss >> address;
std::stringstream ss2; ss2 << argv[4]; ss2 >> port;
std::stringstream ss; ss << argv[4]; ss >> address;
std::stringstream ss2; ss2 << argv[5]; ss2 >> port;
udp::endpoint remote_endpoint = udp::endpoint(make_address(address), port);
int lport;
std::string laddress;
std::stringstream ss3; ss3 << argv[2]; ss3 >> laddress;
std::stringstream ss4; ss4 << argv[3]; ss4 >> lport;
udp::endpoint local_endpoint = udp::endpoint(make_address(laddress), lport);
auto push = [process_shutdown_event,remote_endpoint,laddress](safe::mail_t mail, Queue* queue, QueueType queue_type){
auto mail = std::make_shared<safe::mail_raw_t>();
auto client = new Client(mail);
client->Bind(local_endpoint);
std::thread recv([client,local_endpoint] { while(true) {
client->Receiver();
client->Bind(local_endpoint);
std::this_thread::sleep_for(1s);
}});
auto push = [client,process_shutdown_event,remote_endpoint,local_endpoint](safe::mail_t mail, Queue* queue, QueueType queue_type){
auto video_packets = mail->queue<video::packet_t>(mail::video_packets);
auto audio_packets = mail->queue<audio::packet_t>(mail::audio_packets);
auto bitrate = mail->event<int>(mail::bitrate);
@ -244,6 +275,7 @@ main(int argc, char *argv[]) {
auto local_shutdown= mail->event<bool>(mail::shutdown);
auto touch_port = mail->event<input::touch_port_t>(mail::touch_port);
#ifdef _WIN32
if (queue_type == QueueType::Video0 || queue_type == QueueType::Video1)
platf::adjust_thread_priority(platf::thread_priority_e::critical);
@ -253,13 +285,11 @@ main(int argc, char *argv[]) {
auto last_timestamp = std::chrono::high_resolution_clock::now().time_since_epoch().count();
bool first_video_packet = true;
auto localAddr = make_address(laddress);
auto rAddr = remote_endpoint.address();
auto rPort = remote_endpoint.port();
auto lAddr = local_endpoint.address();
auto lPort = local_endpoint.port();
boost::asio::io_context io_service;
udp::socket socket(io_service);
socket.open(udp::v4());
while (!process_shutdown_event->peek() && !local_shutdown->peek()) {
if (queue_type == QueueType::Video0 || queue_type == QueueType::Video1) {
do {
@ -274,8 +304,8 @@ main(int argc, char *argv[]) {
size_t size = packet->data_size();
platf::batched_send_info_t send_info {
ptr, size, 1,
(uintptr_t) socket.native_handle(),
rAddr, rPort, localAddr,
client->GetHandle(),
rAddr, rPort, lAddr
};
platf::send_batch(send_info);
@ -288,8 +318,8 @@ main(int argc, char *argv[]) {
size_t size = packet->second.size();
platf::batched_send_info_t send_info {
ptr, size, 1,
(uintptr_t) socket.native_handle(),
rAddr, rPort, localAddr,
client->GetHandle(),
rAddr, rPort, lAddr
};
platf::send_batch(send_info);
@ -330,9 +360,6 @@ main(int argc, char *argv[]) {
};
auto mail = std::make_shared<safe::mail_raw_t>();
auto client = new Client(mail);
std::thread recv([&] { client->Receiver(remote_endpoint); });
auto queue = &memory->queues[queuetype];
BOOST_LOG(info) << "Starting capture on channel " << queuetype;
if (queuetype == QueueType::Video0 || queuetype == QueueType::Video1) {