libs, bgpd: improve task cancellation by argument value

Extend the thread_cancel_event api so that it's more complete:
look in all the lists of events, including io and timers, for
matching tasks. Add a limited version of the api that only
examines tasks in the event and ready queues.

BGP appears to require the old behavior, so change its macro
to use the more limited cancel api.

Signed-off-by: Mark Stapp <mjs@voltanet.io>
Mark Stapp 2021-01-27 14:32:22 -05:00
parent aea25d1ec8
commit a9318a3287
3 changed files with 161 additions and 49 deletions
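
For illustration, a rough sketch of how a daemon might use the two flavors after this change. The names (demo, my_event_cb, ctx) and the int-returning handler signature are assumptions for the example, not part of this commit:

/* Hypothetical callback; scheduled with 'ctx' as the task argument. */
static int my_event_cb(struct thread *t)
{
	void *ctx = THREAD_ARG(t);

	/* ... handle the event for 'ctx' ... */
	return 0;
}

static void demo(struct thread_master *master, void *ctx)
{
	/* Schedule an immediate event and a 30s timer keyed by 'ctx'. */
	thread_add_event(master, my_event_cb, ctx, 0, NULL);
	thread_add_timer(master, my_event_cb, ctx, 30, NULL);

	/* Limited flavor: drops only event/ready-queue tasks whose arg is
	 * 'ctx'; the timer above keeps running.
	 */
	thread_cancel_event_ready(master, ctx);

	/* Full flavor: also walks the io and timer lists, so the timer is
	 * cancelled as well.
	 */
	thread_cancel_event(master, ctx);
}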

View File

@@ -31,7 +31,7 @@
#define BGP_TIMER_OFF(T) \
do { \
-		THREAD_OFF(T); \
+		THREAD_OFF((T)); \
} while (0)
#define BGP_EVENT_ADD(P, E) \
@@ -44,7 +44,7 @@
#define BGP_EVENT_FLUSH(P) \
do { \
assert(peer); \
-		thread_cancel_event(bm->master, (P)); \
+		thread_cancel_event_ready(bm->master, (P)); \
} while (0)
#define BGP_UPDATE_GROUP_TIMER_ON(T, F) \
@@ -53,10 +53,10 @@
PEER_ROUTE_ADV_DELAY(peer)) \
thread_add_timer_msec(bm->master, (F), peer, \
(BGP_DEFAULT_UPDATE_ADVERTISEMENT_TIME * 1000),\
-				T); \
+				(T)); \
else \
thread_add_timer_msec(bm->master, (F), peer, \
-				0, T); \
+				0, (T)); \
} while (0) \
#define BGP_MSEC_JITTER 10
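
An aside on the extra parentheses added around the macro parameters above: wrapping every use of a macro argument guards against operator-precedence surprises when a caller passes an expression rather than a plain identifier. A toy illustration (not FRR code):

#define SQUARE_BAD(x) (x * x)
#define SQUARE_GOOD(x) ((x) * (x))

int a = SQUARE_BAD(1 + 2);	/* expands to (1 + 2 * 1 + 2) == 5 */
int b = SQUARE_GOOD(1 + 2);	/* expands to ((1 + 2) * (1 + 2)) == 9 */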

View File

@@ -1057,21 +1057,29 @@ struct thread *_thread_add_event(const struct xref_threadsched *xref,
* - POLLIN
* - POLLOUT
*/
-static void thread_cancel_rw(struct thread_master *master, int fd, short state)
+static void thread_cancel_rw(struct thread_master *master, int fd, short state,
+			     int idx_hint)
{
bool found = false;
-	/* Cancel POLLHUP too just in case some bozo set it */
-	state |= POLLHUP;
/* find the index of corresponding pollfd */
nfds_t i;
-	for (i = 0; i < master->handler.pfdcount; i++)
-		if (master->handler.pfds[i].fd == fd) {
-			found = true;
-			break;
-		}
+	/* Cancel POLLHUP too just in case some bozo set it */
+	state |= POLLHUP;
+	/* Some callers know the index of the pfd already */
+	if (idx_hint >= 0) {
+		i = idx_hint;
+		found = true;
+	} else {
+		/* Have to look for the fd in the pfd array */
+		for (i = 0; i < master->handler.pfdcount; i++)
+			if (master->handler.pfds[i].fd == fd) {
+				found = true;
+				break;
+			}
+	}
if (!found) {
zlog_debug(
@@ -1111,6 +1119,95 @@ static void thread_cancel_rw(struct thread_master *master, int fd, short state)
}
}
+/*
+ * Process task cancellation given a task argument: iterate through the
+ * various lists of tasks, looking for any that match the argument.
+ */
+static void cancel_arg_helper(struct thread_master *master,
+			      const struct cancel_req *cr)
+{
+	struct thread *t;
+	nfds_t i;
+	int fd;
+	struct pollfd *pfd;
+	/* We're only processing arg-based cancellations here. */
+	if (cr->eventobj == NULL)
+		return;
+	/* First process the ready lists. */
+	frr_each_safe(thread_list, &master->event, t) {
+		if (t->arg != cr->eventobj)
+			continue;
+		thread_list_del(&master->event, t);
+		if (t->ref)
+			*t->ref = NULL;
+		thread_add_unuse(master, t);
+	}
+	frr_each_safe(thread_list, &master->ready, t) {
+		if (t->arg != cr->eventobj)
+			continue;
+		thread_list_del(&master->ready, t);
+		if (t->ref)
+			*t->ref = NULL;
+		thread_add_unuse(master, t);
+	}
+	/* If requested, stop here and ignore io and timers */
+	if (CHECK_FLAG(cr->flags, THREAD_CANCEL_FLAG_READY))
+		return;
+	/* Check the io tasks */
+	for (i = 0; i < master->handler.pfdcount;) {
+		pfd = master->handler.pfds + i;
+		if (pfd->events & POLLIN)
+			t = master->read[pfd->fd];
+		else
+			t = master->write[pfd->fd];
+		if (t && t->arg == cr->eventobj) {
+			fd = pfd->fd;
+			/* Found a match to cancel: clean up fd arrays */
+			thread_cancel_rw(master, pfd->fd, pfd->events, i);
+			/* Clean up thread arrays */
+			master->read[fd] = NULL;
+			master->write[fd] = NULL;
+			/* Clear caller's ref */
+			if (t->ref)
+				*t->ref = NULL;
+			thread_add_unuse(master, t);
+			/* Don't increment 'i' since the cancellation will have
+			 * removed the entry from the pfd array
+			 */
+		} else
+			i++;
+	}
+	/* Check the timer tasks */
+	t = thread_timer_list_first(&master->timer);
+	while (t) {
+		struct thread *t_next;
+		t_next = thread_timer_list_next(&master->timer, t);
+		if (t->arg == cr->eventobj) {
+			thread_timer_list_del(&master->timer, t);
+			if (t->ref)
+				*t->ref = NULL;
+			thread_add_unuse(master, t);
+		}
+		t = t_next;
+	}
+}
/**
* Process cancellation requests.
*
@@ -1129,31 +1226,12 @@ static void do_thread_cancel(struct thread_master *master)
struct listnode *ln;
for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
/*
-		 * If this is an event object cancellation, linear search
-		 * through event list deleting any events which have the
-		 * specified argument. We also need to check every thread
-		 * in the ready queue.
+		 * If this is an event object cancellation, search
+		 * through task lists deleting any tasks which have the
+		 * specified argument - use this handy helper function.
*/
if (cr->eventobj) {
-			struct thread *t;
-			frr_each_safe(thread_list, &master->event, t) {
-				if (t->arg != cr->eventobj)
-					continue;
-				thread_list_del(&master->event, t);
-				if (t->ref)
-					*t->ref = NULL;
-				thread_add_unuse(master, t);
-			}
-			frr_each_safe(thread_list, &master->ready, t) {
-				if (t->arg != cr->eventobj)
-					continue;
-				thread_list_del(&master->ready, t);
-				if (t->ref)
-					*t->ref = NULL;
-				thread_add_unuse(master, t);
-			}
+			cancel_arg_helper(master, cr);
continue;
}
@@ -1171,11 +1249,11 @@ static void do_thread_cancel(struct thread_master *master)
/* Determine the appropriate queue to cancel the thread from */
switch (thread->type) {
case THREAD_READ:
-			thread_cancel_rw(master, thread->u.fd, POLLIN);
+			thread_cancel_rw(master, thread->u.fd, POLLIN, -1);
thread_array = master->read;
break;
case THREAD_WRITE:
-			thread_cancel_rw(master, thread->u.fd, POLLOUT);
+			thread_cancel_rw(master, thread->u.fd, POLLOUT, -1);
thread_array = master->write;
break;
case THREAD_TIMER:
@@ -1213,6 +1291,30 @@ static void do_thread_cancel(struct thread_master *master)
pthread_cond_broadcast(&master->cancel_cond);
}
+/*
+ * Helper function used for multiple flavors of arg-based cancellation.
+ */
+static void cancel_event_helper(struct thread_master *m, void *arg, int flags)
+{
+	struct cancel_req *cr;
+	assert(m->owner == pthread_self());
+	/* Only worth anything if caller supplies an arg. */
+	if (arg == NULL)
+		return;
+	cr = XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
+	cr->flags = flags;
+	frr_with_mutex(&m->mtx) {
+		cr->eventobj = arg;
+		listnode_add(m->cancel_req, cr);
+		do_thread_cancel(m);
+	}
+}
/**
* Cancel any events which have the specified argument.
*
@@ -1223,15 +1325,22 @@ static void do_thread_cancel(struct thread_master *master)
*/
void thread_cancel_event(struct thread_master *master, void *arg)
{
-	assert(master->owner == pthread_self());
+	cancel_event_helper(master, arg, 0);
+}
-	frr_with_mutex(&master->mtx) {
-		struct cancel_req *cr =
-			XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
-		cr->eventobj = arg;
-		listnode_add(master->cancel_req, cr);
-		do_thread_cancel(master);
-	}
+/*
+ * Cancel ready tasks with an arg matching 'arg'
+ *
+ * MT-Unsafe
+ *
+ * @param m the thread_master to cancel from
+ * @param arg the argument passed when creating the event
+ */
+void thread_cancel_event_ready(struct thread_master *m, void *arg)
+{
+	/* Only cancel ready/event tasks */
+	cancel_event_helper(m, arg, THREAD_CANCEL_FLAG_READY);
}
/**

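Two looping details in cancel_arg_helper above are worth calling out: the io scan does not advance 'i' after cancelling the current pollfd entry, because thread_cancel_rw() compacts the array and a new element slides into slot i, and the timer walk saves the next pointer before possibly deleting the current node. A standalone sketch of the scan-and-compact pattern (toy data, not FRR code):

#include <stdio.h>
#include <string.h>

int main(void)
{
	int items[] = {1, 2, 2, 3, 2, 4};
	size_t count = sizeof(items) / sizeof(items[0]);
	size_t i = 0;

	while (i < count) {
		if (items[i] == 2) {
			/* Remove slot i: later entries shift left, so do not
			 * advance i - a new element now occupies this slot.
			 */
			memmove(&items[i], &items[i + 1],
				(count - i - 1) * sizeof(items[0]));
			count--;
		} else
			i++;
	}

	for (i = 0; i < count; i++)
		printf("%d ", items[i]);
	printf("\n");	/* prints: 1 3 4 */
	return 0;
}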
View File

@@ -46,8 +46,8 @@ PREDECL_HEAP(thread_timer_list)
struct fd_handler {
/* number of pfd that fit in the allocated space of pfds. This is a
-	 * constant
-	 * and is the same for both pfds and copy. */
+	 * constant and is the same for both pfds and copy.
+	 */
nfds_t pfdsize;
/* file descriptors to monitor for i/o */
@@ -231,7 +231,10 @@ extern void _thread_execute(const struct xref_threadsched *xref,
extern void thread_cancel(struct thread **event);
extern void thread_cancel_async(struct thread_master *, struct thread **,
void *);
-extern void thread_cancel_event(struct thread_master *, void *);
+/* Cancel ready tasks with an arg matching 'arg' */
+extern void thread_cancel_event_ready(struct thread_master *m, void *arg);
+/* Cancel all tasks with an arg matching 'arg', including timers and io */
+extern void thread_cancel_event(struct thread_master *m, void *arg);
extern struct thread *thread_fetch(struct thread_master *, struct thread *);
extern void thread_call(struct thread *);
extern unsigned long thread_timer_remain_second(struct thread *);
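
For context on the declarations above, a brief sketch contrasting the two cancellation styles the header offers: cancelling one specific task through its struct thread ** reference versus cancelling by argument. The names (example, my_timer_cb, peer, t_expire) are placeholders, not part of the commit:

static struct thread *t_expire;

static int my_timer_cb(struct thread *t)
{
	/* ... */
	return 0;
}

static void example(struct thread_master *master, void *peer)
{
	/* 'peer' is the task argument; &t_expire receives the task ref. */
	thread_add_timer(master, my_timer_cb, peer, 5, &t_expire);

	/* By reference: cancels exactly this task and clears t_expire. */
	thread_cancel(&t_expire);

	/* By argument: cancels every remaining task whose arg is 'peer',
	 * whichever list it sits on (event, ready, io or timer).
	 */
	thread_cancel_event(master, peer);
}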