diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index f9bb8d518d..ce8ce96a0d 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -50,8 +50,9 @@ static void bgp_process_reads(struct thread *); static bool validate_header(struct peer *); /* generic i/o status codes */ -#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred -#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error +#define BGP_IO_TRANS_ERR (1 << 0) /* EAGAIN or similar occurred */ +#define BGP_IO_FATAL_ERR (1 << 1) /* some kind of fatal TCP error */ +#define BGP_IO_WORK_FULL_ERR (1 << 2) /* No room in work buffer */ /* Thread external API ----------------------------------------------------- */ @@ -163,6 +164,58 @@ static void bgp_process_writes(struct thread *thread) } } +static int read_ibuf_work(struct peer *peer) +{ + /* static buffer for transferring packets */ + /* shorter alias to peer's input buffer */ + struct ringbuf *ibw = peer->ibuf_work; + /* packet size as given by header */ + uint16_t pktsize = 0; + struct stream *pkt; + + /* Hold the I/O lock, we might not have space on the InQ */ + frr_mutex_lock_autounlock(&peer->io_mtx); + /* ============================================== */ + + if (peer->ibuf->count >= bm->inq_limit) + return -ENOMEM; + + /* check that we have enough data for a header */ + if (ringbuf_remain(ibw) < BGP_HEADER_SIZE) + return 0; + + /* check that header is valid */ + if (!validate_header(peer)) + return -EBADMSG; + + /* header is valid; retrieve packet size */ + ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize)); + + pktsize = ntohs(pktsize); + + /* if this fails we are seriously screwed */ + assert(pktsize <= peer->max_packet_size); + + /* + * If we have that much data, chuck it into its own + * stream and append to input queue for processing. + * + * Otherwise, come back later. 
+ */ + if (ringbuf_remain(ibw) < pktsize) + return 0; + + pkt = stream_new(pktsize); + assert(STREAM_WRITEABLE(pkt) == pktsize); + assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize); + stream_set_endp(pkt, pktsize); + + frrtrace(2, frr_bgp, packet_read, peer, pkt); + stream_fifo_push(peer->ibuf, pkt); + + return pktsize; +} + /* * Called from I/O pthread when a file descriptor has become ready for reading, * or has hung up. @@ -173,12 +226,14 @@ static void bgp_process_writes(struct thread *thread) static void bgp_process_reads(struct thread *thread) { /* clang-format off */ - static struct peer *peer; // peer to read from - uint16_t status; // bgp_read status code - bool more = true; // whether we got more data - bool fatal = false; // whether fatal error occurred - bool added_pkt = false; // whether we pushed onto ->ibuf - int code = 0; // FSM code if error occurred + static struct peer *peer; /* peer to read from */ + uint16_t status; /* bgp_read status code */ + bool fatal = false; /* whether fatal error occurred */ + bool added_pkt = false; /* whether we pushed onto ->ibuf */ + int code = 0; /* FSM code if error occurred */ + bool ibuf_full = false; /* Is peer fifo IN Buffer full */ + static bool ibuf_full_logged; /* Have we logged full already */ + int ret = 1; /* clang-format on */ peer = THREAD_ARG(thread); @@ -195,12 +250,11 @@ static void bgp_process_reads(struct thread *thread) /* error checking phase */ if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem; just don't process packets */ - more = false; + goto done; } if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { /* problem; tear down session */ - more = false; fatal = true; /* Handle the error in the main pthread, include the @@ -208,67 +262,53 @@ static void bgp_process_reads(struct thread *thread) */ thread_add_event(bm->master, bgp_packet_process_error, peer, code, &peer->t_process_packet_error); + goto done; } - while (more) { - /* static buffer for transferring packets */ - /* shorter alias 
to peer's input buffer */ - struct ringbuf *ibw = peer->ibuf_work; - /* packet size as given by header */ - uint16_t pktsize = 0; - - /* check that we have enough data for a header */ - if (ringbuf_remain(ibw) < BGP_HEADER_SIZE) + while (true) { + ret = read_ibuf_work(peer); + if (ret <= 0) break; - /* check that header is valid */ - if (!validate_header(peer)) { - fatal = true; - break; + added_pkt = true; + } + + switch (ret) { + case -EBADMSG: + fatal = true; + break; + case -ENOMEM: + ibuf_full = true; + if (!ibuf_full_logged) { + flog_warn( + EC_BGP_UPDATE_RCV, + "%s [Warning] Peer Input-Queue is full: limit (%u)", + peer->host, bm->inq_limit); + ibuf_full_logged = true; } - - /* header is valid; retrieve packet size */ - ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize)); - - pktsize = ntohs(pktsize); - - /* if this fails we are seriously screwed */ - assert(pktsize <= peer->max_packet_size); - - /* - * If we have that much data, chuck it into its own - * stream and append to input queue for processing. 
- */ - if (ringbuf_remain(ibw) >= pktsize) { - struct stream *pkt = stream_new(pktsize); - - assert(STREAM_WRITEABLE(pkt) == pktsize); - assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize); - stream_set_endp(pkt, pktsize); - - frrtrace(2, frr_bgp, packet_read, peer, pkt); - frr_with_mutex (&peer->io_mtx) { - stream_fifo_push(peer->ibuf, pkt); - } - - added_pkt = true; - } else - break; + break; + default: + ibuf_full_logged = false; + break; } +done: /* handle invalid header */ if (fatal) { /* wipe buffer just in case someone screwed up */ ringbuf_wipe(peer->ibuf_work); - } else { + return; + } + + /* ringbuf should be fully drained unless ibuf is full */ + if (!ibuf_full) assert(ringbuf_space(peer->ibuf_work) >= peer->max_packet_size); - thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, - &peer->t_read); - if (added_pkt) - thread_add_event(bm->master, bgp_process_packet, - peer, 0, &peer->t_process_packet); - } + thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, + &peer->t_read); + if (added_pkt) + thread_add_event(bm->master, bgp_process_packet, peer, 0, + &peer->t_process_packet); } /* @@ -462,12 +502,20 @@ done : { */ static uint16_t bgp_read(struct peer *peer, int *code_p) { - size_t readsize; // how many bytes we want to read - ssize_t nbytes; // how many bytes we actually read + size_t readsize; /* how many bytes we want to read */ + ssize_t nbytes; /* how many bytes we actually read */ + size_t ibuf_work_space; /* space we can read into the work buf */ uint16_t status = 0; - readsize = - MIN(ringbuf_space(peer->ibuf_work), sizeof(peer->ibuf_scratch)); + ibuf_work_space = ringbuf_space(peer->ibuf_work); + + if (ibuf_work_space == 0) { + SET_FLAG(status, BGP_IO_WORK_FULL_ERR); + return status; + } + + readsize = MIN(ibuf_work_space, sizeof(peer->ibuf_scratch)); + nbytes = read(peer->fd, peer->ibuf_scratch, readsize); /* EAGAIN or EWOULDBLOCK; come back later */ diff --git a/bgpd/bgp_vty.c b/bgpd/bgp_vty.c index 
ec0fcd8f11..e856529fc6 100644 --- a/bgpd/bgp_vty.c +++ b/bgpd/bgp_vty.c @@ -17733,6 +17733,10 @@ int bgp_config_write(struct vty *vty) if (bm->tcp_dscp != IPTOS_PREC_INTERNETCONTROL) vty_out(vty, "bgp session-dscp %u\n", bm->tcp_dscp >> 2); + /* BGP InQ limit */ + if (bm->inq_limit != BM_DEFAULT_INQ_LIMIT) + vty_out(vty, "bgp input-queue-limit %u\n", bm->inq_limit); + /* BGP configuration. */ for (ALL_LIST_ELEMENTS(bm->bgp, mnode, mnnode, bgp)) { @@ -18445,6 +18449,31 @@ DEFPY(mpls_bgp_forwarding, mpls_bgp_forwarding_cmd, return CMD_SUCCESS; } +DEFPY (bgp_inq_limit, + bgp_inq_limit_cmd, + "bgp input-queue-limit (1-4294967295)$limit", + BGP_STR + "Set the BGP Input Queue limit for all peers when message parsing\n" + "Input-Queue limit\n") +{ + bm->inq_limit = limit; + + return CMD_SUCCESS; +} + +DEFPY (no_bgp_inq_limit, + no_bgp_inq_limit_cmd, + "no bgp input-queue-limit [(1-4294967295)$limit]", + NO_STR + BGP_STR + "Set the BGP Input Queue limit for all peers when message parsing\n" + "Input-Queue limit\n") +{ + bm->inq_limit = BM_DEFAULT_INQ_LIMIT; + + return CMD_SUCCESS; +} + /* Initialization of BGP interface. */ static void bgp_vty_if_init(void) { @@ -18494,6 +18523,10 @@ void bgp_vty_init(void) install_default(BGP_EVPN_VNI_NODE); install_default(BGP_SRV6_NODE); + /* "global bgp inq-limit" command */ + install_element(CONFIG_NODE, &bgp_inq_limit_cmd); + install_element(CONFIG_NODE, &no_bgp_inq_limit_cmd); + /* "bgp local-mac" hidden commands. 
*/ install_element(CONFIG_NODE, &bgp_local_mac_cmd); install_element(CONFIG_NODE, &no_bgp_local_mac_cmd); @@ -18689,6 +18722,10 @@ void bgp_vty_init(void) install_element(BGP_NODE, &bgp_graceful_restart_rib_stale_time_cmd); install_element(BGP_NODE, &no_bgp_graceful_restart_rib_stale_time_cmd); + /* "bgp inq-limit" command */ + install_element(BGP_NODE, &bgp_inq_limit_cmd); + install_element(BGP_NODE, &no_bgp_inq_limit_cmd); + /* "bgp graceful-shutdown" commands */ install_element(BGP_NODE, &bgp_graceful_shutdown_cmd); install_element(BGP_NODE, &no_bgp_graceful_shutdown_cmd); diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 005d6b3092..512b52836d 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -7868,6 +7868,7 @@ void bgp_master_init(struct thread_master *master, const int buffer_size, bm->socket_buffer = buffer_size; bm->wait_for_fib = false; bm->tcp_dscp = IPTOS_PREC_INTERNETCONTROL; + bm->inq_limit = BM_DEFAULT_INQ_LIMIT; bgp_mac_init(); /* init the rd id space. diff --git a/bgpd/bgpd.h b/bgpd/bgpd.h index c844f1530f..c41c2ee429 100644 --- a/bgpd/bgpd.h +++ b/bgpd/bgpd.h @@ -177,6 +177,9 @@ struct bgp_master { /* DSCP value for TCP sessions */ uint8_t tcp_dscp; +#define BM_DEFAULT_INQ_LIMIT 10000 + uint32_t inq_limit; + QOBJ_FIELDS; }; DECLARE_QOBJ_TYPE(bgp_master); diff --git a/doc/user/bgp.rst b/doc/user/bgp.rst index 7d7dd3d805..3d9b0b3606 100644 --- a/doc/user/bgp.rst +++ b/doc/user/bgp.rst @@ -4018,6 +4018,11 @@ The following command is available in ``config`` mode as well as in the the startup configuration, graceful shutdown will remain in effect across restarts of *bgpd* and will need to be explicitly disabled. +.. clicmd:: bgp input-queue-limit (1-4294967295) + + Set the BGP Input Queue limit for all peers when message parsing. Increase + this only if you have the memory to handle large queues of messages at once. + .. _bgp-displaying-bgp-information: Displaying BGP Information