Merge pull request #723 from qlyoung/fix-static-pipe-poker-buf

lib: more multithreading infra work
This commit is contained in:
Jafar Al-Gharaibeh 2017-06-30 10:00:43 -05:00 committed by GitHub
commit b5a46fd2a6
21 changed files with 230 additions and 202 deletions

View File

@ -131,7 +131,7 @@ lde(void)
ldpd_process = PROC_LDE_ENGINE;
log_procname = log_procnames[PROC_LDE_ENGINE];
master = thread_master_create();
master = thread_master_create(NULL);
/* setup signal handler */
signal_init(master, array_size(lde_signals), lde_signals);

View File

@ -109,7 +109,7 @@ ldpe(void)
ldpd_process = PROC_LDP_ENGINE;
log_procname = log_procnames[ldpd_process];
master = thread_master_create();
master = thread_master_create(NULL);
/* setup signal handler */
signal_init(master, array_size(ldpe_signals), ldpe_signals);

View File

@ -86,7 +86,7 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id,
XCALLOC(MTYPE_FRR_PTHREAD,
sizeof(struct frr_pthread));
fpt->id = id;
fpt->master = thread_master_create();
fpt->master = thread_master_create(name);
fpt->start_routine = start_routine;
fpt->stop_routine = stop_routine;
fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name);

View File

@ -38,7 +38,7 @@ int main(int argc, char **argv)
{
struct thread thread;
master = thread_master_create ();
master = thread_master_create(NULL);
openzlog ("grammar_sandbox", "NONE", 0,
LOG_CONS|LOG_NDELAY|LOG_PID, LOG_DAEMON);

View File

@ -366,7 +366,7 @@ struct thread_master *frr_init(void)
zprivs_init(di->privs);
master = thread_master_create();
master = thread_master_create(NULL);
signal_init(master, di->n_signals, di->signals);
if (di->flags & FRR_LIMITED_CLI)

View File

@ -509,16 +509,18 @@ zlog_signal(int signo, const char *action
);
s = buf;
if (!thread_current)
struct thread *tc;
tc = pthread_getspecific (thread_current);
if (!tc)
s = str_append (LOC, "no thread information available\n");
else
{
s = str_append (LOC, "in thread ");
s = str_append (LOC, thread_current->funcname);
s = str_append (LOC, tc->funcname);
s = str_append (LOC, " scheduled from ");
s = str_append (LOC, thread_current->schedfrom);
s = str_append (LOC, tc->schedfrom);
s = str_append (LOC, ":");
s = num_append (LOC, thread_current->schedfrom_line);
s = num_append (LOC, tc->schedfrom_line);
s = str_append (LOC, "\n");
}
@ -700,10 +702,13 @@ ZLOG_FUNC(zlog_debug, LOG_DEBUG)
void zlog_thread_info (int log_level)
{
if (thread_current)
struct thread *tc;
tc = pthread_getspecific (thread_current);
if (tc)
zlog(log_level, "Current thread function %s, scheduled from "
"file %s, line %u", thread_current->funcname,
thread_current->schedfrom, thread_current->schedfrom_line);
"file %s, line %u", tc->funcname,
tc->schedfrom, tc->schedfrom_line);
else
zlog(log_level, "Current thread not known/applicable");
}

View File

