Merge pull request #12176 from sworleys/BGP-InQ

bgpd,doc: limit InQ buf to allow for back pressure
This commit is contained in:
Russ White 2022-10-27 16:13:44 -04:00 committed by GitHub
commit 86a5cfa31e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 156 additions and 62 deletions

View File

@ -50,8 +50,9 @@ static void bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);
/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error
#define BGP_IO_TRANS_ERR (1 << 0) /* EAGAIN or similar occurred */
#define BGP_IO_FATAL_ERR (1 << 1) /* some kind of fatal TCP error */
#define BGP_IO_WORK_FULL_ERR (1 << 2) /* No room in work buffer */
/* Thread external API ----------------------------------------------------- */
@ -163,6 +164,58 @@ static void bgp_process_writes(struct thread *thread)
}
}
/*
 * Transfer one complete BGP packet from the peer's input work ringbuf
 * (peer->ibuf_work) onto the peer's input queue (peer->ibuf) for
 * processing by the main pthread.
 *
 * The peer's io_mtx is taken for the whole call; the autounlock helper
 * presumably releases it at scope exit -- NOTE(review): confirm against
 * FRR's frr_mutex_lock_autounlock definition.
 *
 * Return values:
 *   > 0       size of the packet pushed onto the input queue
 *   0         not enough buffered data yet for a header or full packet;
 *             caller should come back later
 *   -ENOMEM   input queue already holds bm->inq_limit packets; caller
 *             should apply back pressure and stop draining
 *   -EBADMSG  packet header failed validation
 */
static int read_ibuf_work(struct peer *peer)
{
/* shorter alias to peer's input buffer */
struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
uint16_t pktsize = 0;
struct stream *pkt;
/* Hold the I/O lock, we might not have space on the InQ */
frr_mutex_lock_autounlock(&peer->io_mtx);
/* ============================================== */
/* back pressure: refuse to enqueue past the configured InQ limit */
if (peer->ibuf->count >= bm->inq_limit)
return -ENOMEM;
/* check that we have enough data for a header */
if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
return 0;
/* check that header is valid */
if (!validate_header(peer))
return -EBADMSG;
/* header is valid; retrieve packet size */
ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));
/* length field is in network byte order on the wire */
pktsize = ntohs(pktsize);
/* if this fails we are seriously screwed */
assert(pktsize <= peer->max_packet_size);
/*
 * If we have that much data, chuck it into its own
 * stream and append to input queue for processing.
 *
 * Otherwise, come back later.
 */
