Merge pull request #3010 from opensourcerouting/no-frr-thread-id

lib: frr_pthread minor simplification
This commit is contained in:
Lou Berger 2018-09-22 15:02:09 -04:00 committed by GitHub
commit d127c61aeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 69 additions and 158 deletions

View File

@ -23,7 +23,7 @@
#include <zebra.h>
#include <pthread.h> // for pthread_mutex_unlock, pthread_mutex_lock
#include "frr_pthread.h" // for frr_pthread_get, frr_pthread
#include "frr_pthread.h"
#include "linklist.h" // for list_delete, list_delete_all_node, lis...
#include "log.h" // for zlog_debug, safe_strerror, zlog_err
#include "memory.h" // for MTYPE_TMP, XCALLOC, XFREE
@ -56,7 +56,7 @@ static bool validate_header(struct peer *);
void bgp_writes_on(struct peer *peer)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
assert(fpt->running);
assert(peer->status != Deleted);
@ -74,7 +74,7 @@ void bgp_writes_on(struct peer *peer)
void bgp_writes_off(struct peer *peer)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
assert(fpt->running);
thread_cancel_async(fpt->master, &peer->t_write, NULL);
@ -85,7 +85,7 @@ void bgp_writes_off(struct peer *peer)
void bgp_reads_on(struct peer *peer)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
assert(fpt->running);
assert(peer->status != Deleted);
@ -105,7 +105,7 @@ void bgp_reads_on(struct peer *peer)
void bgp_reads_off(struct peer *peer)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
assert(fpt->running);
thread_cancel_async(fpt->master, &peer->t_read, NULL);
@ -130,7 +130,7 @@ static int bgp_process_writes(struct thread *thread)
if (peer->fd < 0)
return -1;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
pthread_mutex_lock(&peer->io_mtx);
{
@ -182,7 +182,7 @@ static int bgp_process_reads(struct thread *thread)
if (peer->fd < 0 || bm->terminating)
return -1;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *fpt = bgp_pth_io;
pthread_mutex_lock(&peer->io_mtx);
{

View File

@ -229,7 +229,7 @@ void bgp_keepalives_on(struct peer *peer)
if (CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON))
return;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
struct frr_pthread *fpt = bgp_pth_ka;
assert(fpt->running);
/* placeholder bucket data to use for fast key lookups */
@ -259,7 +259,7 @@ void bgp_keepalives_off(struct peer *peer)
if (!CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON))
return;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
struct frr_pthread *fpt = bgp_pth_ka;
assert(fpt->running);
/* placeholder bucket data to use for fast key lookups */

View File

@ -7750,35 +7750,36 @@ static const struct cmd_variable_handler bgp_viewvrf_var_handlers[] = {
{.completions = NULL},
};
struct frr_pthread *bgp_pth_io;
struct frr_pthread *bgp_pth_ka;
static void bgp_pthreads_init()
{
assert(!bgp_pth_io);
assert(!bgp_pth_ka);
frr_pthread_init();
struct frr_pthread_attr io = {
.id = PTHREAD_IO,
.start = frr_pthread_attr_default.start,
.stop = frr_pthread_attr_default.stop,
};
struct frr_pthread_attr ka = {
.id = PTHREAD_KEEPALIVES,
.start = bgp_keepalives_start,
.stop = bgp_keepalives_stop,
};
frr_pthread_new(&io, "BGP I/O thread", "bgpd_io");
frr_pthread_new(&ka, "BGP Keepalives thread", "bgpd_ka");
bgp_pth_io = frr_pthread_new(&io, "BGP I/O thread", "bgpd_io");
bgp_pth_ka = frr_pthread_new(&ka, "BGP Keepalives thread", "bgpd_ka");
}
void bgp_pthreads_run()
{
struct frr_pthread *io = frr_pthread_get(PTHREAD_IO);
struct frr_pthread *ka = frr_pthread_get(PTHREAD_KEEPALIVES);
frr_pthread_run(io, NULL);
frr_pthread_run(ka, NULL);
frr_pthread_run(bgp_pth_io, NULL);
frr_pthread_run(bgp_pth_ka, NULL);
/* Wait until threads are ready. */
frr_pthread_wait_running(io);
frr_pthread_wait_running(ka);
frr_pthread_wait_running(bgp_pth_io);
frr_pthread_wait_running(bgp_pth_ka);
}
void bgp_pthreads_finish()

View File