@ -47,16 +47,15 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
write (m->io_pipe[1], &wakebyte, 1); \
} while (0);
static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct hash *cpu_record = NULL;
/* control variable for initializer */
pthread_once_t init_once = PTHREAD_ONCE_INIT;
pthread_key_t thread_current;
static unsigned long
timeval_elapsed (struct timeval a, struct timeval b)
{
return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
+ (a.tv_usec - b.tv_usec));
}
pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *masters;
/* CLI start ---------------------------------------------------------------- */
static unsigned int
cpu_record_hash_key (struct cpu_thread_history *a)
{
@ -106,12 +105,12 @@ vty_out_cpu_thread_history(struct vty* vty,
}
static void
cpu_record_hash_print(struct hash_backet *bucket,
void *args[])
cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{
struct cpu_thread_history *totals = args[0];
struct vty *vty = args[1];
thread_type *filter = args[2];
struct cpu_thread_history *a = bucket->data;
if ( !(a->types & *filter) )
@ -132,29 +131,131 @@ cpu_record_print(struct vty *vty, thread_type filter)
{
struct cpu_thread_history tmp;
void *args[3] = {&tmp, vty, &filter};
struct thread_master *m;
struct listnode *ln;
memset(&tmp, 0, sizeof tmp);
tmp.funcname = "TOTAL";
tmp.types = filter;
vty_outln (vty, "%21s %18s %18s",
"", "CPU (user+system):", "Real (wall-clock):");
pthread_mutex_lock (&masters_mtx);
{
for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) {
const char *name = m->name ? m->name : "main";
char underline[strlen(name) + 1];
memset (underline, '-', sizeof (underline));
underline[sizeof(underline)] = '\0';
vty_out (vty, VTYNL);
vty_outln(vty, "Showing statistics for pthread %s", name);
vty_outln(vty, "-------------------------------%s", underline);
vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):");
vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
vty_out(vty, " Avg uSec Max uSecs");
vty_outln(vty, " Type Thread");
if (m->cpu_record->count)
hash_iterate(m->cpu_record,
(void (*)(struct hash_backet *, void *))
cpu_record_hash_print,
args);
else
vty_outln(vty, "No data to display yet.");
vty_out(vty, VTYNL);
}
}
pthread_mutex_unlock (&masters_mtx);
vty_out(vty, VTYNL);
vty_outln(vty, "Total thread statistics");
vty_outln(vty, "-------------------------");
vty_outln(vty, "%21s %18s %18s", "", "CPU (user+system):", "Real (wall-clock):");
vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
vty_out(vty, " Avg uSec Max uSecs");
vty_outln (vty, " Type Thread");
pthread_mutex_lock (&cpu_record_mtx);
{
hash_iterate(cpu_record,
(void(*)(struct hash_backet*,void*))cpu_record_hash_print,
args);
}
pthread_mutex_unlock (&cpu_record_mtx);
vty_outln(vty, " Type Thread");
if (tmp.total_calls > 0)
vty_out_cpu_thread_history(vty, &tmp);
}
static void
cpu_record_hash_clear (struct hash_backet *bucket, void *args[])
{
thread_type *filter = args[0];
struct hash *cpu_record = args[1];
struct cpu_thread_history *a = bucket->data;
if ( !(a->types & *filter) )
return;
hash_release (cpu_record, bucket->data);
}
static void
cpu_record_clear (thread_type filter)
{
thread_type *tmp = &filter;
struct thread_master *m;
struct listnode *ln;
pthread_mutex_lock (&masters_mtx);
{
for (ALL_LIST_ELEMENTS_RO (masters, ln, m)) {
pthread_mutex_lock (&m->mtx);
{
void *args[2] = { tmp, m->cpu_record };
hash_iterate (m->cpu_record,
(void (*) (struct hash_backet*,void*))
cpu_record_hash_clear,
args);
}
pthread_mutex_unlock (&m->mtx);
}
}
pthread_mutex_unlock (&masters_mtx);
}
static thread_type
parse_filter (const char *filterstr)
{
int i = 0;
int filter = 0;
while (filterstr[i] != '\0')
{
switch (filterstr[i])
{
case 'r':
case 'R':
filter |= (1 << THREAD_READ);
break;
case 'w':
case 'W':
filter |= (1 << THREAD_WRITE);
break;
case 't':
case 'T':
filter |= (1 << THREAD_TIMER);
break;
case 'e':
case 'E':
filter |= (1 << THREAD_EVENT);
break;
case 'x':
case 'X':
filter |= (1 << THREAD_EXECUTE);
break;
default:
break;
}
++i;
}
return filter;
}
DEFUN (show_thread_cpu,
show_thread_cpu_cmd,
"show thread cpu [FILTER]",
@ -163,138 +264,41 @@ DEFUN (show_thread_cpu,
"Thread CPU usage\n"
"Display filter (rwtexb)\n")
{
int idx_filter = 3;
int i = 0;
thread_type filter = (thread_type) -1U;
int idx = 0;
if (argc > 3)
{
filter = 0;
while (argv[idx_filter]->arg[i] != '\0')
{
switch ( argv[idx_filter]->arg[i] )
{
case 'r':
case 'R':
filter |= (1 << THREAD_READ);
break;
case 'w':
case 'W':
filter |= (1 << THREAD_WRITE);
break;
case 't':
case 'T':
filter |= (1 << THREAD_TIMER);
break;
case 'e':
case 'E':
filter |= (1 << THREAD_EVENT);
break;
case 'x':
case 'X':
filter |= (1 << THREAD_EXECUTE);
break;
default:
break;
}
++i;
}
if (filter == 0)
{
vty_outln (vty, "Invalid filter \"%s\" specified,"
" must contain at least one of 'RWTEXB'",
argv[idx_filter]->arg);
return CMD_WARNING;
}
if (argv_find (argv, argc, "FILTER", &idx)) {
filter = parse_filter (argv[idx]->arg);
if (!filter) {
vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least"
"one of 'RWTEXB'%s", argv[idx]->arg);
return CMD_WARNING;
}
}
cpu_record_print(vty, filter);
return CMD_SUCCESS;
}
static void
cpu_record_hash_clear (struct hash_backet *bucket,
void *args)
{
thread_type *filter = args;
struct cpu_thread_history *a = bucket->data;
if ( !(a->types & *filter) )
return;
pthread_mutex_lock (&cpu_record_mtx);
{
hash_release (cpu_record, bucket->data);
}
pthread_mutex_unlock (&cpu_record_mtx);
}
static void
cpu_record_clear (thread_type filter)
{
thread_type *tmp = &filter;
pthread_mutex_lock (&cpu_record_mtx);
{
hash_iterate (cpu_record,
(void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
tmp);
}
pthread_mutex_unlock (&cpu_record_mtx);
}
DEFUN (clear_thread_cpu,
clear_thread_cpu_cmd,
"clear thread cpu [FILTER]",
"Clear stored data\n"
"Clear stored data in all pthreads\n"
"Thread information\n"
"Thread CPU usage\n"
"Display filter (rwtexb)\n")
{
int idx_filter = 3;
int i = 0;
thread_type filter = (thread_type) -1U;
int idx = 0;
if (argc > 3)
{
filter = 0;
while (argv[idx_filter]->arg[i] != '\0')
{
switch ( argv[idx_filter]->arg[i] )
{
case 'r':
case 'R':
filter |= (1 << THREAD_READ);
break;
case 'w':
case 'W':
filter |= (1 << THREAD_WRITE);
break;
case 't':
case 'T':
filter |= (1 << THREAD_TIMER);
break;
case 'e':
case 'E':
filter |= (1 << THREAD_EVENT);
break;
case 'x':
case 'X':
filter |= (1 << THREAD_EXECUTE);
break;
default:
break;
}
++i;
}
if (filter == 0)
{
vty_outln (vty, "Invalid filter \"%s\" specified,"
" must contain at least one of 'RWTEXB'",
argv[idx_filter]->arg);
return CMD_WARNING;
}
if (argv_find (argv, argc, "FILTER", &idx)) {
filter = parse_filter (argv[idx]->arg);
if (!filter) {
vty_outln(vty, "Invalid filter \"%s\" specified; must contain at least"
"one of 'RWTEXB'%s", argv[idx]->arg);
return CMD_WARNING;
}
}
cpu_record_clear (filter);
return CMD_SUCCESS;
@ -306,6 +310,8 @@ thread_cmd_init (void)
install_element (VIEW_NODE, &show_thread_cpu_cmd);
install_element (ENABLE_NODE, &clear_thread_cpu_cmd);
}
/* CLI end ------------------------------------------------------------------ */
static int
thread_timer_cmp(void *a, void *b)
@ -334,31 +340,37 @@ cancelreq_del (void *cr)
XFREE (MTYPE_TMP, cr);
}
/* initializer, only ever called once */
static void initializer ()
{
if (!masters)
masters = list_new();
pthread_key_create (&thread_current, NULL);
}
/* Allocate new thread master. */
struct thread_master *
thread_master_create (void)
thread_master_create (const char *name)
{
struct thread_master *rv;
struct rlimit limit;
getrlimit(RLIMIT_NOFILE, &limit);
pthread_mutex_lock (&cpu_record_mtx);
{
if (cpu_record == NULL)
cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
(int (*) (const void *, const void *))
cpu_record_hash_cmp);
}
pthread_mutex_unlock (&cpu_record_mtx);
pthread_once (&init_once, &initializer);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
if (rv == NULL)
return NULL;
/* Initialize master mutex */
pthread_mutex_init (&rv->mtx, NULL);
pthread_cond_init (&rv->cancel_cond, NULL);
/* Set name */
rv->name = name ? XSTRDUP (MTYPE_THREAD_MASTER, name) : NULL;
/* Initialize I/O task data structures */
getrlimit(RLIMIT_NOFILE, &limit);
rv->fd_limit = (int)limit.rlim_cur;
rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->read == NULL)
@ -366,7 +378,6 @@ thread_master_create (void)
XFREE (MTYPE_THREAD_MASTER, rv);
return NULL;
}
rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
if (rv->write == NULL)
{
@ -375,20 +386,32 @@ thread_master_create (void)
return NULL;
}
rv->cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
(int (*) (const void *, const void *))
cpu_record_hash_cmp);
/* Initialize the timer queues */
rv->timer = pqueue_create();
rv->timer->cmp = thread_timer_cmp;
rv->timer->update = thread_timer_update;
/* Initialize thread_fetch() settings */
rv->spin = true;
rv->handle_signals = true;
/* Set pthread owner, should be updated by actual owner */
rv->owner = pthread_self();
rv->cancel_req = list_new ();
rv->cancel_req->del = cancelreq_del;
rv->canceled = true;
/* Initialize pipe poker */
pipe (rv->io_pipe);
set_nonblocking (rv->io_pipe[0]);
set_nonblocking (rv->io_pipe[1]);
/* Initialize data structures for poll() */
rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
@ -396,6 +419,13 @@ thread_master_create (void)
rv->handler.copy = XCALLOC (MTYPE_THREAD_MASTER,
sizeof (struct pollfd) * rv->handler.pfdsize);
/* add to list */
pthread_mutex_lock (&masters_mtx);
{
listnode_add (masters, rv);
}
pthread_mutex_unlock (&masters_mtx);
return rv;
}
@ -545,20 +575,13 @@ thread_master_free (struct thread_master *m)
close (m->io_pipe[1]);
list_delete (m->cancel_req);
hash_clean (m->cpu_record, cpu_record_hash_free);
hash_free (m->cpu_record);
m->cpu_record = NULL;
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
XFREE (MTYPE_THREAD_MASTER, m);
pthread_mutex_lock (&cpu_record_mtx);
{
if (cpu_record)
{
hash_clean (cpu_record, cpu_record_hash_free);
hash_free (cpu_record);
cpu_record = NULL;
}
}
pthread_mutex_unlock (&cpu_record_mtx);
}
/* Return remain time in second. */
@ -630,12 +653,8 @@ thread_get (struct thread_master *m, u_char type,
{
tmp.func = func;
tmp.funcname = funcname;
pthread_mutex_lock (&cpu_record_mtx);
{
thread->hist = hash_get (cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
}
pthread_mutex_unlock (&cpu_record_mtx);
thread->hist = hash_get (m->cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
}
thread->hist->total_active++;
thread->func = func;
@ -676,7 +695,7 @@ fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
num = poll (pfds, count + 1, timeout);
static unsigned char trash[64];
unsigned char trash[64];
if (num > 0 && pfds[count].revents != 0 && num--)
while (read (m->io_pipe[0], &trash, sizeof (trash)) > 0);
@ -1401,6 +1420,13 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
return fetch;
}
static unsigned long
timeval_elapsed (struct timeval a, struct timeval b)
{
return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
+ (a.tv_usec - b.tv_usec));
}
unsigned long
thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
{
@ -1449,8 +1475,6 @@ thread_getrusage (RUSAGE_T *r)
getrusage(RUSAGE_SELF, &(r->cpu));
}
struct thread *thread_current = NULL;
/* We check thread consumed time. If the system has getrusage, we'll
use that to get in-depth stats on the performance of the thread in addition
to wall clock time stats from gettimeofday. */
@ -1463,9 +1487,9 @@ thread_call (struct thread *thread)
GETRUSAGE (&before);
thread->real = before.real;
thread_current = thread;
pthread_setspecific (thread_current, thread);
(*thread->func) (thread);
thread_current = NULL;
pthread_setspecific (thread_current, NULL);
GETRUSAGE (&after);
@ -1518,12 +1542,8 @@ funcname_thread_execute (struct thread_master *m,
tmp.func = dummy.func = func;
tmp.funcname = dummy.funcname = funcname;
pthread_mutex_lock (&cpu_record_mtx);
{
dummy.hist = hash_get (cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
}
pthread_mutex_unlock (&cpu_record_mtx);
dummy.hist = hash_get (m->cpu_record, &tmp,
(void * (*) (void *))cpu_record_hash_alloc);
dummy.schedfrom = schedfrom;
dummy.schedfrom_line = fromln;

View File

@ -71,6 +71,8 @@ struct cancel_req {
/* Master of the theads. */
struct thread_master
{
char *name;
struct thread **read;
struct thread **write;
struct pqueue *timer;
@ -80,6 +82,7 @@ struct thread_master
struct list *cancel_req;
bool canceled;
pthread_cond_t cancel_cond;
struct hash *cpu_record;
int io_pipe[2];
int fd_limit;
struct fd_handler handler;
@ -177,7 +180,7 @@ struct cpu_thread_history
#define thread_execute(m,f,a,v) funcname_thread_execute(m,f,a,v,#f,__FILE__,__LINE__)
/* Prototypes. */
extern struct thread_master *thread_master_create (void);
extern struct thread_master *thread_master_create (const char *);
extern void thread_master_free (struct thread_master *);
extern void thread_master_free_unused(struct thread_master *);
@ -220,6 +223,6 @@ extern unsigned long thread_consumed_time(RUSAGE_T *after, RUSAGE_T *before,
unsigned long *cpu_time_elapsed);
/* only for use in logging functions! */
extern struct thread *thread_current;
extern pthread_key_t thread_current;
#endif /* _ZEBRA_THREAD_H */

View File

@ -326,7 +326,7 @@ main (int argc, char *argv[])
/* Initialization */
zprivs_init (&ospfd_privs);
master = thread_master_create ();
master = thread_master_create(NULL);
/* Open connection to OSPF daemon */
oclient = ospf_apiclient_connect (args[1], ASYNCPORT);

View File

@ -1331,7 +1331,7 @@ main (void)
{
int i = 0;
qobj_init ();
bgp_master_init (thread_master_create ());
bgp_master_init (thread_master_create(NULL));
master = bm->master;
bgp_option_set (BGP_OPT_NO_LISTEN);
bgp_attr_init ();

View File

@ -648,7 +648,7 @@ main (void)
term_bgp_debug_as4 = -1UL;
qobj_init ();
master = thread_master_create ();
master = thread_master_create(NULL);
bgp_master_init (master);
vrf_init (NULL, NULL, NULL, NULL);
bgp_option_set (BGP_OPT_NO_LISTEN);

View File

@ -748,7 +748,7 @@ main (void)
term_bgp_debug_as4 = -1UL;
qobj_init ();
master = thread_master_create ();
master = thread_master_create(NULL);
bgp_master_init (master);
vrf_init (NULL, NULL, NULL, NULL);
bgp_option_set (BGP_OPT_NO_LISTEN);

View File

@ -376,7 +376,7 @@ static int
global_test_init (void)
{
qobj_init ();
master = thread_master_create ();
master = thread_master_create(NULL);
zclient = zclient_new(master);
bgp_master_init (master);
vrf_init (NULL, NULL, NULL, NULL);

View File

@ -116,7 +116,7 @@ main (int argc, char **argv)
progname = ((p = strrchr (argv[0], '/')) ? ++p : argv[0]);
/* master init. */
master = thread_master_create ();
master = thread_master_create(NULL);
while (1)
{

View File

@ -67,7 +67,7 @@ main (int argc, char **argv)
umask (0027);
/* master init. */
master = thread_master_create ();
master = thread_master_create(NULL);
openzlog("common-cli", "NONE", 0, LOG_CONS | LOG_NDELAY | LOG_PID,
LOG_DAEMON);

View File

@ -45,7 +45,7 @@ threadfunc (struct thread *thread)
int
main (void)
{
master = thread_master_create ();
master = thread_master_create(NULL);
signal_init (master, array_size(sigs), sigs);
openzlog("testsegv", "NONE", 0, LOG_CONS | LOG_NDELAY | LOG_PID, LOG_DAEMON);

View File

@ -61,7 +61,7 @@ struct thread t;
int
main (void)
{
master = thread_master_create ();
master = thread_master_create(NULL);
signal_init (master, array_size(sigs), sigs);
openzlog("testsig", "NONE", 0, LOG_CONS | LOG_NDELAY | LOG_PID, LOG_DAEMON);

View File

@ -115,7 +115,7 @@ int main(int argc, char **argv)
struct thread t;
struct timeval **alarms;
master = thread_master_create();
master = thread_master_create(NULL);
log_buf_len = SCHEDULE_TIMERS * (TIMESTR_LEN + 1) + 1;
log_buf_pos = 0;

View File

@ -49,7 +49,7 @@ int main(int argc, char **argv)
struct timeval tv_start, tv_lap, tv_stop;
unsigned long t_schedule, t_remove;
master = thread_master_create();
master = thread_master_create(NULL);
prng = prng_new(0);
timers = calloc(SCHEDULE_TIMERS, sizeof(*timers));

View File

@ -140,7 +140,7 @@ int main (int argc, char *argv[])
printf ("Sequence to be tested: %s\n", sequence);
master = thread_master_create();
master = thread_master_create(NULL);
init_zclient (master, ZSERV_PATH);
zebra_send_label_manager_connect ();

View File

@ -200,7 +200,7 @@ main (int argc, char **argv)
if (argc == 1)
usage_exit ();
master = thread_master_create();
master = thread_master_create(NULL);
/* Establish connection to zebra. */
zclient = zclient_new(master);
zclient->enable = 1;