lib: make thread.c pthread-safe

This change introduces synchronization mechanisms to thread.c in order
to allow safe concurrent use.

Thread.c should now be threadstafe with respect to:
* struct thread
* struct thread_master

Calls into thread.c for operations upon data of this type should not
require external synchronization.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2017-03-03 19:01:49 +00:00
parent ed8ba2e920
commit 1189d95fca
2 changed files with 200 additions and 95 deletions

View File

@ -41,7 +41,7 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
#include <mach/mach_time.h> #include <mach/mach_time.h>
#endif #endif
/* Relative time, since startup */ static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct hash *cpu_record = NULL; static struct hash *cpu_record = NULL;
static unsigned long static unsigned long
@ -137,9 +137,14 @@ cpu_record_print(struct vty *vty, thread_type filter)
vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
vty_out(vty, " Avg uSec Max uSecs"); vty_out(vty, " Avg uSec Max uSecs");
vty_out(vty, " Type Thread%s", VTY_NEWLINE); vty_out(vty, " Type Thread%s", VTY_NEWLINE);
hash_iterate(cpu_record,
(void(*)(struct hash_backet*,void*))cpu_record_hash_print, pthread_mutex_lock (&cpu_record_mtx);
args); {
hash_iterate(cpu_record,
(void(*)(struct hash_backet*,void*))cpu_record_hash_print,
args);
}
pthread_mutex_unlock (&cpu_record_mtx);
if (tmp.total_calls > 0) if (tmp.total_calls > 0)
vty_out_cpu_thread_history(vty, &tmp); vty_out_cpu_thread_history(vty, &tmp);
@ -216,16 +221,25 @@ cpu_record_hash_clear (struct hash_backet *bucket,
if ( !(a->types & *filter) ) if ( !(a->types & *filter) )
return; return;
hash_release (cpu_record, bucket->data); pthread_mutex_lock (&cpu_record_mtx);
{
hash_release (cpu_record, bucket->data);
}
pthread_mutex_unlock (&cpu_record_mtx);
} }
static void static void
cpu_record_clear (thread_type filter) cpu_record_clear (thread_type filter)
{ {
thread_type *tmp = &filter; thread_type *tmp = &filter;
hash_iterate (cpu_record,
(void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, pthread_mutex_lock (&cpu_record_mtx);
tmp); {
hash_iterate (cpu_record,
(void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
tmp);
}
pthread_mutex_unlock (&cpu_record_mtx);
} }
DEFUN (clear_thread_cpu, DEFUN (clear_thread_cpu,
@ -326,16 +340,20 @@ thread_master_create (void)
getrlimit(RLIMIT_NOFILE, &limit); getrlimit(RLIMIT_NOFILE, &limit);
if (cpu_record == NULL) pthread_mutex_lock (&cpu_record_mtx);
cpu_record {
= hash_create ((unsigned int (*) (void *))cpu_record_hash_key, if (cpu_record == NULL)
(int (*) (const void *, const void *))cpu_record_hash_cmp); cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
(int (*) (const void *, const void *))
cpu_record_hash_cmp);
}
pthread_mutex_unlock (&cpu_record_mtx);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
if (rv == NULL) if (rv == NULL)
{ return NULL;
return NULL;
} pthread_mutex_init (&rv->mtx, NULL);
rv->fd_limit = (int)limit.rlim_cur; rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@ -498,11 +516,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue)
void void
thread_master_free_unused (struct thread_master *m) thread_master_free_unused (struct thread_master *m)
{ {
struct thread *t; pthread_mutex_lock (&m->mtx);
while ((t = thread_trim_head(&m->unuse)) != NULL) {
{ struct thread *t;
XFREE(MTYPE_THREAD, t); while ((t = thread_trim_head(&m->unuse)) != NULL)
} {
pthread_mutex_destroy (&t->mtx);
XFREE(MTYPE_THREAD, t);
}
}
pthread_mutex_unlock (&m->mtx);
} }
/* Stop thread scheduler. */ /* Stop thread scheduler. */
@ -516,25 +539,37 @@ thread_master_free (struct thread_master *m)
thread_list_free (m, &m->ready); thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse); thread_list_free (m, &m->unuse);
thread_queue_free (m, m->background); thread_queue_free (m, m->background);
pthread_mutex_destroy (&m->mtx);
#if defined(HAVE_POLL) #if defined(HAVE_POLL)
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif #endif
XFREE (MTYPE_THREAD_MASTER, m); XFREE (MTYPE_THREAD_MASTER, m);
if (cpu_record) pthread_mutex_lock (&cpu_record_mtx);
{ {
hash_clean (cpu_record, cpu_record_hash_free); if (cpu_record)
hash_free (cpu_record); {
cpu_record = NULL; hash_clean (cpu_record, cpu_record_hash_free);
} hash_free (cpu_record);
cpu_record = NULL;
}
}
pthread_mutex_unlock (&cpu_record_mtx);
} }
/* Return remain time in second. */ /* Return remain time in second. */
unsigned long unsigned long
thread_timer_remain_second (struct thread *thread) thread_timer_remain_second (struct thread *thread)
{ {
int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; int64_t remain;
pthread_mutex_lock (&thread->mtx);
{
remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
}
pthread_mutex_unlock (&thread->mtx);
return remain < 0 ? 0 : remain; return remain < 0 ? 0 : remain;
} }
@ -545,7 +580,11 @@ struct timeval
thread_timer_remain(struct thread *thread) thread_timer_remain(struct thread *thread)
{ {
struct timeval remain; struct timeval remain;
monotime_until(&thread->u.sands, &remain); pthread_mutex_lock (&thread->mtx);
{
monotime_until(&thread->u.sands, &remain);
}
pthread_mutex_unlock (&thread->mtx);
return remain; return remain;
} }
@ -560,8 +599,11 @@ thread_get (struct thread_master *m, u_char type,
if (! thread) if (! thread)
{ {
thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread)); thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
/* mutex only needs to be initialized at struct creation. */
pthread_mutex_init (&thread->mtx, NULL);
m->alloc++; m->alloc++;
} }
thread->type = type; thread->type = type;
thread->add_type = type; thread->add_type = type;
thread->master = m; thread->master = m;
@ -584,8 +626,12 @@ thread_get (struct thread_master *m, u_char type,
{ {
tmp.func = func; tmp.func = func;
tmp.funcname = funcname; tmp.funcname = funcname;
thread->hist = hash_get (cpu_record, &tmp, pthread_mutex_lock (&cpu_record_mtx);
(void * (*) (void *))cpu_record_hash_alloc); {
thread->hist = hash_get (cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
}
pthread_mutex_unlock (&cpu_record_mtx);
} }
thread->hist->total_active++; thread->hist->total_active++;
thread->func = func; thread->func = func;
@ -703,36 +749,39 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
{ {
struct thread *thread = NULL; struct thread *thread = NULL;
#if !defined(HAVE_POLL) pthread_mutex_lock (&m->mtx);
thread_fd_set *fdset = NULL; {
if (dir == THREAD_READ)
fdset = &m->handler.readfd;
else
fdset = &m->handler.writefd;
#endif
#if defined (HAVE_POLL) #if defined (HAVE_POLL)
thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
if (thread == NULL)
return NULL;
#else #else
if (FD_ISSET (fd, fdset)) thread_fd_set *fdset = NULL;
{ if (dir == THREAD_READ)
zlog_warn ("There is already %s fd [%d]", fdset = &m->handler.readfd;
(dir == THREAD_READ) ? "read" : "write", fd); else
return NULL; fdset = &m->handler.writefd;
}
FD_SET (fd, fdset); if (FD_ISSET (fd, fdset))
thread = thread_get (m, dir, func, arg, debugargpass); {
zlog_warn ("There is already %s fd [%d]",
(dir == THREAD_READ) ? "read" : "write", fd);
}
else
{
FD_SET (fd, fdset);
thread = thread_get (m, dir, func, arg, debugargpass);
}
#endif #endif
thread->u.fd = fd; if (thread)
if (dir == THREAD_READ) {
thread_add_fd (m->read, thread); thread->u.fd = fd;
else if (dir == THREAD_READ)
thread_add_fd (m->write, thread); thread_add_fd (m->read, thread);
else
thread_add_fd (m->write, thread);
}
}
pthread_mutex_unlock (&m->mtx);
return thread; return thread;
} }
@ -754,7 +803,11 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
assert (time_relative); assert (time_relative);
queue = ((type == THREAD_TIMER) ? m->timer : m->background); queue = ((type == THREAD_TIMER) ? m->timer : m->background);
thread = thread_get (m, type, func, arg, debugargpass); pthread_mutex_lock (&m->mtx);
{
thread = thread_get (m, type, func, arg, debugargpass);
}
pthread_mutex_unlock (&m->mtx);
monotime(&thread->u.sands); monotime(&thread->u.sands);
timeradd(&thread->u.sands, time_relative, &thread->u.sands); timeradd(&thread->u.sands, time_relative, &thread->u.sands);
@ -847,9 +900,13 @@ funcname_thread_add_event (struct thread_master *m,
assert (m != NULL); assert (m != NULL);
thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); pthread_mutex_lock (&m->mtx);
thread->u.val = val; {
thread_list_add (&m->event, thread); thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
thread->u.val = val;
thread_list_add (&m->event, thread);
}
pthread_mutex_unlock (&m->mtx);
return thread; return thread;
} }
@ -880,14 +937,19 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
fd_clear_read_write (thread); fd_clear_read_write (thread);
} }
/* Cancel thread from scheduler. */ /**
* Cancel thread from scheduler.
*
* This function is *NOT* MT-safe. DO NOT call it from any other thread than
* the primary thread.
*/
void void
thread_cancel (struct thread *thread) thread_cancel (struct thread *thread)
{ {
struct thread_list *list = NULL; struct thread_list *list = NULL;
struct pqueue *queue = NULL; struct pqueue *queue = NULL;
struct thread **thread_array = NULL; struct thread **thread_array = NULL;
switch (thread->type) switch (thread->type)
{ {
case THREAD_READ: case THREAD_READ:
@ -951,39 +1013,48 @@ thread_cancel_event (struct thread_master *m, void *arg)
{ {
unsigned int ret = 0; unsigned int ret = 0;
struct thread *thread; struct thread *thread;
struct thread *t;
thread = m->event.head; pthread_mutex_lock (&m->mtx);
while (thread) {
{ thread = m->event.head;
struct thread *t; while (thread)
{
t = thread; t = thread;
thread = t->next; pthread_mutex_lock (&t->mtx);
if (t->arg == arg)
{ {
ret++; thread = t->next;
thread_list_delete (&m->event, t);
thread_add_unuse (m, t); if (t->arg == arg)
{
ret++;
thread_list_delete (&m->event, t);
thread_add_unuse (m, t);
}
} }
} pthread_mutex_unlock (&t->mtx);
}
/* thread can be on the ready list too */ /* thread can be on the ready list too */
thread = m->ready.head; thread = m->ready.head;
while (thread) while (thread)
{ {
struct thread *t; t = thread;
pthread_mutex_lock (&t->mtx);
t = thread;
thread = t->next;
if (t->arg == arg)
{ {
ret++; thread = t->next;
thread_list_delete (&m->ready, t);
thread_add_unuse (m, t); if (t->arg == arg)
{
ret++;
thread_list_delete (&m->ready, t);
thread_add_unuse (m, t);
}
} }
} pthread_mutex_unlock (&t->mtx);
}
}
pthread_mutex_unlock (&m->mtx);
return ret; return ret;
} }
@ -1143,18 +1214,25 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
struct timeval *timer_wait = &timer_val; struct timeval *timer_wait = &timer_val;
struct timeval *timer_wait_bg; struct timeval *timer_wait_bg;
pthread_mutex_lock (&m->mtx);
while (1) while (1)
{ {
int num = 0; int num = 0;
/* Signals pre-empt everything */ /* Signals pre-empt everything */
pthread_mutex_unlock (&m->mtx);
quagga_sigevent_process (); quagga_sigevent_process ();
pthread_mutex_lock (&m->mtx);
/* Drain the ready queue of already scheduled jobs, before scheduling /* Drain the ready queue of already scheduled jobs, before scheduling
* more. * more.
*/ */
if ((thread = thread_trim_head (&m->ready)) != NULL) if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch); {
fetch = thread_run (m, thread, fetch);
pthread_mutex_unlock (&m->mtx);
return fetch;
}
/* To be fair to all kinds of threads, and avoid starvation, we /* To be fair to all kinds of threads, and avoid starvation, we
* need to be careful to consider all thread types for scheduling * need to be careful to consider all thread types for scheduling
@ -1196,6 +1274,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
if (errno == EINTR) if (errno == EINTR)
continue; /* signal received - process it */ continue; /* signal received - process it */
zlog_warn ("select() error: %s", safe_strerror (errno)); zlog_warn ("select() error: %s", safe_strerror (errno));
pthread_mutex_unlock (&m->mtx);
return NULL; return NULL;
} }
@ -1215,14 +1294,22 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
list at this time. If this is code is uncommented, then background list at this time. If this is code is uncommented, then background
timer threads will not run unless there is nothing else to do. */ timer threads will not run unless there is nothing else to do. */
if ((thread = thread_trim_head (&m->ready)) != NULL) if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch); {
fetch = thread_run (m, thread, fetch);
pthread_mutex_unlock (&m->mtx);
return fetch;
}
#endif #endif
/* Background timer/events, lowest priority */ /* Background timer/events, lowest priority */
thread_timer_process (m->background, &now); thread_timer_process (m->background, &now);
if ((thread = thread_trim_head (&m->ready)) != NULL) if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch); {
fetch = thread_run (m, thread, fetch);
pthread_mutex_unlock (&m->mtx);
return fetch;
}
} }
} }
@ -1248,13 +1335,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
int int
thread_should_yield (struct thread *thread) thread_should_yield (struct thread *thread)
{ {
return monotime_since(&thread->real, NULL) > (int64_t)thread->yield; int result;
pthread_mutex_lock (&thread->mtx);
{
result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
}
pthread_mutex_unlock (&thread->mtx);
return result;
} }
void void
thread_set_yield_time (struct thread *thread, unsigned long yield_time) thread_set_yield_time (struct thread *thread, unsigned long yield_time)
{ {
thread->yield = yield_time; pthread_mutex_lock (&thread->mtx);
{
thread->yield = yield_time;
}
pthread_mutex_unlock (&thread->mtx);
} }
void void
@ -1324,6 +1421,7 @@ funcname_thread_execute (struct thread_master *m,
memset (&dummy, 0, sizeof (struct thread)); memset (&dummy, 0, sizeof (struct thread));
pthread_mutex_init (&dummy.mtx, NULL);
dummy.type = THREAD_EVENT; dummy.type = THREAD_EVENT;
dummy.add_type = THREAD_EXECUTE; dummy.add_type = THREAD_EXECUTE;
dummy.master = NULL; dummy.master = NULL;
@ -1332,8 +1430,12 @@ funcname_thread_execute (struct thread_master *m,
tmp.func = dummy.func = func; tmp.func = dummy.func = func;
tmp.funcname = dummy.funcname = funcname; tmp.funcname = dummy.funcname = funcname;
dummy.hist = hash_get (cpu_record, &tmp, pthread_mutex_lock (&cpu_record_mtx);
(void * (*) (void *))cpu_record_hash_alloc); {
dummy.hist = hash_get (cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
}
pthread_mutex_unlock (&cpu_record_mtx);
dummy.schedfrom = schedfrom; dummy.schedfrom = schedfrom;
dummy.schedfrom_line = fromln; dummy.schedfrom_line = fromln;

View File

@ -24,6 +24,7 @@
#include <zebra.h> #include <zebra.h>
#include "monotime.h" #include "monotime.h"
#include <pthread.h>
struct rusage_t struct rusage_t
{ {
@ -84,6 +85,7 @@ struct thread_master
int fd_limit; int fd_limit;
struct fd_handler handler; struct fd_handler handler;
unsigned long alloc; unsigned long alloc;
pthread_mutex_t mtx;
}; };
typedef unsigned char thread_type; typedef unsigned char thread_type;
@ -110,6 +112,7 @@ struct thread
const char *funcname; const char *funcname;
const char *schedfrom; const char *schedfrom;
int schedfrom_line; int schedfrom_line;
pthread_mutex_t mtx;
}; };
struct cpu_thread_history struct cpu_thread_history