@ -24,6 +24,7 @@
#include "qobj.h"
#include <pthread.h>
#include "frr_pthread.h"
#include "lib/json.h"
#include "vrf.h"
#include "vty.h"
@ -97,6 +98,9 @@ enum bgp_af_index {
for (afi = AFI_IP; afi < AFI_MAX; afi++) \
for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++)
extern struct frr_pthread *bgp_pth_io;
extern struct frr_pthread *bgp_pth_ka;
/* BGP master for system wide configurations and variables. */
struct bgp_master {
/* BGP instance list. */
@ -105,10 +109,6 @@ struct bgp_master {
/* BGP thread master. */
struct thread_master *master;
/* BGP pthreads. */
#define PTHREAD_IO (1 << 1)
#define PTHREAD_KEEPALIVES (1 << 2)
/* work queues */
struct work_queue *process_main_queue;

View File

@ -26,108 +26,77 @@
#include "frr_pthread.h"
#include "memory.h"
#include "hash.h"
#include "linklist.h"
DEFINE_MTYPE(LIB, FRR_PTHREAD, "FRR POSIX Thread");
DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives");
/* id for next created pthread */
static _Atomic uint32_t next_id = 0;
/* default frr_pthread start/stop routine prototypes */
static void *fpt_run(void *arg);
static int fpt_halt(struct frr_pthread *fpt, void **res);
/* default frr_pthread attributes */
struct frr_pthread_attr frr_pthread_attr_default = {
.id = 0,
.start = fpt_run,
.stop = fpt_halt,
};
/* hash table to keep track of all frr_pthreads */
static struct hash *frr_pthread_hash;
static pthread_mutex_t frr_pthread_hash_mtx = PTHREAD_MUTEX_INITIALIZER;
/* frr_pthread_hash->hash_cmp */
static int frr_pthread_hash_cmp(const void *value1, const void *value2)
{
const struct frr_pthread *tq1 = value1;
const struct frr_pthread *tq2 = value2;
return (tq1->attr.id == tq2->attr.id);
}
/* frr_pthread_hash->hash_key */
static unsigned int frr_pthread_hash_key(void *value)
{
return ((struct frr_pthread *)value)->attr.id;
}
/* list to keep track of all frr_pthreads */
static pthread_mutex_t frr_pthread_list_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct list *frr_pthread_list;
/* ------------------------------------------------------------------------ */
void frr_pthread_init()
{
pthread_mutex_lock(&frr_pthread_hash_mtx);
pthread_mutex_lock(&frr_pthread_list_mtx);
{
frr_pthread_hash = hash_create(frr_pthread_hash_key,
frr_pthread_hash_cmp, NULL);
frr_pthread_list = list_new();
frr_pthread_list->del = (void (*)(void *))&frr_pthread_destroy;
}
pthread_mutex_unlock(&frr_pthread_hash_mtx);
pthread_mutex_unlock(&frr_pthread_list_mtx);
}
void frr_pthread_finish()
{
pthread_mutex_lock(&frr_pthread_hash_mtx);
pthread_mutex_lock(&frr_pthread_list_mtx);
{
hash_clean(frr_pthread_hash,
(void (*)(void *))frr_pthread_destroy);
hash_free(frr_pthread_hash);
list_delete_and_null(&frr_pthread_list);
}
pthread_mutex_unlock(&frr_pthread_hash_mtx);
pthread_mutex_unlock(&frr_pthread_list_mtx);
}
struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr,
const char *name, const char *os_name)
{
static struct frr_pthread holder = {};
struct frr_pthread *fpt = NULL;
attr = attr ? attr : &frr_pthread_attr_default;
pthread_mutex_lock(&frr_pthread_hash_mtx);
fpt = XCALLOC(MTYPE_FRR_PTHREAD, sizeof(struct frr_pthread));
/* initialize mutex */
pthread_mutex_init(&fpt->mtx, NULL);
/* create new thread master */
fpt->master = thread_master_create(name);
/* set attributes */
fpt->attr = *attr;
name = (name ? name : "Anonymous thread");
fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name);
if (os_name)
snprintf(fpt->os_name, OS_THREAD_NAMELEN, "%s", os_name);
/* initialize startup synchronization primitives */
fpt->running_cond_mtx = XCALLOC(
MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM,
sizeof(pthread_cond_t));
pthread_mutex_init(fpt->running_cond_mtx, NULL);
pthread_cond_init(fpt->running_cond, NULL);
pthread_mutex_lock(&frr_pthread_list_mtx);
{
holder.attr.id = attr->id;
if (!hash_lookup(frr_pthread_hash, &holder)) {
fpt = XCALLOC(MTYPE_FRR_PTHREAD,
sizeof(struct frr_pthread));
/* initialize mutex */
pthread_mutex_init(&fpt->mtx, NULL);
/* create new thread master */
fpt->master = thread_master_create(name);
/* set attributes */
fpt->attr = *attr;
name = (name ? name : "Anonymous thread");
fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name);
if (os_name)
snprintf(fpt->os_name, OS_THREAD_NAMELEN,
"%s", os_name);
if (attr == &frr_pthread_attr_default)
fpt->attr.id = frr_pthread_get_id();
/* initialize startup synchronization primitives */
fpt->running_cond_mtx = XCALLOC(
MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM,
sizeof(pthread_cond_t));
pthread_mutex_init(fpt->running_cond_mtx, NULL);
pthread_cond_init(fpt->running_cond, NULL);
/* insert into global thread hash */
hash_get(frr_pthread_hash, fpt, hash_alloc_intern);
}
listnode_add(frr_pthread_list, fpt);
}
pthread_mutex_unlock(&frr_pthread_hash_mtx);
pthread_mutex_unlock(&frr_pthread_list_mtx);
return fpt;
}
@ -180,21 +149,6 @@ int frr_pthread_set_name(struct frr_pthread *fpt, const char *name,
return ret;
}
struct frr_pthread *frr_pthread_get(uint32_t id)
{
static struct frr_pthread holder = {};
struct frr_pthread *fpt;
pthread_mutex_lock(&frr_pthread_hash_mtx);
{
holder.attr.id = id;
fpt = hash_lookup(frr_pthread_hash, &holder);
}
pthread_mutex_unlock(&frr_pthread_hash_mtx);
return fpt;
}
int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr)
{
int ret;
@ -239,36 +193,16 @@ int frr_pthread_stop(struct frr_pthread *fpt, void **result)
return ret;
}
/*
* Callback for hash_iterate to stop all frr_pthread's.
*/
static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg)
{
struct frr_pthread *fpt = hb->data;
frr_pthread_stop(fpt, NULL);
}
void frr_pthread_stop_all()
{
pthread_mutex_lock(&frr_pthread_hash_mtx);
pthread_mutex_lock(&frr_pthread_list_mtx);
{
hash_iterate(frr_pthread_hash, frr_pthread_stop_all_iter, NULL);
struct listnode *n;
struct frr_pthread *fpt;
for (ALL_LIST_ELEMENTS_RO(frr_pthread_list, n, fpt))
frr_pthread_stop(fpt, NULL);
}
pthread_mutex_unlock(&frr_pthread_hash_mtx);
}
uint32_t frr_pthread_get_id(void)
{
_Atomic uint32_t nxid;
nxid = atomic_fetch_add_explicit(&next_id, 1, memory_order_seq_cst);
/* just a sanity check, this should never happen */
assert(nxid <= (UINT32_MAX - 1));
return nxid;
}
void frr_pthread_yield(void)
{
(void)sched_yield();
pthread_mutex_unlock(&frr_pthread_list_mtx);
}
/*

View File

@ -34,7 +34,6 @@ struct frr_pthread;
struct frr_pthread_attr;
struct frr_pthread_attr {
_Atomic uint32_t id;
void *(*start)(void *);
int (*stop)(struct frr_pthread *, void **);
};
@ -154,13 +153,6 @@ int frr_pthread_set_name(struct frr_pthread *fpt, const char *name,
*/
void frr_pthread_destroy(struct frr_pthread *fpt);
/*
* Gets an existing frr_pthread by its id.
*
* @return frr_thread associated with the provided id, or NULL on error
*/
struct frr_pthread *frr_pthread_get(uint32_t id);
/*
* Creates a new pthread and binds it to a frr_pthread.
*
@ -218,22 +210,6 @@ int frr_pthread_stop(struct frr_pthread *fpt, void **result);
/* Stops all frr_pthread's. */
void frr_pthread_stop_all(void);
/* Yields the current thread of execution */
void frr_pthread_yield(void);
/*
* Returns a unique identifier for use with frr_pthread_new().
*
* Internally, this is an integer that increments after each call to this
* function. Because the number of pthreads created should never exceed INT_MAX
* during the life of the program, there is no overflow protection. If by
* chance this function returns an ID which is already in use,
* frr_pthread_new() will fail when it is provided.
*
* @return unique identifier
*/
uint32_t frr_pthread_get_id(void);
#ifndef HAVE_PTHREAD_CONDATTR_SETCLOCK
#define pthread_condattr_setclock(A, B)
#endif

