lib: fix data race in thread history collection

Thread statistics are collected and stored in a hashtable shared across
threads, but while the hashtable itself is protected by a mutex, the
records themselves were not being updated safely. Change all thread
history collection to use atomic operations.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2018-04-20 17:27:16 -04:00
parent 8aeee10fe2
commit fbcac826c9
2 changed files with 84 additions and 39 deletions

View File

@ -32,6 +32,7 @@
#include "sigevent.h" #include "sigevent.h"
#include "network.h" #include "network.h"
#include "jhash.h" #include "jhash.h"
#include "frratomic.h"
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread") DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master") DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
@ -104,25 +105,41 @@ static void vty_out_cpu_thread_history(struct vty *vty,
static void cpu_record_hash_print(struct hash_backet *bucket, void *args[]) static void cpu_record_hash_print(struct hash_backet *bucket, void *args[])
{ {
struct cpu_thread_history *totals = args[0]; struct cpu_thread_history *totals = args[0];
struct cpu_thread_history copy;
struct vty *vty = args[1]; struct vty *vty = args[1];
thread_type *filter = args[2]; uint8_t *filter = args[2];
struct cpu_thread_history *a = bucket->data; struct cpu_thread_history *a = bucket->data;
if (!(a->types & *filter)) copy.total_active =
atomic_load_explicit(&a->total_active, memory_order_seq_cst);
copy.total_calls =
atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
copy.cpu.total =
atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
copy.real.total =
atomic_load_explicit(&a->real.total, memory_order_seq_cst);
copy.real.max =
atomic_load_explicit(&a->real.max, memory_order_seq_cst);
copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
copy.funcname = a->funcname;
if (!(copy.types & *filter))
return; return;
vty_out_cpu_thread_history(vty, a);
totals->total_active += a->total_active; vty_out_cpu_thread_history(vty, &copy);
totals->total_calls += a->total_calls; totals->total_active += copy.total_active;
totals->real.total += a->real.total; totals->total_calls += copy.total_calls;
if (totals->real.max < a->real.max) totals->real.total += copy.real.total;
totals->real.max = a->real.max; if (totals->real.max < copy.real.max)
totals->cpu.total += a->cpu.total; totals->real.max = copy.real.max;
if (totals->cpu.max < a->cpu.max) totals->cpu.total += copy.cpu.total;
totals->cpu.max = a->cpu.max; if (totals->cpu.max < copy.cpu.max)
totals->cpu.max = copy.cpu.max;
} }
static void cpu_record_print(struct vty *vty, thread_type filter) static void cpu_record_print(struct vty *vty, uint8_t filter)
{ {
struct cpu_thread_history tmp; struct cpu_thread_history tmp;
void *args[3] = {&tmp, vty, &filter}; void *args[3] = {&tmp, vty, &filter};
@ -183,7 +200,7 @@ static void cpu_record_print(struct vty *vty, thread_type filter)
static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[]) static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
{ {
thread_type *filter = args[0]; uint8_t *filter = args[0];
struct hash *cpu_record = args[1]; struct hash *cpu_record = args[1];
struct cpu_thread_history *a = bucket->data; struct cpu_thread_history *a = bucket->data;
@ -194,9 +211,9 @@ static void cpu_record_hash_clear(struct hash_backet *bucket, void *args[])
hash_release(cpu_record, bucket->data); hash_release(cpu_record, bucket->data);
} }
static void cpu_record_clear(thread_type filter) static void cpu_record_clear(uint8_t filter)
{ {
thread_type *tmp = &filter; uint8_t *tmp = &filter;
struct thread_master *m; struct thread_master *m;
struct listnode *ln; struct listnode *ln;
@ -218,7 +235,7 @@ static void cpu_record_clear(thread_type filter)
pthread_mutex_unlock(&masters_mtx); pthread_mutex_unlock(&masters_mtx);
} }
static thread_type parse_filter(const char *filterstr) static uint8_t parse_filter(const char *filterstr)
{ {
int i = 0; int i = 0;
int filter = 0; int filter = 0;
@ -261,7 +278,7 @@ DEFUN (show_thread_cpu,
"Thread CPU usage\n" "Thread CPU usage\n"
"Display filter (rwtexb)\n") "Display filter (rwtexb)\n")
{ {
thread_type filter = (thread_type)-1U; uint8_t filter = (uint8_t)-1U;
int idx = 0; int idx = 0;
if (argv_find(argv, argc, "FILTER", &idx)) { if (argv_find(argv, argc, "FILTER", &idx)) {
@ -287,7 +304,7 @@ DEFUN (clear_thread_cpu,
"Thread CPU usage\n" "Thread CPU usage\n"
"Display filter (rwtexb)\n") "Display filter (rwtexb)\n")
{ {
thread_type filter = (thread_type)-1U; uint8_t filter = (uint8_t)-1U;
int idx = 0; int idx = 0;
if (argv_find(argv, argc, "FILTER", &idx)) { if (argv_find(argv, argc, "FILTER", &idx)) {
@ -1492,12 +1509,22 @@ void thread_getrusage(RUSAGE_T *r)
getrusage(RUSAGE_SELF, &(r->cpu)); getrusage(RUSAGE_SELF, &(r->cpu));
} }
/* We check thread consumed time. If the system has getrusage, we'll /*
use that to get in-depth stats on the performance of the thread in addition * Call a thread.
to wall clock time stats from gettimeofday. */ *
* This function will atomically update the thread's usage history. At present
* this is the only spot where usage history is written. Nevertheless the code
* has been written such that the introduction of writers in the future should
* not need to update it provided the writers atomically perform only the
* operations done here, i.e. updating the total and maximum times. In
* particular, the maximum real and cpu times must be monotonically increasing
* or this code is not correct.
*/
void thread_call(struct thread *thread) void thread_call(struct thread *thread)
{ {
unsigned long realtime, cputime; _Atomic unsigned long realtime, cputime;
unsigned long exp;
unsigned long helper;
RUSAGE_T before, after; RUSAGE_T before, after;
GETRUSAGE(&before); GETRUSAGE(&before);
@ -1509,16 +1536,35 @@ void thread_call(struct thread *thread)
GETRUSAGE(&after); GETRUSAGE(&after);
realtime = thread_consumed_time(&after, &before, &cputime); realtime = thread_consumed_time(&after, &before, &helper);
thread->hist->real.total += realtime; cputime = helper;
if (thread->hist->real.max < realtime)
thread->hist->real.max = realtime;
thread->hist->cpu.total += cputime;
if (thread->hist->cpu.max < cputime)
thread->hist->cpu.max = cputime;
++(thread->hist->total_calls); /* update realtime */
thread->hist->types |= (1 << thread->add_type); atomic_fetch_add_explicit(&thread->hist->real.total, realtime,
memory_order_seq_cst);
exp = atomic_load_explicit(&thread->hist->real.max,
memory_order_seq_cst);
while (exp < realtime
&& !atomic_compare_exchange_weak_explicit(
&thread->hist->real.max, &exp, realtime,
memory_order_seq_cst, memory_order_seq_cst))
;
/* update cputime */
atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
memory_order_seq_cst);
exp = atomic_load_explicit(&thread->hist->cpu.max,
memory_order_seq_cst);
while (exp < cputime
&& !atomic_compare_exchange_weak_explicit(
&thread->hist->cpu.max, &exp, cputime,
memory_order_seq_cst, memory_order_seq_cst))
;
atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
memory_order_seq_cst);
atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
memory_order_seq_cst);
#ifdef CONSUMED_TIME_CHECK #ifdef CONSUMED_TIME_CHECK
if (realtime > CONSUMED_TIME_CHECK) { if (realtime > CONSUMED_TIME_CHECK) {

View File

@ -25,6 +25,7 @@
#include <pthread.h> #include <pthread.h>
#include <poll.h> #include <poll.h>
#include "monotime.h" #include "monotime.h"
#include "frratomic.h"
struct rusage_t { struct rusage_t {
struct rusage cpu; struct rusage cpu;
@ -91,12 +92,10 @@ struct thread_master {
pthread_t owner; pthread_t owner;
}; };
typedef unsigned char thread_type;
/* Thread itself. */ /* Thread itself. */
struct thread { struct thread {
thread_type type; /* thread type */ uint8_t type; /* thread type */
thread_type add_type; /* thread type */ uint8_t add_type; /* thread type */
struct thread *next; /* next pointer of the thread */ struct thread *next; /* next pointer of the thread */
struct thread *prev; /* previous pointer of the thread */ struct thread *prev; /* previous pointer of the thread */
struct thread **ref; /* external reference (if given) */ struct thread **ref; /* external reference (if given) */
@ -120,13 +119,13 @@ struct thread {
struct cpu_thread_history { struct cpu_thread_history {
int (*func)(struct thread *); int (*func)(struct thread *);
unsigned int total_calls; _Atomic unsigned int total_calls;
unsigned int total_active; _Atomic unsigned int total_active;
struct time_stats { struct time_stats {
unsigned long total, max; _Atomic unsigned long total, max;
} real; } real;
struct time_stats cpu; struct time_stats cpu;
thread_type types; _Atomic uint8_t types;
const char *funcname; const char *funcname;
}; };