Merge pull request #8220 from mjstapp/bgp_notify_race

bgpd: handle socket read errors in the main pthread
This commit is contained in:
Donatas Abraitis 2021-03-15 13:30:00 +02:00 committed by GitHub
commit 8b87b2f4f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 27 deletions

View File

@ -45,7 +45,7 @@
/* forward declarations */ /* forward declarations */
static uint16_t bgp_write(struct peer *); static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *); static uint16_t bgp_read(struct peer *peer, int *code_p);
static int bgp_process_writes(struct thread *); static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *); static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *); static bool validate_header(struct peer *);
@ -181,6 +181,7 @@ static int bgp_process_reads(struct thread *thread)
bool fatal = false; // whether fatal error occurred bool fatal = false; // whether fatal error occurred
bool added_pkt = false; // whether we pushed onto ->ibuf bool added_pkt = false; // whether we pushed onto ->ibuf
/* clang-format on */ /* clang-format on */
int code;
peer = THREAD_ARG(thread); peer = THREAD_ARG(thread);
@ -190,7 +191,7 @@ static int bgp_process_reads(struct thread *thread)
struct frr_pthread *fpt = bgp_pth_io; struct frr_pthread *fpt = bgp_pth_io;
frr_with_mutex(&peer->io_mtx) { frr_with_mutex(&peer->io_mtx) {
status = bgp_read(peer); status = bgp_read(peer, &code);
} }
/* error checking phase */ /* error checking phase */
@ -203,6 +204,12 @@ static int bgp_process_reads(struct thread *thread)
/* problem; tear down session */ /* problem; tear down session */
more = false; more = false;
fatal = true; fatal = true;
/* Handle the error in the main pthread, include the
* specific state change from 'bgp_read'.
*/
thread_add_event(bm->master, bgp_packet_process_error,
peer, code, NULL);
} }
while (more) { while (more) {
@ -236,6 +243,7 @@ static int bgp_process_reads(struct thread *thread)
*/ */
if (ringbuf_remain(ibw) >= pktsize) { if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize); struct stream *pkt = stream_new(pktsize);
assert(STREAM_WRITEABLE(pkt) == pktsize); assert(STREAM_WRITEABLE(pkt) == pktsize);
assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize); assert(ringbuf_get(ibw, pkt->data, pktsize) == pktsize);
stream_set_endp(pkt, pktsize); stream_set_endp(pkt, pktsize);
@ -449,7 +457,7 @@ done : {
* *
* @return status flag (see top-of-file) * @return status flag (see top-of-file)
*/ */
static uint16_t bgp_read(struct peer *peer) static uint16_t bgp_read(struct peer *peer, int *code_p)
{ {
ssize_t nbytes; // how many bytes we actually read ssize_t nbytes; // how many bytes we actually read
uint16_t status = 0; uint16_t status = 0;
@ -459,43 +467,28 @@ static uint16_t bgp_read(struct peer *peer)
/* EAGAIN or EWOULDBLOCK; come back later */ /* EAGAIN or EWOULDBLOCK; come back later */
if (nbytes < 0 && ERRNO_IO_RETRY(errno)) { if (nbytes < 0 && ERRNO_IO_RETRY(errno)) {
SET_FLAG(status, BGP_IO_TRANS_ERR); SET_FLAG(status, BGP_IO_TRANS_ERR);
/* Fatal error; tear down session */
} else if (nbytes < 0) { } else if (nbytes < 0) {
/* Fatal error; tear down session */
flog_err(EC_BGP_UPDATE_RCV, flog_err(EC_BGP_UPDATE_RCV,
"%s [Error] bgp_read_packet error: %s", peer->host, "%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno)); safe_strerror(errno));
if (peer->status == Established) { /* Handle the error in the main pthread. */
if ((CHECK_FLAG(peer->flags, PEER_FLAG_GRACEFUL_RESTART) if (code_p)
|| CHECK_FLAG(peer->flags, *code_p = TCP_fatal_error;
PEER_FLAG_GRACEFUL_RESTART_HELPER))
&& CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
} else
peer->last_reset = PEER_DOWN_CLOSE_SESSION;
}
BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR); SET_FLAG(status, BGP_IO_FATAL_ERR);
/* Received EOF / TCP session closed */
} else if (nbytes == 0) { } else if (nbytes == 0) {
/* Received EOF / TCP session closed */
if (bgp_debug_neighbor_events(peer)) if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d", zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd); peer->host, peer->fd);
if (peer->status == Established) { /* Handle the error in the main pthread. */
if ((CHECK_FLAG(peer->flags, PEER_FLAG_GRACEFUL_RESTART) if (code_p)
|| CHECK_FLAG(peer->flags, *code_p = TCP_connection_closed;
PEER_FLAG_GRACEFUL_RESTART_HELPER))
&& CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
} else
peer->last_reset = PEER_DOWN_CLOSE_SESSION;
}
BGP_EVENT_ADD(peer, TCP_connection_closed);
SET_FLAG(status, BGP_IO_FATAL_ERR); SET_FLAG(status, BGP_IO_FATAL_ERR);
} }

View File

@ -2699,3 +2699,37 @@ void bgp_send_delayed_eor(struct bgp *bgp)
for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer)) for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer))
bgp_write_proceed_actions(peer); bgp_write_proceed_actions(peer);
} }
/*
* Task callback to handle socket error encountered in the io pthread. We avoid
* having the io pthread try to enqueue fsm events or mess with the peer
* struct.
*/
int bgp_packet_process_error(struct thread *thread)
{
struct peer *peer;
int code;
peer = THREAD_ARG(thread);
code = THREAD_VAL(thread);
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP error %d on fd %d",
peer->host, peer->fd, code);
/* Closed connection or error on the socket */
if (peer->status == Established) {
if ((CHECK_FLAG(peer->flags, PEER_FLAG_GRACEFUL_RESTART)
|| CHECK_FLAG(peer->flags,
PEER_FLAG_GRACEFUL_RESTART_HELPER))
&& CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
} else
peer->last_reset = PEER_DOWN_CLOSE_SESSION;
}
bgp_event_update(peer, code);
return 0;
}

View File

@ -83,4 +83,8 @@ extern int bgp_generate_updgrp_packets(struct thread *);
extern int bgp_process_packet(struct thread *); extern int bgp_process_packet(struct thread *);
extern void bgp_send_delayed_eor(struct bgp *bgp); extern void bgp_send_delayed_eor(struct bgp *bgp);
/* Task callback to handle socket error encountered in the io pthread */
int bgp_packet_process_error(struct thread *thread);
#endif /* _QUAGGA_BGP_PACKET_H */ #endif /* _QUAGGA_BGP_PACKET_H */