lib: additional thread.c MT-safety work

Fixes a few insufficient critical sections. Adds back locking for
thread_cancel(), since while thread_cancel() is only safe to call from
the pthread which owns the thread master due to races involving
thread_fetch() modifying thread master's ready queue, we still need
mutual exclusion here for all of the other public thread.c functions to
maintain their MT-safety.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2017-04-17 18:33:58 +00:00
parent 1189d95fca
commit 2c70efaed1

View File

@ -773,6 +773,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
#endif #endif
if (thread) if (thread)
{
pthread_mutex_lock (&thread->mtx);
{ {
thread->u.fd = fd; thread->u.fd = fd;
if (dir == THREAD_READ) if (dir == THREAD_READ)
@ -780,6 +782,8 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
else else
thread_add_fd (m->write, thread); thread_add_fd (m->write, thread);
} }
pthread_mutex_unlock (&thread->mtx);
}
} }
pthread_mutex_unlock (&m->mtx); pthread_mutex_unlock (&m->mtx);
@ -802,17 +806,21 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
assert (time_relative); assert (time_relative);
queue = ((type == THREAD_TIMER) ? m->timer : m->background);
pthread_mutex_lock (&m->mtx); pthread_mutex_lock (&m->mtx);
{ {
queue = ((type == THREAD_TIMER) ? m->timer : m->background);
thread = thread_get (m, type, func, arg, debugargpass); thread = thread_get (m, type, func, arg, debugargpass);
pthread_mutex_lock (&thread->mtx);
{
monotime(&thread->u.sands);
timeradd(&thread->u.sands, time_relative, &thread->u.sands);
pqueue_enqueue(thread, queue);
}
pthread_mutex_unlock (&thread->mtx);
} }
pthread_mutex_unlock (&m->mtx); pthread_mutex_unlock (&m->mtx);
monotime(&thread->u.sands);
timeradd(&thread->u.sands, time_relative, &thread->u.sands);
pqueue_enqueue(thread, queue);
return thread; return thread;
} }
@ -903,9 +911,13 @@ funcname_thread_add_event (struct thread_master *m,
pthread_mutex_lock (&m->mtx); pthread_mutex_lock (&m->mtx);
{ {
thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
pthread_mutex_lock (&thread->mtx);
{
thread->u.val = val; thread->u.val = val;
thread_list_add (&m->event, thread); thread_list_add (&m->event, thread);
} }
pthread_mutex_unlock (&thread->mtx);
}
pthread_mutex_unlock (&m->mtx); pthread_mutex_unlock (&m->mtx);
return thread; return thread;
@ -940,8 +952,8 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
/** /**
* Cancel thread from scheduler. * Cancel thread from scheduler.
* *
* This function is *NOT* MT-safe. DO NOT call it from any other thread than * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
* the primary thread. * the one which owns thread->master.
*/ */
void void
thread_cancel (struct thread *thread) thread_cancel (struct thread *thread)
@ -950,6 +962,9 @@ thread_cancel (struct thread *thread)
struct pqueue *queue = NULL; struct pqueue *queue = NULL;
struct thread **thread_array = NULL; struct thread **thread_array = NULL;
pthread_mutex_lock (&thread->master->mtx);
pthread_mutex_lock (&thread->mtx);
switch (thread->type) switch (thread->type)
{ {
case THREAD_READ: case THREAD_READ:
@ -981,15 +996,14 @@ thread_cancel (struct thread *thread)
queue = thread->master->background; queue = thread->master->background;
break; break;
default: default:
return; goto done;
break; break;
} }
if (queue) if (queue)
{ {
assert(thread->index >= 0); assert(thread->index >= 0);
assert(thread == queue->array[thread->index]); pqueue_remove (thread, queue);
pqueue_remove_at(thread->index, queue);
} }
else if (list) else if (list)
{ {
@ -1005,6 +1019,10 @@ thread_cancel (struct thread *thread)
} }
thread_add_unuse (thread->master, thread); thread_add_unuse (thread->master, thread);
done:
pthread_mutex_unlock (&thread->mtx);
pthread_mutex_unlock (&thread->master->mtx);
} }
/* Delete all events which has argument value arg. */ /* Delete all events which has argument value arg. */
@ -1214,16 +1232,14 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
struct timeval *timer_wait = &timer_val; struct timeval *timer_wait = &timer_val;
struct timeval *timer_wait_bg; struct timeval *timer_wait_bg;
pthread_mutex_lock (&m->mtx);
while (1) while (1)
{ {
int num = 0; int num = 0;
/* Signals pre-empt everything */ /* Signals pre-empt everything */
pthread_mutex_unlock (&m->mtx);
quagga_sigevent_process (); quagga_sigevent_process ();
pthread_mutex_lock (&m->mtx);
pthread_mutex_lock (&m->mtx);
/* Drain the ready queue of already scheduled jobs, before scheduling /* Drain the ready queue of already scheduled jobs, before scheduling
* more. * more.
*/ */
@ -1272,7 +1288,10 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
if (num < 0) if (num < 0)
{ {
if (errno == EINTR) if (errno == EINTR)
{
pthread_mutex_unlock (&m->mtx);
continue; /* signal received - process it */ continue; /* signal received - process it */
}
zlog_warn ("select() error: %s", safe_strerror (errno)); zlog_warn ("select() error: %s", safe_strerror (errno));
pthread_mutex_unlock (&m->mtx); pthread_mutex_unlock (&m->mtx);
return NULL; return NULL;
@ -1310,6 +1329,8 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
pthread_mutex_unlock (&m->mtx); pthread_mutex_unlock (&m->mtx);
return fetch; return fetch;
} }
pthread_mutex_unlock (&m->mtx);
} }
} }