diff --git a/bgpd/bgp_rpki.c b/bgpd/bgp_rpki.c index 52c5dc5e90..9a42bb7a85 100644 --- a/bgpd/bgp_rpki.c +++ b/bgpd/bgp_rpki.c @@ -47,6 +47,7 @@ #include "bgpd/bgp_attr.h" #include "bgpd/bgp_aspath.h" #include "bgpd/bgp_route.h" +#include "lib/network.h" #include "lib/thread.h" #include "rtrlib/rtrlib.h" #include "rtrlib/rtr_mgr.h" @@ -131,12 +132,14 @@ static route_map_result_t route_match(void *rule, const struct prefix *prefix, static void *route_match_compile(const char *arg); static void revalidate_bgp_node(struct bgp_node *bgp_node, afi_t afi, safi_t safi); +static void revalidate_all_routes(void); static struct rtr_mgr_config *rtr_config; static struct list *cache_list; static int rtr_is_running; static int rtr_is_stopping; static int rtr_is_starting; +static _Atomic int rtr_update_overflow; static int rpki_debug; static unsigned int polling_period; static unsigned int expire_interval; @@ -345,6 +348,19 @@ static int bgpd_sync_callback(struct thread *thread) thread_add_read(bm->master, bgpd_sync_callback, NULL, rpki_sync_socket_bgpd, NULL); + + if (atomic_load_explicit(&rtr_update_overflow, memory_order_seq_cst)) { + while (read(rpki_sync_socket_bgpd, &rec, + sizeof(struct pfx_record)) + != -1) + ; + + atomic_store_explicit(&rtr_update_overflow, 0, + memory_order_seq_cst); + revalidate_all_routes(); + return 0; + } + int retval = read(rpki_sync_socket_bgpd, &rec, sizeof(struct pfx_record)); if (retval != sizeof(struct pfx_record)) { @@ -442,28 +458,53 @@ static void rpki_update_cb_sync_rtr(struct pfx_table *p __attribute__((unused)), const struct pfx_record rec, const bool added __attribute__((unused))) { - if (rtr_is_stopping || rtr_is_starting) + if (rtr_is_stopping || rtr_is_starting + || atomic_load_explicit(&rtr_update_overflow, memory_order_seq_cst)) return; int retval = write(rpki_sync_socket_rtr, &rec, sizeof(struct pfx_record)); - if (retval != sizeof(struct pfx_record)) + if (retval == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + atomic_store_explicit(&rtr_update_overflow, 1, + memory_order_seq_cst); + + else if (retval != sizeof(struct pfx_record)) RPKI_DEBUG("Could not write to rpki_sync_socket_rtr"); } static void rpki_init_sync_socket(void) { int fds[2]; + const char *msg; RPKI_DEBUG("initializing sync socket"); if (socketpair(PF_LOCAL, SOCK_DGRAM, 0, fds) != 0) { - RPKI_DEBUG("Could not open rpki sync socket"); - return; + msg = "could not open rpki sync socketpair"; + goto err; } rpki_sync_socket_rtr = fds[0]; rpki_sync_socket_bgpd = fds[1]; + + if (set_nonblocking(rpki_sync_socket_rtr) != 0) { + msg = "could not set rpki_sync_socket_rtr to non blocking"; + goto err; + } + + if (set_nonblocking(rpki_sync_socket_bgpd) != 0) { + msg = "could not set rpki_sync_socket_bgpd to non blocking"; + goto err; + } + + thread_add_read(bm->master, bgpd_sync_callback, NULL, rpki_sync_socket_bgpd, NULL); + + return; + +err: + zlog_err("RPKI: %s", msg); + abort(); + } static int bgp_rpki_init(struct thread_master *master) @@ -514,6 +555,7 @@ static int start(void) rtr_is_stopping = 0; rtr_is_starting = 1; + rtr_update_overflow = 0; if (list_isempty(cache_list)) { RPKI_DEBUG(