From 63ccb9cb12b559be539bc406f1b9ffc5cb3ac16a Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 7 Jun 2017 20:34:09 +0000 Subject: [PATCH 1/3] lib: add mt-safe thread_cancel This patch implements an MT-safe version of thread_cancel() in thread_cancel_async(). Behavior as follows: * Cancellation requests are queued into a list * Cancellation requests made from the same pthread as the thread_master owner are serviced immediately (thread_cancel()) * Cancellation requests made from a separate pthread are queued and the call blocks on a condition variable until the owning pthread services the request, at which point the condition variable is signaled and execution continues (thread_cancel_async()) Signed-off-by: Quentin Young --- lib/thread.c | 451 ++++++++++++++++++++++++++++++--------------------- lib/thread.h | 11 +- 2 files changed, 277 insertions(+), 185 deletions(-) diff --git a/lib/thread.c b/lib/thread.c index bf3500fd8b..fad80ffc32 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -328,6 +328,12 @@ thread_timer_update(void *node, int actual_position) thread->index = actual_position; } +static void +cancelreq_del (void *cr) +{ + XFREE (MTYPE_TMP, cr); +} + /* Allocate new thread master. */ struct thread_master * thread_master_create (void) @@ -351,6 +357,7 @@ thread_master_create (void) return NULL; pthread_mutex_init (&rv->mtx, NULL); + pthread_cond_init (&rv->cancel_cond, NULL); rv->fd_limit = (int)limit.rlim_cur; rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); @@ -375,6 +382,8 @@ thread_master_create (void) rv->spin = true; rv->handle_signals = true; rv->owner = pthread_self(); + rv->cancel_req = list_new (); + rv->cancel_req->del = cancelreq_del; pipe (rv->io_pipe); set_nonblocking (rv->io_pipe[0]); set_nonblocking (rv->io_pipe[1]); @@ -864,6 +873,8 @@ funcname_thread_add_event (struct thread_master *m, return thread; } +/* Thread cancellation ------------------------------------------------------ */ + static void thread_cancel_read_or_write (struct thread *thread, short int state) { @@ -885,130 +896,209 @@ thread_cancel_read_or_write (struct thread *thread, short int state) } /** - * Cancel thread from scheduler. + * Process cancellation requests. * - * This function is *NOT* MT-safe. DO NOT call it from any other pthread except - * the one which owns thread->master. You will crash. + * This may only be run from the pthread which owns the thread_master. 
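+ * Requests are drained from master->cancel_req; this is how cancellations
+ * queued by other pthreads via thread_cancel_async() take effect.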
+ * + * @param master the thread master to process + * @REQUIRE master->mtx */ -void -thread_cancel (struct thread *thread) +static void +do_thread_cancel (struct thread_master *master) { struct thread_list *list = NULL; struct pqueue *queue = NULL; struct thread **thread_array = NULL; - - pthread_mutex_lock (&thread->mtx); - pthread_mutex_lock (&thread->master->mtx); - - assert (pthread_self() == thread->master->owner); - - switch (thread->type) - { - case THREAD_READ: - thread_cancel_read_or_write (thread, POLLIN | POLLHUP); - thread_array = thread->master->read; - break; - case THREAD_WRITE: - thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); - thread_array = thread->master->write; - break; - case THREAD_TIMER: - queue = thread->master->timer; - break; - case THREAD_EVENT: - list = &thread->master->event; - break; - case THREAD_READY: - list = &thread->master->ready; - break; - default: - goto done; - break; - } - - if (queue) - { - assert(thread->index >= 0); - pqueue_remove (thread, queue); - } - else if (list) - { - thread_list_delete (list, thread); - } - else if (thread_array) - { - thread_array[thread->u.fd] = NULL; - } - else - { - assert(!"Thread should be either in queue or list or array!"); - } - - if (thread->ref) - *thread->ref = NULL; - - thread_add_unuse (thread->master, thread); - -done: - pthread_mutex_unlock (&thread->master->mtx); - pthread_mutex_unlock (&thread->mtx); -} - -/* Delete all events which has argument value arg. */ -unsigned int -thread_cancel_event (struct thread_master *m, void *arg) -{ - unsigned int ret = 0; struct thread *thread; - struct thread *t; - pthread_mutex_lock (&m->mtx); - { - thread = m->event.head; - while (thread) - { - t = thread; - pthread_mutex_lock (&t->mtx); + struct cancel_req *cr; + struct listnode *ln; + for (ALL_LIST_ELEMENTS_RO (master->cancel_req, ln, cr)) + { + /* If this is an event object cancellation, linear search through event + * list deleting any events which have the specified argument. We also + * need to check every thread in the ready queue. */ + if (cr->eventobj) { - thread = t->next; + struct thread *t; + thread = master->event.head; - if (t->arg == arg) + while (thread) { - ret++; - thread_list_delete (&m->event, t); - if (t->ref) - *t->ref = NULL; - thread_add_unuse (m, t); - } - } - pthread_mutex_unlock (&t->mtx); - } + t = thread; + thread = t->next; - /* thread can be on the ready list too */ - thread = m->ready.head; - while (thread) - { - t = thread; - pthread_mutex_lock (&t->mtx); + if (t->arg == cr->eventobj) + { + thread_list_delete (&master->event, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (master, t); + } + } + + thread = master->ready.head; + while (thread) + { + t = thread; + thread = t->next; + + if (t->arg == cr->eventobj) + { + thread_list_delete (&master->ready, t); + if (t->ref) + *t->ref = NULL; + thread_add_unuse (master, t); + } + } + continue; + } + + /* The pointer varies depending on whether the cancellation request was + * made asynchronously or not. If it was, we need to check whether the + * thread even exists anymore before cancelling it. */ + thread = (cr->thread) ? 
cr->thread : *cr->threadref; + + if (!thread) + continue; + + /* Determine the appropriate queue to cancel the thread from */ + switch (thread->type) { - thread = t->next; - - if (t->arg == arg) - { - ret++; - thread_list_delete (&m->ready, t); - if (t->ref) - *t->ref = NULL; - thread_add_unuse (m, t); - } + case THREAD_READ: + thread_cancel_read_or_write (thread, POLLIN | POLLHUP); + thread_array = thread->master->read; + break; + case THREAD_WRITE: + thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); + thread_array = thread->master->write; + break; + case THREAD_TIMER: + queue = thread->master->timer; + break; + case THREAD_EVENT: + list = &thread->master->event; + break; + case THREAD_READY: + list = &thread->master->ready; + break; + default: + continue; + break; } - pthread_mutex_unlock (&t->mtx); - } - } - pthread_mutex_unlock (&m->mtx); - return ret; + + if (queue) + { + assert(thread->index >= 0); + pqueue_remove (thread, queue); + } + else if (list) + { + thread_list_delete (list, thread); + } + else if (thread_array) + { + thread_array[thread->u.fd] = NULL; + } + else + { + assert(!"Thread should be either in queue or list or array!"); + } + + if (thread->ref) + *thread->ref = NULL; + + thread_add_unuse (thread->master, thread); + } + + /* Delete and free all cancellation requests */ + list_delete_all_node (master->cancel_req); + + /* Wake up any threads which may be blocked in thread_cancel_async() */ + pthread_cond_broadcast (&master->cancel_cond); } +/** + * Cancel any events which have the specified argument. + * + * MT-Unsafe + * + * @param m the thread_master to cancel from + * @param arg the argument passed when creating the event + */ +void +thread_cancel_event (struct thread_master *master, void *arg) +{ + assert (master->owner == pthread_self()); + + pthread_mutex_lock (&master->mtx); + { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->eventobj = arg; + listnode_add (master->cancel_req, cr); + } + pthread_mutex_unlock (&master->mtx); +} + +/** + * Cancel a specific task. + * + * MT-Unsafe + * + * @param thread task to cancel + */ +void +thread_cancel (struct thread *thread) +{ + assert (thread->master->owner == pthread_self()); + + pthread_mutex_lock (&thread->master->mtx); + { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->thread = thread; + listnode_add (thread->master->cancel_req, cr); + do_thread_cancel (thread->master); + } + pthread_mutex_unlock (&thread->master->mtx); +} + +/** + * Asynchronous cancellation. + * + * Called with a pointer to a thread, this function posts a cancellation + * request and blocks until it is serviced. + * + * If the thread is currently running, execution blocks until it completes. 
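+ * The wait ends when the owning pthread services the request in
+ * do_thread_cancel () and broadcasts master->cancel_cond.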
+ * + * MT-Safe + * + * @param thread the thread to cancel + */ +void +thread_cancel_async (struct thread_master *master, struct thread **thread, void *eventobj) +{ + assert (!(thread && eventobj) && (thread || eventobj)); + assert (master->owner != pthread_self()); + + pthread_mutex_lock (&master->mtx); + { + if (*thread) { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->threadref = thread; + listnode_add (master->cancel_req, cr); + } + else if (eventobj) { + struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->eventobj = eventobj; + listnode_add (master->cancel_req, cr); + } + AWAKEN (master); + pthread_cond_wait (&master->cancel_cond, &master->mtx); + } + pthread_mutex_unlock (&master->mtx); +} +/* ------------------------------------------------------------------------- */ + static struct timeval * thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) { @@ -1139,98 +1229,91 @@ thread_process (struct thread_list *list) struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { - struct thread *thread; + struct thread *thread = NULL; struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; struct timeval *timer_wait = &timer_val; + unsigned int count; - do - { - int num = 0; + int num = 0; - /* Signals pre-empt everything */ - if (m->handle_signals) - quagga_sigevent_process (); - - pthread_mutex_lock (&m->mtx); - /* Drain the ready queue of already scheduled jobs, before scheduling - * more. - */ - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } - - /* To be fair to all kinds of threads, and avoid starvation, we - * need to be careful to consider all thread types for scheduling - * in each quanta. I.e. we should not return early from here on. - */ - - /* Normal event are the next highest priority. */ - thread_process (&m->event); - - /* Calculate select wait timer if nothing else to do */ - if (m->ready.count == 0) - { - timer_wait = thread_timer_wait (m->timer, &timer_val); - } + do { + /* 1. Handle signals if any */ + if (m->handle_signals) + quagga_sigevent_process (); - if (timer_wait && timer_wait->tv_sec < 0) - { - timerclear(&timer_val); - timer_wait = &timer_val; - } + pthread_mutex_lock (&m->mtx); - unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp; - memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); + /* Process any pending cancellation requests */ + do_thread_cancel (m); - pthread_mutex_unlock (&m->mtx); + /* Post events to ready queue. This must come before the following block + * since events should occur immediately */ + thread_process (&m->event); + + /* If there are no tasks on the ready queue, we will poll() until a timer + * expires or we receive I/O, whichever comes first. The strategy for doing + * this is to set the poll() timeout to the time remaining until the next + * timer expires. */ + if (m->ready.count == 0) { - num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); + timer_wait = thread_timer_wait (m->timer, &timer_val); + + /* If negative timeout, we wish to poll() indefinitely. 
*/ + if (timer_wait && timer_wait->tv_sec < 0) + { + timerclear(&timer_val); + timer_wait = &timer_val; + } + + /* Calculate number of file descriptors and make a temporary copy */ + count = m->handler.pfdcount + m->handler.pfdcountsnmp; + memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); + + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); + } + pthread_mutex_lock (&m->mtx); + + /* Handle any errors received in poll() */ + if (num < 0) + { + if (errno == EINTR) + { + pthread_mutex_unlock (&m->mtx); + continue; /* loop around to signal handler */ + } + zlog_warn ("poll() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); + return NULL; + } } - pthread_mutex_lock (&m->mtx); - /* Signals should get quick treatment */ - if (num < 0) - { - if (errno == EINTR) - { - pthread_mutex_unlock (&m->mtx); - continue; /* signal received - process it */ - } - zlog_warn ("poll() error: %s", safe_strerror (errno)); - pthread_mutex_unlock (&m->mtx); - return NULL; - } + /* Since we could have received more cancellation requests during poll(), process those */ + do_thread_cancel (m); - /* Check foreground timers. Historically, they have had higher - * priority than I/O threads, so let's push them onto the ready - * list in front of the I/O threads. */ - monotime(&now); - thread_process_timers (m->timer, &now); - - /* Got IO, process it */ - if (num > 0) - thread_process_io (m, m->handler.copy, num, count); + /* Post timers to ready queue. */ + monotime(&now); + thread_process_timers (m->timer, &now); - if ((thread = thread_trim_head (&m->ready)) != NULL) - { - fetch = thread_run (m, thread, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock (&m->mtx); - return fetch; - } + /* Post I/O to ready queue. */ + if (num > 0) + thread_process_io (m, m->handler.copy, num, count); - pthread_mutex_unlock (&m->mtx); + /* If we have a ready task, break the loop and return it to the caller */ + if ((thread = thread_trim_head (&m->ready))) + { + fetch = thread_run (m, thread, fetch); + if (fetch->ref) + *fetch->ref = NULL; + } - } while (m->spin); + pthread_mutex_unlock (&m->mtx); - return NULL; + } while (!thread && m->spin); + + return fetch; } unsigned long diff --git a/lib/thread.h b/lib/thread.h index 86f839810f..d49d325f75 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -59,6 +59,12 @@ struct fd_handler struct pollfd *copy; }; +struct cancel_req { + struct thread *thread; + void *eventobj; + struct thread **threadref; +}; + /* Master of the theads. 
*/ struct thread_master { @@ -68,6 +74,8 @@ struct thread_master struct thread_list event; struct thread_list ready; struct thread_list unuse; + struct list *cancel_req; + pthread_cond_t cancel_cond; int io_pipe[2]; int fd_limit; struct fd_handler handler; @@ -189,7 +197,8 @@ extern void funcname_thread_execute (struct thread_master *, #undef debugargdef extern void thread_cancel (struct thread *); -extern unsigned int thread_cancel_event (struct thread_master *, void *); +extern void thread_cancel_async (struct thread_master *, struct thread **, void *); +extern void thread_cancel_event (struct thread_master *, void *); extern struct thread *thread_fetch (struct thread_master *, struct thread *); extern void thread_call (struct thread *); extern unsigned long thread_timer_remain_second (struct thread *); From 8797240ea4de5163f8633c1c561edbfd8ecf1c2a Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Fri, 9 Jun 2017 03:40:27 +0000 Subject: [PATCH 2/3] lib: mt-safe cancel, round deux * Update pollfds copy as well as the original * Keep array count for copy in thread_master * Remove last remnants of POLLHUP in .events field * Remove unused snmpcount (lolwut) * Improve docs * Add missing do_thread_cancel() call in thread_cancel_event() * Change thread_fetch() to always enter poll() to avoid starving i/o * Remember to free up cancel_req when destroying thread_master * Fix dereference of null pointer * Fix dead store to timeval * Fix missing condition for condition variable :-) Signed-off-by: Quentin Young --- lib/thread.c | 226 +++++++++++++++++++++++++++++++++------------------ lib/thread.h | 14 ++-- 2 files changed, 156 insertions(+), 84 deletions(-) diff --git a/lib/thread.c b/lib/thread.c index fad80ffc32..3f6945ca02 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -384,6 +384,7 @@ thread_master_create (void) rv->owner = pthread_self(); rv->cancel_req = list_new (); rv->cancel_req->del = cancelreq_del; + rv->canceled = true; pipe (rv->io_pipe); set_nonblocking (rv->io_pipe[0]); set_nonblocking (rv->io_pipe[1]); @@ -542,6 +543,7 @@ thread_master_free (struct thread_master *m) pthread_mutex_destroy (&m->mtx); close (m->io_pipe[0]); close (m->io_pipe[1]); + list_delete (m->cancel_req); XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); XFREE (MTYPE_THREAD_MASTER, m->handler.copy); @@ -646,7 +648,7 @@ thread_get (struct thread_master *m, u_char type, static int fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, - nfds_t count, struct timeval *timer_wait) + nfds_t count, const struct timeval *timer_wait) { /* If timer_wait is null here, that means poll() should block indefinitely, * unless the thread_master has overriden it by setting ->selectpoll_timeout. @@ -875,24 +877,57 @@ funcname_thread_add_event (struct thread_master *m, /* Thread cancellation ------------------------------------------------------ */ +/** + * NOT's out the .events field of pollfd corresponding to the given file + * descriptor. The event to be NOT'd is passed in the 'state' parameter. + * + * This needs to happen for both copies of pollfd's. See 'thread_fetch' + * implementation for details. + * + * @param master + * @param fd + * @param state the event to cancel. 
One or more (OR'd together) of the + * following: + * - POLLIN + * - POLLOUT + */ static void -thread_cancel_read_or_write (struct thread *thread, short int state) +thread_cancel_rw (struct thread_master *master, int fd, short state) { - for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) - if (thread->master->handler.pfds[i].fd == thread->u.fd) - { - thread->master->handler.pfds[i].events &= ~(state); + /* Cancel POLLHUP too just in case some bozo set it */ + state |= POLLHUP; - /* remove thread fds from pfd list */ - if (thread->master->handler.pfds[i].events == 0) - { - memmove(thread->master->handler.pfds+i, - thread->master->handler.pfds+i+1, - (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd)); - thread->master->handler.pfdcount--; - return; - } - } + /* find the index of corresponding pollfd */ + nfds_t i; + + for (i = 0; i < master->handler.pfdcount; i++) + if (master->handler.pfds[i].fd == fd) + break; + + /* NOT out event. */ + master->handler.pfds[i].events &= ~(state); + + /* If all events are canceled, delete / resize the pollfd array. */ + if (master->handler.pfds[i].events == 0) + { + memmove(master->handler.pfds + i, master->handler.pfds + i + 1, + (master->handler.pfdcount - i - 1) * sizeof (struct pollfd)); + master->handler.pfdcount--; + } + + /* If we have the same pollfd in the copy, perform the same operations, + * otherwise return. */ + if (i >= master->handler.copycount) + return; + + master->handler.copy[i].events &= ~(state); + + if (master->handler.copy[i].events == 0) + { + memmove(master->handler.copy + i, master->handler.copy + i + 1, + (master->handler.copycount - i - 1) * sizeof (struct pollfd)); + master->handler.copycount--; + } } /** @@ -966,21 +1001,21 @@ do_thread_cancel (struct thread_master *master) switch (thread->type) { case THREAD_READ: - thread_cancel_read_or_write (thread, POLLIN | POLLHUP); - thread_array = thread->master->read; + thread_cancel_rw (master, thread->u.fd, POLLIN); + thread_array = master->read; break; case THREAD_WRITE: - thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); - thread_array = thread->master->write; + thread_cancel_rw (master, thread->u.fd, POLLOUT); + thread_array = master->write; break; case THREAD_TIMER: - queue = thread->master->timer; + queue = master->timer; break; case THREAD_EVENT: - list = &thread->master->event; + list = &master->event; break; case THREAD_READY: - list = &thread->master->ready; + list = &master->ready; break; default: continue; @@ -1015,6 +1050,7 @@ do_thread_cancel (struct thread_master *master) list_delete_all_node (master->cancel_req); /* Wake up any threads which may be blocked in thread_cancel_async() */ + master->canceled = true; pthread_cond_broadcast (&master->cancel_cond); } @@ -1036,6 +1072,7 @@ thread_cancel_event (struct thread_master *master, void *arg) struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); cr->eventobj = arg; listnode_add (master->cancel_req, cr); + do_thread_cancel(master); } pthread_mutex_unlock (&master->mtx); } @@ -1065,35 +1102,56 @@ thread_cancel (struct thread *thread) /** * Asynchronous cancellation. * - * Called with a pointer to a thread, this function posts a cancellation - * request and blocks until it is serviced. + * Called with either a struct thread ** or void * to an event argument, + * this function posts the correct cancellation request and blocks until it is + * serviced. * * If the thread is currently running, execution blocks until it completes. 
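+ *
+ * Illustration (names hypothetical): from a pthread that does not own
+ * 'master', cancel a task via its back-reference, or an event by argument:
+ *
+ *     thread_cancel_async (master, &my_thread_ref, NULL);
+ *     thread_cancel_async (master, NULL, my_event_arg);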
* + * The last two parameters are mutually exclusive, i.e. if you pass one the + * other must be NULL. + * + * When the cancellation procedure executes on the target thread_master, the + * thread * provided is checked for nullity. If it is null, the thread is + * assumed to no longer exist and the cancellation request is a no-op. Thus + * users of this API must pass a back-reference when scheduling the original + * task. + * * MT-Safe * - * @param thread the thread to cancel + * @param master the thread master with the relevant event / task + * @param thread pointer to thread to cancel + * @param eventobj the event */ void -thread_cancel_async (struct thread_master *master, struct thread **thread, void *eventobj) +thread_cancel_async (struct thread_master *master, struct thread **thread, + void *eventobj) { assert (!(thread && eventobj) && (thread || eventobj)); assert (master->owner != pthread_self()); pthread_mutex_lock (&master->mtx); { - if (*thread) { - struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); - cr->threadref = thread; - listnode_add (master->cancel_req, cr); - } - else if (eventobj) { - struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); - cr->eventobj = eventobj; - listnode_add (master->cancel_req, cr); - } + master->canceled = false; + + if (thread) + { + struct cancel_req *cr = + XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->threadref = thread; + listnode_add (master->cancel_req, cr); + } + else if (eventobj) + { + struct cancel_req *cr = + XCALLOC (MTYPE_TMP, sizeof (struct cancel_req)); + cr->eventobj = eventobj; + listnode_add (master->cancel_req, cr); + } AWAKEN (master); - pthread_cond_wait (&master->cancel_cond, &master->mtx); + + while (!master->canceled) + pthread_cond_wait (&master->cancel_cond, &master->mtx); } pthread_mutex_unlock (&master->mtx); } @@ -1143,13 +1201,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread, return 1; } +/** + * Process I/O events. + * + * Walks through file descriptor array looking for those pollfds whose .revents + * field has something interesting. Deletes any invalid file descriptors. + * + * @param m the thread master + * @param num the number of active file descriptors (return value of poll()) + */ static void -thread_process_io (struct thread_master *m, struct pollfd *pfds, - unsigned int num, unsigned int count) +thread_process_io (struct thread_master *m, unsigned int num) { unsigned int ready = 0; + struct pollfd *pfds = m->handler.copy; - for (nfds_t i = 0; i < count && ready < num ; ++i) + for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i) { /* no event for current fd? immediately continue */ if (pfds[i].revents == 0) @@ -1178,8 +1245,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds, m->handler.pfdcount--; memmove (pfds + i, pfds + i + 1, - (count - i - 1) * sizeof(struct pollfd)); - count--; + (m->handler.copycount - i - 1) * sizeof(struct pollfd)); + m->handler.copycount--; + i--; } } @@ -1231,14 +1299,14 @@ thread_fetch (struct thread_master *m, struct thread *fetch) { struct thread *thread = NULL; struct timeval now; - struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; - struct timeval *timer_wait = &timer_val; - unsigned int count; + struct timeval zerotime = { 0, 0 }; + struct timeval timer_val; + struct timeval *timer_wait; int num = 0; do { - /* 1. 
Handle signals if any */ + /* Handle signals if any */ if (m->handle_signals) quagga_sigevent_process (); @@ -1254,41 +1322,41 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* If there are no tasks on the ready queue, we will poll() until a timer * expires or we receive I/O, whichever comes first. The strategy for doing * this is to set the poll() timeout to the time remaining until the next - * timer expires. */ - if (m->ready.count == 0) - { - timer_wait = thread_timer_wait (m->timer, &timer_val); + * timer expires. We need to hit poll() at least once per loop to avoid + * starvation by events. */ - /* If negative timeout, we wish to poll() indefinitely. */ - if (timer_wait && timer_wait->tv_sec < 0) - { - timerclear(&timer_val); - timer_wait = &timer_val; - } + /* timer_wait will be NULL if there are no pending timers */ + timer_wait = thread_timer_wait (m->timer, &timer_val); - /* Calculate number of file descriptors and make a temporary copy */ - count = m->handler.pfdcount + m->handler.pfdcountsnmp; - memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd)); + /* If negative timeout, we wish to perform a nonblocking poll() */ + if (timer_wait && !timercmp (timer_wait, &zerotime, >)) + timer_wait = &zerotime; - pthread_mutex_unlock (&m->mtx); - { - num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait); - } - pthread_mutex_lock (&m->mtx); + /* Copy pollfd array + # active pollfds in it. Not necessary to copy + * the array size as this is fixed. */ + m->handler.copycount = m->handler.pfdcount; + memcpy (m->handler.copy, m->handler.pfds, + m->handler.copycount * sizeof (struct pollfd)); - /* Handle any errors received in poll() */ - if (num < 0) - { - if (errno == EINTR) - { - pthread_mutex_unlock (&m->mtx); - continue; /* loop around to signal handler */ - } - zlog_warn ("poll() error: %s", safe_strerror (errno)); - pthread_mutex_unlock (&m->mtx); - return NULL; - } - } + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, + m->handler.copycount, timer_wait); + } + pthread_mutex_lock (&m->mtx); + + /* Handle any errors received in poll() */ + if (num < 0) + { + if (errno == EINTR) + { + pthread_mutex_unlock (&m->mtx); + continue; /* loop around to signal handler */ + } + zlog_warn ("poll() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); + return NULL; + } /* Since we could have received more cancellation requests during poll(), process those */ do_thread_cancel (m); @@ -1299,7 +1367,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Post I/O to ready queue. */ if (num > 0) - thread_process_io (m, m->handler.copy, num, count); + thread_process_io (m, num); /* If we have a ready task, break the loop and return it to the caller */ if ((thread = thread_trim_head (&m->ready))) diff --git a/lib/thread.h b/lib/thread.h index d49d325f75..e48068b174 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -47,16 +47,19 @@ struct pqueue; struct fd_handler { - /* number of pfd stored in pfds */ - nfds_t pfdcount; - /* number of pfd stored in pfds + number of snmp pfd */ - nfds_t pfdcountsnmp; - /* number of pfd that fit in the allocated space of pfds */ + /* number of pfd that fit in the allocated space of pfds. This is a constant + * and is the same for both pfds and copy. 
*/ nfds_t pfdsize; + /* file descriptors to monitor for i/o */ struct pollfd *pfds; + /* number of pollfds stored in pfds */ + nfds_t pfdcount; + /* chunk used for temp copy of pollfds */ struct pollfd *copy; + /* number of pollfds stored in copy */ + nfds_t copycount; }; struct cancel_req { @@ -75,6 +78,7 @@ struct thread_master struct thread_list ready; struct thread_list unuse; struct list *cancel_req; + bool canceled; pthread_cond_t cancel_cond; int io_pipe[2]; int fd_limit; From 7feb7d7e6581213ae3eb6010c3036478f9e90158 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 14 Jun 2017 17:06:10 +0000 Subject: [PATCH 3/3] lib: tighten up exit semantics for thread_fetch() * Account for the pipe poker in poll() by explicitly returning NULL when we have no events, timers or file descriptors to work with * Add a comment explaining exactly what we are doing and why Signed-off-by: Quentin Young --- lib/thread.c | 83 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/lib/thread.c b/lib/thread.c index 3f6945ca02..02108bc6bd 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -1300,8 +1300,8 @@ thread_fetch (struct thread_master *m, struct thread *fetch) struct thread *thread = NULL; struct timeval now; struct timeval zerotime = { 0, 0 }; - struct timeval timer_val; - struct timeval *timer_wait; + struct timeval tv; + struct timeval *tw; int num = 0; @@ -1321,42 +1321,59 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* If there are no tasks on the ready queue, we will poll() until a timer * expires or we receive I/O, whichever comes first. The strategy for doing - * this is to set the poll() timeout to the time remaining until the next - * timer expires. We need to hit poll() at least once per loop to avoid - * starvation by events. */ + * this is: + * + * - If there are events pending, set the poll() timeout to zero + * - If there are no events pending, but there are timers pending, set the + * timeout to the smallest remaining time on any timer + * - If there are neither timers nor events pending, but there are file + * descriptors pending, block indefinitely in poll() + * - If nothing is pending, it's time for the application to die + * + * In every case except the last, we need to hit poll() at least once per + * loop to avoid starvation by events */ - /* timer_wait will be NULL if there are no pending timers */ - timer_wait = thread_timer_wait (m->timer, &timer_val); + if (m->ready.count == 0) + tw = thread_timer_wait (m->timer, &tv); - /* If negative timeout, we wish to perform a nonblocking poll() */ - if (timer_wait && !timercmp (timer_wait, &zerotime, >)) - timer_wait = &zerotime; + if (m->ready.count != 0 || (tw && !timercmp (tw, &zerotime, >))) + tw = &zerotime; - /* Copy pollfd array + # active pollfds in it. Not necessary to copy - * the array size as this is fixed. */ - m->handler.copycount = m->handler.pfdcount; - memcpy (m->handler.copy, m->handler.pfds, - m->handler.copycount * sizeof (struct pollfd)); + if (!tw && m->handler.pfdcount == 0) + { /* die */ + pthread_mutex_unlock (&m->mtx); + fetch = NULL; + break; + } - pthread_mutex_unlock (&m->mtx); - { - num = fd_poll (m, m->handler.copy, m->handler.pfdsize, - m->handler.copycount, timer_wait); - } - pthread_mutex_lock (&m->mtx); + /* Copy pollfd array + # active pollfds in it. Not necessary to copy + * the array size as this is fixed. 
*/ + m->handler.copycount = m->handler.pfdcount; + memcpy (m->handler.copy, m->handler.pfds, + m->handler.copycount * sizeof (struct pollfd)); - /* Handle any errors received in poll() */ - if (num < 0) - { - if (errno == EINTR) - { - pthread_mutex_unlock (&m->mtx); - continue; /* loop around to signal handler */ - } - zlog_warn ("poll() error: %s", safe_strerror (errno)); - pthread_mutex_unlock (&m->mtx); - return NULL; - } + pthread_mutex_unlock (&m->mtx); + { + num = fd_poll (m, m->handler.copy, m->handler.pfdsize, + m->handler.copycount, tw); + } + pthread_mutex_lock (&m->mtx); + + /* Handle any errors received in poll() */ + if (num < 0) + { + if (errno == EINTR) + { + pthread_mutex_unlock (&m->mtx); + continue; /* loop around to signal handler */ + } + + /* else die */ + zlog_warn ("poll() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); + fetch = NULL; + break; + } /* Since we could have received more cancellation requests during poll(), process those */ do_thread_cancel (m);