lib: mt-safe cancel, round deux

* Update pollfds copy as well as the original
* Keep array count for copy in thread_master
* Remove last remnants of POLLHUP in .events field
* Remove unused snmpcount (lolwut)
* Improve docs
* Add missing do_thread_cancel() call in thread_cancel_event()
* Change thread_fetch() to always enter poll() to avoid starving i/o
* Remember to free up cancel_req when destroying thread_master
* Fix dereference of null pointer
* Fix dead store to timeval
* Fix missing condition for condition variable :-)

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
Author: Quentin Young <qlyoung@cumulusnetworks.com>
Date:   2017-06-09 03:40:27 +00:00
Parent: 63ccb9cb12
Commit: 8797240ea4

2 changed files with 156 additions and 84 deletions
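The last three bullets are all pieces of one classic pattern. Below is a minimal, self-contained sketch (not FRR code; names mirror the diff) of the condition-variable handshake this commit completes: the requester arms a shared "canceled" flag under the mutex, and must re-check it in a loop around pthread_cond_wait(), since condition variables permit spurious wakeups and the broadcast may arrive before the waiter blocks.

/* build with: cc -pthread demo.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cancel_cond = PTHREAD_COND_INITIALIZER;
static bool canceled = false;

/* stands in for do_thread_cancel() running on the owning pthread */
static void *owner (void *arg)
{
  pthread_mutex_lock (&mtx);
  canceled = true;                       /* set predicate before waking waiters */
  pthread_cond_broadcast (&cancel_cond);
  pthread_mutex_unlock (&mtx);
  return arg;
}

int main (void)
{
  pthread_t tid;

  pthread_mutex_lock (&mtx);
  canceled = false;                      /* arm predicate under the lock */
  pthread_create (&tid, NULL, owner, NULL);
  while (!canceled)                      /* loop guards spurious wakeups */
    pthread_cond_wait (&cancel_cond, &mtx);
  pthread_mutex_unlock (&mtx);

  pthread_join (tid, NULL);
  puts ("cancellation acknowledged");
  return 0;
}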

diff --git a/lib/thread.c b/lib/thread.c
--- a/lib/thread.c
+++ b/lib/thread.c

@@ -384,6 +384,7 @@ thread_master_create (void)
   rv->owner = pthread_self();
   rv->cancel_req = list_new ();
   rv->cancel_req->del = cancelreq_del;
+  rv->canceled = true;
   pipe (rv->io_pipe);
   set_nonblocking (rv->io_pipe[0]);
   set_nonblocking (rv->io_pipe[1]);
@@ -542,6 +543,7 @@ thread_master_free (struct thread_master *m)
   pthread_mutex_destroy (&m->mtx);
   close (m->io_pipe[0]);
   close (m->io_pipe[1]);
+  list_delete (m->cancel_req);
 
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
   XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
@@ -646,7 +648,7 @@ thread_get (struct thread_master *m, u_char type,
 
 static int
 fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
-         nfds_t count, struct timeval *timer_wait)
+         nfds_t count, const struct timeval *timer_wait)
 {
   /* If timer_wait is null here, that means poll() should block indefinitely,
    * unless the thread_master has overriden it by setting ->selectpoll_timeout.
@@ -875,23 +877,56 @@ funcname_thread_add_event (struct thread_master *m,
 
 /* Thread cancellation ------------------------------------------------------ */
 
+/**
+ * NOT's out the .events field of pollfd corresponding to the given file
+ * descriptor. The event to be NOT'd is passed in the 'state' parameter.
+ *
+ * This needs to happen for both copies of pollfd's. See 'thread_fetch'
+ * implementation for details.
+ *
+ * @param master
+ * @param fd
+ * @param state the event to cancel. One or more (OR'd together) of the
+ *   following:
+ *   - POLLIN
+ *   - POLLOUT
+ */
 static void
-thread_cancel_read_or_write (struct thread *thread, short int state)
+thread_cancel_rw (struct thread_master *master, int fd, short state)
 {
-  for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
-    if (thread->master->handler.pfds[i].fd == thread->u.fd)
-      {
-        thread->master->handler.pfds[i].events &= ~(state);
-
-        /* remove thread fds from pfd list */
-        if (thread->master->handler.pfds[i].events == 0)
-          {
-            memmove(thread->master->handler.pfds+i,
-                    thread->master->handler.pfds+i+1,
-                    (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
-            thread->master->handler.pfdcount--;
-            return;
-          }
-      }
+  /* Cancel POLLHUP too just in case some bozo set it */
+  state |= POLLHUP;
+
+  /* find the index of corresponding pollfd */
+  nfds_t i;
+  for (i = 0; i < master->handler.pfdcount; i++)
+    if (master->handler.pfds[i].fd == fd)
+      break;
+
+  /* NOT out event. */
+  master->handler.pfds[i].events &= ~(state);
+
+  /* If all events are canceled, delete / resize the pollfd array. */
+  if (master->handler.pfds[i].events == 0)
+    {
+      memmove (master->handler.pfds + i, master->handler.pfds + i + 1,
+               (master->handler.pfdcount - i - 1) * sizeof (struct pollfd));
+      master->handler.pfdcount--;
+    }
+
+  /* If we have the same pollfd in the copy, perform the same operations,
+   * otherwise return. */
+  if (i >= master->handler.copycount)
+    return;
+
+  master->handler.copy[i].events &= ~(state);
+
+  if (master->handler.copy[i].events == 0)
+    {
+      memmove (master->handler.copy + i, master->handler.copy + i + 1,
+               (master->handler.copycount - i - 1) * sizeof (struct pollfd));
+      master->handler.copycount--;
+    }
 }
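To make the NOT-out semantics above concrete, a tiny standalone demo (hypothetical fd number; not part of the commit): cancelling one event leaves the pollfd watching its remaining events, and only a fully-cleared .events field makes the entry eligible for the memmove compaction.

#include <poll.h>
#include <assert.h>

int main (void)
{
  /* hypothetical fd registered for both read and write */
  struct pollfd pfd = { .fd = 7, .events = POLLIN | POLLOUT, .revents = 0 };

  pfd.events &= ~(POLLIN | POLLHUP);     /* cancel read; POLLHUP for safety */
  assert (pfd.events == POLLOUT);        /* entry stays, now write-only */

  pfd.events &= ~(POLLOUT | POLLHUP);    /* cancel write as well */
  assert (pfd.events == 0);              /* now removable via memmove */
  return 0;
}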
@@ -966,21 +1001,21 @@ do_thread_cancel (struct thread_master *master)
       switch (thread->type)
         {
         case THREAD_READ:
-          thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
-          thread_array = thread->master->read;
+          thread_cancel_rw (master, thread->u.fd, POLLIN);
+          thread_array = master->read;
           break;
         case THREAD_WRITE:
-          thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
-          thread_array = thread->master->write;
+          thread_cancel_rw (master, thread->u.fd, POLLOUT);
+          thread_array = master->write;
           break;
         case THREAD_TIMER:
-          queue = thread->master->timer;
+          queue = master->timer;
           break;
         case THREAD_EVENT:
-          list = &thread->master->event;
+          list = &master->event;
           break;
         case THREAD_READY:
-          list = &thread->master->ready;
+          list = &master->ready;
           break;
         default:
           continue;
@@ -1015,6 +1050,7 @@ do_thread_cancel (struct thread_master *master)
   list_delete_all_node (master->cancel_req);
 
   /* Wake up any threads which may be blocked in thread_cancel_async() */
+  master->canceled = true;
   pthread_cond_broadcast (&master->cancel_cond);
 }
 
@@ -1036,6 +1072,7 @@ thread_cancel_event (struct thread_master *master, void *arg)
     struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
     cr->eventobj = arg;
     listnode_add (master->cancel_req, cr);
+    do_thread_cancel(master);
   }
   pthread_mutex_unlock (&master->mtx);
 }
@@ -1065,34 +1102,55 @@ thread_cancel (struct thread *thread)
 /**
  * Asynchronous cancellation.
  *
- * Called with a pointer to a thread, this function posts a cancellation
- * request and blocks until it is serviced.
+ * Called with either a struct thread ** or void * to an event argument,
+ * this function posts the correct cancellation request and blocks until it is
+ * serviced.
  *
  * If the thread is currently running, execution blocks until it completes.
  *
+ * The last two parameters are mutually exclusive, i.e. if you pass one the
+ * other must be NULL.
+ *
+ * When the cancellation procedure executes on the target thread_master, the
+ * thread * provided is checked for nullity. If it is null, the thread is
+ * assumed to no longer exist and the cancellation request is a no-op. Thus
+ * users of this API must pass a back-reference when scheduling the original
+ * task.
+ *
  * MT-Safe
  *
- * @param thread the thread to cancel
+ * @param master the thread master with the relevant event / task
+ * @param thread pointer to thread to cancel
+ * @param eventobj the event
  */
 void
-thread_cancel_async (struct thread_master *master, struct thread **thread, void *eventobj)
+thread_cancel_async (struct thread_master *master, struct thread **thread,
+                     void *eventobj)
 {
   assert (!(thread && eventobj) && (thread || eventobj));
   assert (master->owner != pthread_self());
 
   pthread_mutex_lock (&master->mtx);
   {
-    if (*thread) {
-      struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
-      cr->threadref = thread;
-      listnode_add (master->cancel_req, cr);
-    }
-    else if (eventobj) {
-      struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
-      cr->eventobj = eventobj;
-      listnode_add (master->cancel_req, cr);
-    }
+    master->canceled = false;
+
+    if (thread)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->threadref = thread;
+        listnode_add (master->cancel_req, cr);
+      }
+    else if (eventobj)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->eventobj = eventobj;
+        listnode_add (master->cancel_req, cr);
+      }
+
     AWAKEN (master);
-    pthread_cond_wait (&master->cancel_cond, &master->mtx);
+
+    while (!master->canceled)
+      pthread_cond_wait (&master->cancel_cond, &master->mtx);
   }
   pthread_mutex_unlock (&master->mtx);
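A hedged caller-side sketch of the back-reference contract described in the doc comment above (assumes lib/thread.h; the thread_add_read() call reflects the scheduling API of this era and is not part of this diff):

#include "thread.h"

static struct thread *t_read;    /* back-reference; nulled when the task runs */

static int handle_read (struct thread *t)
{
  t_read = NULL;                 /* clear back-reference: task has executed */
  /* ... service THREAD_FD (t) ... */
  return 0;
}

static void watch_fd (struct thread_master *master, int fd)
{
  t_read = thread_add_read (master, handle_read, NULL, fd);
}

/* called from a pthread that does NOT own 'master' */
static void stop_watching (struct thread_master *master)
{
  /* blocks until the owning pthread services the request; a no-op if
   * handle_read already ran and nulled the back-reference */
  thread_cancel_async (master, &t_read, NULL);
}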
@@ -1143,13 +1201,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread,
   return 1;
 }
 
+/**
+ * Process I/O events.
+ *
+ * Walks through file descriptor array looking for those pollfds whose .revents
+ * field has something interesting. Deletes any invalid file descriptors.
+ *
+ * @param m the thread master
+ * @param num the number of active file descriptors (return value of poll())
+ */
 static void
-thread_process_io (struct thread_master *m, struct pollfd *pfds,
-                   unsigned int num, unsigned int count)
+thread_process_io (struct thread_master *m, unsigned int num)
 {
   unsigned int ready = 0;
+  struct pollfd *pfds = m->handler.copy;
 
-  for (nfds_t i = 0; i < count && ready < num ; ++i)
+  for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i)
     {
       /* no event for current fd? immediately continue */
       if (pfds[i].revents == 0)
@@ -1178,8 +1245,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds,
           m->handler.pfdcount--;
 
           memmove (pfds + i, pfds + i + 1,
-                   (count - i - 1) * sizeof(struct pollfd));
-          count--;
+                   (m->handler.copycount - i - 1) * sizeof(struct pollfd));
+          m->handler.copycount--;
+
           i--;
         }
     }
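A simplified sketch of the early-exit scan performed above (not FRR code): poll() reports how many pollfds have nonzero .revents, so the walk can stop after seeing that many instead of visiting the whole array.

#include <poll.h>

static void walk_revents (struct pollfd *pfds, nfds_t count, int num)
{
  int ready = 0;
  for (nfds_t i = 0; i < count && ready < num; i++)
    {
      if (pfds[i].revents == 0)   /* nothing happened on this fd */
        continue;
      ready++;
      /* ... dispatch POLLIN / POLLOUT, drop invalid fds ... */
    }
}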
@@ -1231,14 +1299,14 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
 {
   struct thread *thread = NULL;
   struct timeval now;
-  struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
-  struct timeval *timer_wait = &timer_val;
-  unsigned int count;
+  struct timeval zerotime = { 0, 0 };
+  struct timeval timer_val;
+  struct timeval *timer_wait;
   int num = 0;
 
   do {
-      /* 1. Handle signals if any */
+      /* Handle signals if any */
       if (m->handle_signals)
         quagga_sigevent_process ();
@@ -1254,25 +1322,26 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
       /* If there are no tasks on the ready queue, we will poll() until a timer
        * expires or we receive I/O, whichever comes first. The strategy for doing
        * this is to set the poll() timeout to the time remaining until the next
-       * timer expires. */
-      if (m->ready.count == 0)
-        {
-          /* timer_wait will be NULL if there are no pending timers */
-          timer_wait = thread_timer_wait (m->timer, &timer_val);
-
-          /* If negative timeout, we wish to poll() indefinitely. */
-          if (timer_wait && timer_wait->tv_sec < 0)
-            {
-              timerclear(&timer_val);
-              timer_wait = &timer_val;
-            }
-
-          /* Calculate number of file descriptors and make a temporary copy */
-          count = m->handler.pfdcount + m->handler.pfdcountsnmp;
-          memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
-
-          pthread_mutex_unlock (&m->mtx);
-          {
-            num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
-          }
-          pthread_mutex_lock (&m->mtx);
+       * timer expires. We need to hit poll() at least once per loop to avoid
+       * starvation by events. */
+
+      /* timer_wait will be NULL if there are no pending timers */
+      timer_wait = thread_timer_wait (m->timer, &timer_val);
+
+      /* If negative timeout, we wish to perform a nonblocking poll() */
+      if (timer_wait && !timercmp (timer_wait, &zerotime, >))
+        timer_wait = &zerotime;
+
+      /* Copy pollfd array + # active pollfds in it. Not necessary to copy
+       * the array size as this is fixed. */
+      m->handler.copycount = m->handler.pfdcount;
+      memcpy (m->handler.copy, m->handler.pfds,
+              m->handler.copycount * sizeof (struct pollfd));
+
+      pthread_mutex_unlock (&m->mtx);
+      {
+        num = fd_poll (m, m->handler.copy, m->handler.pfdsize,
+                       m->handler.copycount, timer_wait);
+      }
+      pthread_mutex_lock (&m->mtx);
@@ -1288,7 +1357,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
               pthread_mutex_unlock (&m->mtx);
               return NULL;
             }
-        }
 
       /* Since we could have received more cancellation requests during poll(), process those */
       do_thread_cancel (m);
@@ -1299,7 +1367,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
       /* Post I/O to ready queue. */
       if (num > 0)
-        thread_process_io (m, m->handler.copy, num, count);
+        thread_process_io (m, num);
 
       /* If we have a ready task, break the loop and return it to the caller */
       if ((thread = thread_trim_head (&m->ready)))
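The starvation fix hinges on poll()'s zero-timeout mode: when the next timer is already due, thread_fetch() still enters poll(), just without blocking, so pending i/o is always drained. A standalone demo of that nonblocking pass (not FRR code):

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
  struct pollfd pfd = { .fd = STDIN_FILENO, .events = POLLIN, .revents = 0 };

  int num = poll (&pfd, 1, 0);   /* timeout of 0 => returns immediately */
  printf ("%d fd(s) ready without blocking\n", num < 0 ? 0 : num);
  return 0;
}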

diff --git a/lib/thread.h b/lib/thread.h
--- a/lib/thread.h
+++ b/lib/thread.h

@@ -47,16 +47,19 @@ struct pqueue;
 
 struct fd_handler
 {
-  /* number of pfd stored in pfds */
-  nfds_t pfdcount;
-  /* number of pfd stored in pfds + number of snmp pfd */
-  nfds_t pfdcountsnmp;
-  /* number of pfd that fit in the allocated space of pfds */
+  /* number of pfd that fit in the allocated space of pfds. This is a constant
+   * and is the same for both pfds and copy. */
   nfds_t pfdsize;
 
   /* file descriptors to monitor for i/o */
   struct pollfd *pfds;
+  /* number of pollfds stored in pfds */
+  nfds_t pfdcount;
 
   /* chunk used for temp copy of pollfds */
   struct pollfd *copy;
+  /* number of pollfds stored in copy */
+  nfds_t copycount;
 };
 
 struct cancel_req {
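One way to state the bookkeeping the reworked struct maintains (illustrative helper, not part of the commit; assumes lib/thread.h): pfdsize is the single fixed capacity shared by pfds and copy, so neither live count may ever exceed it.

#include <assert.h>
#include "thread.h"

static void fd_handler_check (const struct fd_handler *h)
{
  assert (h->pfdcount <= h->pfdsize);    /* live fds fit the allocation */
  assert (h->copycount <= h->pfdsize);   /* copy shares the same capacity */
}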
@@ -75,6 +78,7 @@ struct thread_master
   struct thread_list ready;
   struct thread_list unuse;
   struct list *cancel_req;
+  bool canceled;
   pthread_cond_t cancel_cond;
   int io_pipe[2];
   int fd_limit;