bgpd: Make the process_queue per bgp process

We currently have a global process queue for handling route
updates in bgp.  This is fine, in general, except there are
places and times where we plug the queue for no new work
during certain peer states of bgp update delay.  If we
happen to be processing multiple bgp instances on startup
why do we want to stop processing in vrf A when vrf B
is in a bit of a pickle?

Also this separation will allow us to start forward thinking
about how to fully integrate pthreads into route processing
in bgp.

Signed-off-by: Donald Sharp <sharpd@nvidia.com>
This commit is contained in:
Donald Sharp 2020-10-17 09:43:14 -04:00
parent 719e0a6a6c
commit b6c386bbbd
5 changed files with 32 additions and 28 deletions

View File

@ -740,11 +740,12 @@ void bgp_update_delay_end(struct bgp *bgp)
bgp->main_zebra_update_hold = 1; bgp->main_zebra_update_hold = 1;
bgp->main_peers_update_hold = 1; bgp->main_peers_update_hold = 1;
/* Resume the queue processing. This should trigger the event that would /*
take * Resume the queue processing. This should trigger the event that would
care of processing any work that was queued during the read-only * take care of processing any work that was queued during the read-only
mode. */ * mode.
work_queue_unplug(bm->process_main_queue); */
work_queue_unplug(bgp->process_queue);
} }
/** /**
@ -997,7 +998,7 @@ static void bgp_update_delay_begin(struct bgp *bgp)
struct peer *peer; struct peer *peer;
/* Stop the processing of queued work. Enqueue shall continue */ /* Stop the processing of queued work. Enqueue shall continue */
work_queue_plug(bm->process_main_queue); work_queue_plug(bgp->process_queue);
for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer)) for (ALL_LIST_ELEMENTS(bgp->peer, node, nnode, peer))
peer->update_delay_over = 0; peer->update_delay_over = 0;

View File