View File

@ -1273,9 +1273,6 @@ static int handle_attr_test(struct aspath_tests *t)
struct aspath *asp;
size_t datalen;
bgp_pthreads_init();
frr_pthread_get(PTHREAD_KEEPALIVES)->running = true;
asp = make_aspath(t->segment->asdata, t->segment->len, 0);
peer.curr = stream_new(BGP_MAX_PACKET_SIZE);
@ -1382,6 +1379,9 @@ int main(void)
i = 0;
bgp_pthreads_init();
bgp_pth_ka->running = true;
while (aspath_tests[i].desc) {
printf("aspath_attr test %d\n", i);
attr_test(&aspath_tests[i++]);

View File

@ -917,7 +917,7 @@ int main(void)
bgp_option_set(BGP_OPT_NO_LISTEN);
bgp_pthreads_init();
frr_pthread_get(PTHREAD_KEEPALIVES)->running = true;
bgp_pth_ka->running = true;
if (fileno(stdout) >= 0)
tty = isatty(fileno(stdout));

View File

@ -359,6 +359,7 @@ our $InitAttribute = qr{$InitAttributeData|$InitAttributeConst|$InitAttributeIni
# We need \b after 'init' otherwise 'initconst' will cause a false positive in a check
our $Attribute = qr{
const|
_Atomic|
__percpu|
__nocast|
__safe|

View File

@ -703,7 +703,6 @@ static struct zserv *zserv_client_create(int sock)
listnode_add(zebrad.client_list, client);
struct frr_pthread_attr zclient_pthr_attrs = {
.id = frr_pthread_get_id(),
.start = frr_pthread_attr_default.start,
.stop = frr_pthread_attr_default.stop
};