lib: add mt-safe thread_cancel

This patch implements an MT-safe version of thread_cancel() in
thread_cancel_async(); a short usage sketch follows the sign-off below.
Behavior is as follows:

* Cancellation requests are queued into a list

* Cancellation requests made from the same pthread as the thread_master
  owner are serviced immediately (thread_cancel())

* Cancellation requests made from a separate pthread are queued and the
  call blocks on a condition variable until the owning pthread services
  the request, at which point the condition variable is signaled and
  execution continues (thread_cancel_async())

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
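For illustration, a minimal usage sketch of the two paths described above.
This is not part of the patch: the static task pointer, the event argument and
the wrapper functions are made up, only the thread_cancel*() calls come from
the patched API.

#include "thread.h"  /* struct thread_master, struct thread, thread_cancel*() */

/* Hypothetical tasks, assumed to have been scheduled elsewhere. */
static struct thread *t_read;
static void *event_arg;

/* Runs on the pthread that owns 'master': requests are serviced immediately. */
static void
cancel_from_owner (struct thread_master *master)
{
  thread_cancel (t_read);                  /* cancel one specific task */
  thread_cancel_event (master, event_arg); /* cancel every event carrying event_arg */
}

/* Runs on any other pthread: the request is queued and the call blocks on
 * master->cancel_cond until the owning pthread services it in thread_fetch(). */
static void
cancel_from_helper_pthread (struct thread_master *master)
{
  thread_cancel_async (master, &t_read, NULL);   /* cancel by task reference */
  thread_cancel_async (master, NULL, event_arg); /* or cancel by event argument */
}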
parent 62e4232010
commit 63ccb9cb12
2 changed files with 277 additions and 185 deletions

lib/thread.c

@@ -328,6 +328,12 @@ thread_timer_update(void *node, int actual_position)
thread->index = actual_position;
}
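/* Deletion callback for the cancellation request list (master->cancel_req). */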
static void
cancelreq_del (void *cr)
{
XFREE (MTYPE_TMP, cr);
}
/* Allocate new thread master. */
struct thread_master *
thread_master_create (void)
@@ -351,6 +357,7 @@ thread_master_create (void)
return NULL;
pthread_mutex_init (&rv->mtx, NULL);
pthread_cond_init (&rv->cancel_cond, NULL);
rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@@ -375,6 +382,8 @@ thread_master_create (void)
rv->spin = true;
rv->handle_signals = true;
rv->owner = pthread_self();
rv->cancel_req = list_new ();
rv->cancel_req->del = cancelreq_del;
pipe (rv->io_pipe);
set_nonblocking (rv->io_pipe[0]);
set_nonblocking (rv->io_pipe[1]);
@@ -864,6 +873,8 @@ funcname_thread_add_event (struct thread_master *m,
return thread;
}
/* Thread cancellation ------------------------------------------------------ */
static void
thread_cancel_read_or_write (struct thread *thread, short int state)
{
@@ -885,130 +896,209 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
}
/**
* Process cancellation requests.
*
* This may only be run from the pthread which owns the thread_master.
*
* @param master the thread master to process
* @REQUIRE master->mtx
*/
static void
do_thread_cancel (struct thread_master *master)
{
struct thread_list *list = NULL;
struct pqueue *queue = NULL;
struct thread **thread_array = NULL;
struct thread *thread;
struct cancel_req *cr;
struct listnode *ln;
for (ALL_LIST_ELEMENTS_RO (master->cancel_req, ln, cr))
{
/* If this is an event object cancellation, linear search through the event
* list deleting any events which have the specified argument. We also need
* to check every thread in the ready queue. */
if (cr->eventobj)
{
struct thread *t;
thread = master->event.head;
while (thread)
{
t = thread;
thread = t->next;
if (t->arg == cr->eventobj)
{
thread_list_delete (&master->event, t);
if (t->ref)
*t->ref = NULL;
thread_add_unuse (master, t);
}
}
thread = master->ready.head;
while (thread)
{
t = thread;
thread = t->next;
if (t->arg == cr->eventobj)
{
thread_list_delete (&master->ready, t);
if (t->ref)
*t->ref = NULL;
thread_add_unuse (master, t);
}
}
continue;
}
/* The pointer varies depending on whether the cancellation request was
* made asynchronously or not. If it was, we need to check whether the
* thread even exists anymore before cancelling it. */
thread = (cr->thread) ? cr->thread : *cr->threadref;
if (!thread)
continue;
/* Determine the appropriate queue to cancel the thread from */
switch (thread->type)
{
case THREAD_READ:
thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
thread_array = thread->master->read;
break;
case THREAD_WRITE:
thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
thread_array = thread->master->write;
break;
case THREAD_TIMER:
queue = thread->master->timer;
break;
case THREAD_EVENT:
list = &thread->master->event;
break;
case THREAD_READY:
list = &thread->master->ready;
break;
default:
continue;
break;
}
if (queue)
{
assert(thread->index >= 0);
pqueue_remove (thread, queue);
}
else if (list)
{
thread_list_delete (list, thread);
}
else if (thread_array)
{
thread_array[thread->u.fd] = NULL;
}
else
{
assert(!"Thread should be either in queue or list or array!");
}
if (thread->ref)
*thread->ref = NULL;
thread_add_unuse (thread->master, thread);
}
/* Delete and free all cancellation requests */
list_delete_all_node (master->cancel_req);
/* Wake up any threads which may be blocked in thread_cancel_async() */
pthread_cond_broadcast (&master->cancel_cond);
}
/**
* Cancel any events which have the specified argument.
*
* MT-Unsafe
*
* @param m the thread_master to cancel from
* @param arg the argument passed when creating the event
*/
void
thread_cancel_event (struct thread_master *master, void *arg)
{
assert (master->owner == pthread_self());
pthread_mutex_lock (&master->mtx);
{
struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
cr->eventobj = arg;
listnode_add (master->cancel_req, cr);
}
pthread_mutex_unlock (&master->mtx);
}
/**
* Cancel a specific task.
*
* MT-Unsafe
*
* @param thread task to cancel
*/
void
thread_cancel (struct thread *thread)
{
assert (thread->master->owner == pthread_self());
pthread_mutex_lock (&thread->master->mtx);
{
struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
cr->thread = thread;
listnode_add (thread->master->cancel_req, cr);
do_thread_cancel (thread->master);
}
pthread_mutex_unlock (&thread->master->mtx);
}
/**
* Asynchronous cancellation.
*
* Called with either a pointer to a scheduled task or an event argument, this
* function posts a cancellation request and blocks until the owning pthread
* services it.
*
* If the task is currently running, execution blocks until it completes.
*
* MT-Safe
*
* @param master the thread_master the task is scheduled on
* @param thread pointer to the task to cancel, or NULL
* @param eventobj the event argument to cancel by, or NULL
*/
void
thread_cancel_async (struct thread_master *master, struct thread **thread, void *eventobj)
{
assert (!(thread && eventobj) && (thread || eventobj));
assert (master->owner != pthread_self());
pthread_mutex_lock (&master->mtx);
{
if (thread) {
struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
cr->threadref = thread;
listnode_add (master->cancel_req, cr);
}
else if (eventobj) {
struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
cr->eventobj = eventobj;
listnode_add (master->cancel_req, cr);
}
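/* Nudge the owning pthread out of poll() so the request is serviced promptly */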
AWAKEN (master);
pthread_cond_wait (&master->cancel_cond, &master->mtx);
}
pthread_mutex_unlock (&master->mtx);
}
/* ------------------------------------------------------------------------- */
static struct timeval *
thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
{
@@ -1139,98 +1229,91 @@ thread_process (struct thread_list *list)
struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
struct thread *thread = NULL;
struct timeval now;
struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
struct timeval *timer_wait = &timer_val;
unsigned int count;
do {
int num = 0;
/* 1. Handle signals if any */
if (m->handle_signals)
quagga_sigevent_process ();
pthread_mutex_lock (&m->mtx);
/* Process any pending cancellation requests */
do_thread_cancel (m);
/* Post events to ready queue. This must come before the following block
* since events should occur immediately */
thread_process (&m->event);
/* If there are no tasks on the ready queue, we will poll() until a timer
* expires or we receive I/O, whichever comes first. The strategy for doing
* this is to set the poll() timeout to the time remaining until the next
* timer expires. */
if (m->ready.count == 0)
{
timer_wait = thread_timer_wait (m->timer, &timer_val);
/* If negative timeout, we wish to poll() indefinitely. */
if (timer_wait && timer_wait->tv_sec < 0)
{
timerclear(&timer_val);
timer_wait = &timer_val;
}
/* Calculate number of file descriptors and make a temporary copy */
count = m->handler.pfdcount + m->handler.pfdcountsnmp;
memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
pthread_mutex_unlock (&m->mtx);
{
num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
}
pthread_mutex_lock (&m->mtx);
/* Handle any errors received in poll() */
if (num < 0)
{
if (errno == EINTR)
{
pthread_mutex_unlock (&m->mtx);
continue; /* loop around to signal handler */
}
zlog_warn ("poll() error: %s", safe_strerror (errno));
pthread_mutex_unlock (&m->mtx);
return NULL;
}
}
/* Since we could have received more cancellation requests during poll(), process those */
do_thread_cancel (m);
/* Post timers to ready queue. */
monotime(&now);
thread_process_timers (m->timer, &now);
/* Post I/O to ready queue. */
if (num > 0)
thread_process_io (m, m->handler.copy, num, count);
/* If we have a ready task, break the loop and return it to the caller */
if ((thread = thread_trim_head (&m->ready)))
{
fetch = thread_run (m, thread, fetch);
if (fetch->ref)
*fetch->ref = NULL;
}
pthread_mutex_unlock (&m->mtx);
} while (!thread && m->spin);
return fetch;
}
unsigned long

lib/thread.h

@@ -59,6 +59,12 @@ struct fd_handler
struct pollfd *copy;
};
struct cancel_req {
struct thread *thread; /* the specific task to cancel (synchronous request) */
void *eventobj; /* cancel every event scheduled with this argument */
struct thread **threadref; /* asynchronous request; dereferenced when serviced, since the task may already be gone */
};
/* Master of the threads. */
struct thread_master
{
@@ -68,6 +74,8 @@ struct thread_master
struct thread_list event;
struct thread_list ready;
struct thread_list unuse;
struct list *cancel_req;
pthread_cond_t cancel_cond;
int io_pipe[2];
int fd_limit;
struct fd_handler handler;
@@ -189,7 +197,8 @@ extern void funcname_thread_execute (struct thread_master *,
#undef debugargdef
extern void thread_cancel (struct thread *);
extern void thread_cancel_async (struct thread_master *, struct thread **, void *);
extern void thread_cancel_event (struct thread_master *, void *);
extern struct thread *thread_fetch (struct thread_master *, struct thread *);
extern void thread_call (struct thread *);
extern unsigned long thread_timer_remain_second (struct thread *);
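As a side note, the cross-pthread handshake the patch relies on (queue a
request, wake the owner, wait on a condition variable until the owner
broadcasts) can be shown in isolation. The following is a self-contained
sketch with invented names; it is not code from this repository, and it uses a
simple flag plus a polling owner loop in place of the cancel_req list and
AWAKEN().

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Minimal stand-in for the thread_master's cancellation state. */
struct owner
{
  pthread_mutex_t mtx;
  pthread_cond_t cond;
  bool have_request;            /* stands in for the cancel_req list */
};

/* Non-owner side: post a request and block until it has been serviced. */
static void
request_and_wait (struct owner *o)
{
  pthread_mutex_lock (&o->mtx);
  o->have_request = true;
  while (o->have_request)       /* wait until the owner services the request */
    pthread_cond_wait (&o->cond, &o->mtx);
  pthread_mutex_unlock (&o->mtx);
}

/* Owner side: service any pending request and wake the waiters. */
static void
service_requests (struct owner *o)
{
  pthread_mutex_lock (&o->mtx);
  if (o->have_request)
    {
      o->have_request = false;  /* "process" the request */
      pthread_cond_broadcast (&o->cond);
    }
  pthread_mutex_unlock (&o->mtx);
}

static void *
helper (void *arg)
{
  request_and_wait (arg);
  puts ("request serviced");
  return NULL;
}

int
main (void)
{
  struct owner o = { .have_request = false };
  pthread_mutex_init (&o.mtx, NULL);
  pthread_cond_init (&o.cond, NULL);
  pthread_t tid;
  pthread_create (&tid, NULL, helper, &o);
  /* The real owner is woken out of poll() by AWAKEN(); here we simply poll
   * the flag until the request shows up, then service it. */
  for (;;)
    {
      pthread_mutex_lock (&o.mtx);
      bool pending = o.have_request;
      pthread_mutex_unlock (&o.mtx);
      if (pending)
        break;
    }
  service_requests (&o);
  pthread_join (tid, NULL);
  return 0;
}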