@ -2937,18 +2937,21 @@ static void bgp_processq_del(struct work_queue *wq, void *data)
XFREE(MTYPE_BGP_PROCESS_QUEUE, pqnode); XFREE(MTYPE_BGP_PROCESS_QUEUE, pqnode);
} }
void bgp_process_queue_init(void) void bgp_process_queue_init(struct bgp *bgp)
{ {
if (!bm->process_main_queue) if (!bgp->process_queue) {
bm->process_main_queue = char name[BUFSIZ];
work_queue_new(bm->master, "process_main_queue");
bm->process_main_queue->spec.workfunc = &bgp_process_wq; snprintf(name, BUFSIZ, "process_queue %s", bgp->name_pretty);
bm->process_main_queue->spec.del_item_data = &bgp_processq_del; bgp->process_queue = work_queue_new(bm->master, name);
bm->process_main_queue->spec.max_retries = 0; }
bm->process_main_queue->spec.hold = 50;
bgp->process_queue->spec.workfunc = &bgp_process_wq;
bgp->process_queue->spec.del_item_data = &bgp_processq_del;
bgp->process_queue->spec.max_retries = 0;
bgp->process_queue->spec.hold = 50;
/* Use a higher yield value of 50ms for main queue processing */ /* Use a higher yield value of 50ms for main queue processing */
bm->process_main_queue->spec.yield = 50 * 1000L; bgp->process_queue->spec.yield = 50 * 1000L;
} }
static struct bgp_process_queue *bgp_processq_alloc(struct bgp *bgp) static struct bgp_process_queue *bgp_processq_alloc(struct bgp *bgp)
@ -2968,7 +2971,7 @@ static struct bgp_process_queue *bgp_processq_alloc(struct bgp *bgp)
void bgp_process(struct bgp *bgp, struct bgp_dest *dest, afi_t afi, safi_t safi) void bgp_process(struct bgp *bgp, struct bgp_dest *dest, afi_t afi, safi_t safi)
{ {
#define ARBITRARY_PROCESS_QLEN 10000 #define ARBITRARY_PROCESS_QLEN 10000
struct work_queue *wq = bm->process_main_queue; struct work_queue *wq = bgp->process_queue;
struct bgp_process_queue *pqnode; struct bgp_process_queue *pqnode;
int pqnode_reuse = 0; int pqnode_reuse = 0;
@ -3025,13 +3028,13 @@ void bgp_add_eoiu_mark(struct bgp *bgp)
{ {
struct bgp_process_queue *pqnode; struct bgp_process_queue *pqnode;
if (bm->process_main_queue == NULL) if (bgp->process_queue == NULL)
return; return;
pqnode = bgp_processq_alloc(bgp); pqnode = bgp_processq_alloc(bgp);
SET_FLAG(pqnode->flags, BGP_PROCESS_QUEUE_EOIU_MARKER); SET_FLAG(pqnode->flags, BGP_PROCESS_QUEUE_EOIU_MARKER);
work_queue_add(bm->process_main_queue, pqnode); work_queue_add(bgp->process_queue, pqnode);
} }
static int bgp_maximum_prefix_restart_timer(struct thread *thread) static int bgp_maximum_prefix_restart_timer(struct thread *thread)
@ -4543,7 +4546,7 @@ static void bgp_clear_route_table(struct peer *peer, afi_t afi, safi_t safi,
struct bgp_table *table) struct bgp_table *table)
{ {
struct bgp_dest *dest; struct bgp_dest *dest;
int force = bm->process_main_queue ? 0 : 1; int force = peer->bgp->process_queue ? 0 : 1;
if (!table) if (!table)
table = peer->bgp->rib[afi][safi]; table = peer->bgp->rib[afi][safi];

View File

@ -527,7 +527,7 @@ DECLARE_HOOK(bgp_process,
/* Prototypes. */ /* Prototypes. */
extern void bgp_rib_remove(struct bgp_dest *dest, struct bgp_path_info *pi, extern void bgp_rib_remove(struct bgp_dest *dest, struct bgp_path_info *pi,
struct peer *peer, afi_t afi, safi_t safi); struct peer *peer, afi_t afi, safi_t safi);
extern void bgp_process_queue_init(void); extern void bgp_process_queue_init(struct bgp *bgp);
extern void bgp_route_init(void); extern void bgp_route_init(void);
extern void bgp_route_finish(void); extern void bgp_route_finish(void);
extern void bgp_cleanup_routes(struct bgp *); extern void bgp_cleanup_routes(struct bgp *);

View File

@ -2981,6 +2981,8 @@ static struct bgp *bgp_create(as_t *as, const char *name,
} }
bgp_lock(bgp); bgp_lock(bgp);
bgp_process_queue_init(bgp);
bgp->heuristic_coalesce = true; bgp->heuristic_coalesce = true;
bgp->inst_type = inst_type; bgp->inst_type = inst_type;
bgp->vrf_id = (inst_type == BGP_INSTANCE_TYPE_DEFAULT) ? VRF_DEFAULT bgp->vrf_id = (inst_type == BGP_INSTANCE_TYPE_DEFAULT) ? VRF_DEFAULT
@ -3505,6 +3507,9 @@ int bgp_delete(struct bgp *bgp)
bgp_set_evpn(bgp_get_default()); bgp_set_evpn(bgp_get_default());
} }
if (bgp->process_queue)
work_queue_free_and_null(&bgp->process_queue);
thread_master_free_unused(bm->master); thread_master_free_unused(bm->master);
bgp_unlock(bgp); /* initial reference */ bgp_unlock(bgp); /* initial reference */
@ -7085,8 +7090,6 @@ void bgp_master_init(struct thread_master *master, const int buffer_size)
bm->terminating = false; bm->terminating = false;
bm->socket_buffer = buffer_size; bm->socket_buffer = buffer_size;
bgp_process_queue_init();
bgp_mac_init(); bgp_mac_init();
/* init the rd id space. /* init the rd id space.
assign 0th index in the bitfield, assign 0th index in the bitfield,
@ -7291,9 +7294,6 @@ void bgp_terminate(void)
bgp_notify_send(peer, BGP_NOTIFY_CEASE, bgp_notify_send(peer, BGP_NOTIFY_CEASE,
BGP_NOTIFY_CEASE_PEER_UNCONFIG); BGP_NOTIFY_CEASE_PEER_UNCONFIG);
if (bm->process_main_queue)
work_queue_free_and_null(&bm->process_main_queue);
if (bm->t_rmap_update) if (bm->t_rmap_update)
BGP_TIMER_OFF(bm->t_rmap_update); BGP_TIMER_OFF(bm->t_rmap_update);

View File

@ -122,9 +122,6 @@ struct bgp_master {
/* BGP thread master. */ /* BGP thread master. */
struct thread_master *master; struct thread_master *master;
/* work queues */
struct work_queue *process_main_queue;
/* Listening sockets */ /* Listening sockets */
struct list *listen_sockets; struct list *listen_sockets;
@ -682,6 +679,9 @@ struct bgp {
/* Weighted ECMP related config. */ /* Weighted ECMP related config. */
enum bgp_link_bw_handling lb_handling; enum bgp_link_bw_handling lb_handling;
/* Process Queue for handling routes */
struct work_queue *process_queue;
QOBJ_FIELDS QOBJ_FIELDS
}; };
DECLARE_QOBJ_TYPE(bgp) DECLARE_QOBJ_TYPE(bgp)