if (ringbuf_remain(ibw) < pktsize)
return 0;
pkt = stream_new(pktsize);
assert(STREAM_WRITEABLE(pkt) == pktsize);
assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize);
stream_set_endp(pkt, pktsize);
frrtrace(2, frr_bgp, packet_read, peer, pkt);
stream_fifo_push(peer->ibuf, pkt);
return pktsize;
}
/*
* Called from I/O pthread when a file descriptor has become ready for reading,
* or has hung up.
@ -173,12 +226,14 @@ static void bgp_process_writes(struct thread *thread)
static void bgp_process_reads(struct thread *thread)
{
/* clang-format off */
static struct peer *peer; // peer to read from
uint16_t status; // bgp_read status code
bool more = true; // whether we got more data
bool fatal = false; // whether fatal error occurred
bool added_pkt = false; // whether we pushed onto ->ibuf
int code = 0; // FSM code if error occurred
static struct peer *peer; /* peer to read from */
uint16_t status; /* bgp_read status code */
bool fatal = false; /* whether fatal error occurred */
bool added_pkt = false; /* whether we pushed onto ->ibuf */
int code = 0; /* FSM code if error occurred */
bool ibuf_full = false; /* Is peer fifo IN Buffer full */
static bool ibuf_full_logged; /* Have we logged full already */
int ret = 1;
/* clang-format on */
peer = THREAD_ARG(thread);
@ -195,12 +250,11 @@ static void bgp_process_reads(struct thread *thread)
/* error checking phase */
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
/* no problem; just don't process packets */
more = false;
goto done;
}
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
/* problem; tear down session */
more = false;
fatal = true;
/* Handle the error in the main pthread, include the
@ -208,67 +262,53 @@ static void bgp_process_reads(struct thread *thread)
*/
thread_add_event(bm->master, bgp_packet_process_error,
peer, code, &peer->t_process_packet_error);
goto done;
}
while (more) {
/* static buffer for transferring packets */
/* shorter alias to peer's input buffer */
struct ringbuf *ibw = peer->ibuf_work;
/* packet size as given by header */
uint16_t pktsize = 0;
/* check that we have enough data for a header */
if (ringbuf_remain(ibw) < BGP_HEADER_SIZE)
while (true) {
ret = read_ibuf_work(peer);
if (ret <= 0)
break;
/* check that header is valid */
if (!validate_header(peer)) {
fatal = true;
break;
}
/* header is valid; retrieve packet size */
ringbuf_peek(ibw, BGP_MARKER_SIZE, &pktsize, sizeof(pktsize));
pktsize = ntohs(pktsize);
/* if this fails we are seriously screwed */
assert(pktsize <= peer->max_packet_size);
/*
* If we have that much data, chuck it into its own
* stream and append to input queue for processing.
*/
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
assert(STREAM_WRITEABLE(pkt) == pktsize);
assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize);
stream_set_endp(pkt, pktsize);
frrtrace(2, frr_bgp, packet_read, peer, pkt);
frr_with_mutex (&peer->io_mtx) {
stream_fifo_push(peer->ibuf, pkt);
}
added_pkt = true;
} else
}
switch (ret) {
case -EBADMSG:
fatal = true;
break;
case -ENOMEM:
ibuf_full = true;
if (!ibuf_full_logged) {
flog_warn(
EC_BGP_UPDATE_RCV,
"%s [Warning] Peer Input-Queue is full: limit (%u)",
peer->host, bm->inq_limit);
ibuf_full_logged = true;
}
break;
default:
ibuf_full_logged = false;
break;
}
done:
/* handle invalid header */
if (fatal) {
/* wipe buffer just in case someone screwed up */
ringbuf_wipe(peer->ibuf_work);
} else {
return;
}
/* ringbuf should be fully drained unless ibuf is full */
if (!ibuf_full)
assert(ringbuf_space(peer->ibuf_work) >= peer->max_packet_size);
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
if (added_pkt)
thread_add_event(bm->master, bgp_process_packet,
peer, 0, &peer->t_process_packet);
}
thread_add_event(bm->master, bgp_process_packet, peer, 0,
&peer->t_process_packet);
}
/*
@ -462,12 +502,20 @@ done : {
*/
static uint16_t bgp_read(struct peer *peer, int *code_p)
{
size_t readsize; // how many bytes we want to read
ssize_t nbytes; // how many bytes we actually read
size_t readsize; /* how many bytes we want to read */
ssize_t nbytes; /* how many bytes we actually read */
size_t ibuf_work_space; /* space we can read into the work buf */
uint16_t status = 0;
readsize =
MIN(ringbuf_space(peer->ibuf_work), sizeof(peer->ibuf_scratch));
ibuf_work_space = ringbuf_space(peer->ibuf_work);
if (ibuf_work_space == 0) {
SET_FLAG(status, BGP_IO_WORK_FULL_ERR);
return status;
}
readsize = MIN(ibuf_work_space, sizeof(peer->ibuf_scratch));
nbytes = read(peer->fd, peer->ibuf_scratch, readsize);
/* EAGAIN or EWOULDBLOCK; come back later */

View File

@ -17733,6 +17733,10 @@ int bgp_config_write(struct vty *vty)
if (bm->tcp_dscp != IPTOS_PREC_INTERNETCONTROL)
vty_out(vty, "bgp session-dscp %u\n", bm->tcp_dscp >> 2);
/* BGP InQ limit */
if (bm->inq_limit != BM_DEFAULT_INQ_LIMIT)
vty_out(vty, "bgp input-queue-limit %u\n", bm->inq_limit);
/* BGP configuration. */
for (ALL_LIST_ELEMENTS(bm->bgp, mnode, mnnode, bgp)) {
@ -18445,6 +18449,31 @@ DEFPY(mpls_bgp_forwarding, mpls_bgp_forwarding_cmd,
return CMD_SUCCESS;
}
/*
 * CLI handler: "bgp input-queue-limit (1-4294967295)".
 * Sets the global cap (bm->inq_limit) on how many parsed packets may sit
 * on a peer's input queue before the I/O pthread stops enqueuing more
 * (back pressure).  Applies to all peers.
 */
DEFPY (bgp_inq_limit,
bgp_inq_limit_cmd,
"bgp input-queue-limit (1-4294967295)$limit",
BGP_STR
"Set the BGP Input Queue limit for all peers when message parsing\n"
"Input-Queue limit\n")
{
bm->inq_limit = limit;
return CMD_SUCCESS;
}
/*
 * CLI handler: "no bgp input-queue-limit [(1-4294967295)]".
 * Restores the input-queue limit to BM_DEFAULT_INQ_LIMIT; the optional
 * limit argument is accepted for symmetry but intentionally ignored.
 */
DEFPY (no_bgp_inq_limit,
no_bgp_inq_limit_cmd,
"no bgp input-queue-limit [(1-4294967295)$limit]",
NO_STR
BGP_STR
"Set the BGP Input Queue limit for all peers when message parsing\n"
"Input-Queue limit\n")
{
bm->inq_limit = BM_DEFAULT_INQ_LIMIT;
return CMD_SUCCESS;
}
/* Initialization of BGP interface. */
static void bgp_vty_if_init(void)
{
@ -18494,6 +18523,10 @@ void bgp_vty_init(void)
install_default(BGP_EVPN_VNI_NODE);
install_default(BGP_SRV6_NODE);
/* "global bgp inq-limit command */
install_element(CONFIG_NODE, &bgp_inq_limit_cmd);
install_element(CONFIG_NODE, &no_bgp_inq_limit_cmd);
/* "bgp local-mac" hidden commands. */
install_element(CONFIG_NODE, &bgp_local_mac_cmd);
install_element(CONFIG_NODE, &no_bgp_local_mac_cmd);
@ -18689,6 +18722,10 @@ void bgp_vty_init(void)
install_element(BGP_NODE, &bgp_graceful_restart_rib_stale_time_cmd);
install_element(BGP_NODE, &no_bgp_graceful_restart_rib_stale_time_cmd);
/* "bgp inq-limit command */
install_element(BGP_NODE, &bgp_inq_limit_cmd);
install_element(BGP_NODE, &no_bgp_inq_limit_cmd);
/* "bgp graceful-shutdown" commands */
install_element(BGP_NODE, &bgp_graceful_shutdown_cmd);
install_element(BGP_NODE, &no_bgp_graceful_shutdown_cmd);

View File

@ -7868,6 +7868,7 @@ void bgp_master_init(struct thread_master *master, const int buffer_size,
bm->socket_buffer = buffer_size;
bm->wait_for_fib = false;
bm->tcp_dscp = IPTOS_PREC_INTERNETCONTROL;
bm->inq_limit = BM_DEFAULT_INQ_LIMIT;
bgp_mac_init();
/* init the rd id space.

View File

@ -177,6 +177,9 @@ struct bgp_master {
/* DSCP value for TCP sessions */
uint8_t tcp_dscp;
#define BM_DEFAULT_INQ_LIMIT 10000
uint32_t inq_limit;
QOBJ_FIELDS;
};
DECLARE_QOBJ_TYPE(bgp_master);

View File

@ -4018,6 +4018,11 @@ The following command is available in ``config`` mode as well as in the
the startup configuration, graceful shutdown will remain in effect
across restarts of *bgpd* and will need to be explicitly disabled.
.. clicmd:: bgp input-queue-limit (1-4294967295)
Set the BGP Input Queue limit for all peers when parsing messages. Increase
this only if you have the memory to handle large queues of messages at once.
.. _bgp-displaying-bgp-information:
Displaying BGP Information