diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index f4bfc90b7e..59b2d1cdaa 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -51,100 +51,12 @@ static bool validate_header(struct peer *); #define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred #define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error -/* Plumbing & control variables for thread lifecycle - * ------------------------------------------------------------------------ */ -bool bgp_io_thread_run; -pthread_mutex_t *running_cond_mtx; -pthread_cond_t *running_cond; - -/* Unused callback for thread_add_read() */ -static int bgp_io_dummy(struct thread *thread) { return 0; } - -/* Poison pill task */ -static int bgp_io_finish(struct thread *thread) -{ - bgp_io_thread_run = false; - return 0; -} - -/* Extern lifecycle control functions. init -> start -> stop - * ------------------------------------------------------------------------ */ -void bgp_io_init() -{ - bgp_io_thread_run = false; - - running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t)); - running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t)); - - pthread_mutex_init(running_cond_mtx, NULL); - pthread_cond_init(running_cond, NULL); - - /* unlocked in bgp_io_wait_running() */ - pthread_mutex_lock(running_cond_mtx); -} - -void *bgp_io_start(void *arg) -{ - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - fpt->master->owner = pthread_self(); - - // fd so we can sleep in poll() - int sleeper[2]; - pipe(sleeper); - thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL); - - // we definitely don't want to handle signals - fpt->master->handle_signals = false; - - struct thread task; - - pthread_mutex_lock(running_cond_mtx); - { - bgp_io_thread_run = true; - pthread_cond_signal(running_cond); - } - pthread_mutex_unlock(running_cond_mtx); - - while (bgp_io_thread_run) { - if (thread_fetch(fpt->master, &task)) { - thread_call(&task); - } - } - - close(sleeper[1]); - close(sleeper[0]); - - return NULL; -} - -void bgp_io_wait_running() -{ - while (!bgp_io_thread_run) - pthread_cond_wait(running_cond, running_cond_mtx); - - /* locked in bgp_io_init() */ - pthread_mutex_unlock(running_cond_mtx); -} - -int bgp_io_stop(void **result, struct frr_pthread *fpt) -{ - thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL); - pthread_join(fpt->thread, result); - - pthread_mutex_destroy(running_cond_mtx); - pthread_cond_destroy(running_cond); - - XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx); - XFREE(MTYPE_PTHREAD_PRIM, running_cond); - - return 0; -} - -/* Extern API -------------------------------------------------------------- */ +/* Thread external API ----------------------------------------------------- */ void bgp_writes_on(struct peer *peer) { - assert(bgp_io_thread_run); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); assert(peer->status != Deleted); assert(peer->obuf); @@ -154,8 +66,6 @@ void bgp_writes_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd, &peer->t_write); SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); @@ -163,9 +73,8 @@ void bgp_writes_on(struct peer *peer) void bgp_writes_off(struct peer *peer) { - assert(bgp_io_thread_run); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_write, NULL); THREAD_OFF(peer->t_generate_updgrp_packets); @@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer) void bgp_reads_on(struct peer *peer) { - assert(bgp_io_thread_run); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); assert(peer->status != Deleted); assert(peer->ibuf); @@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); @@ -196,9 +104,8 @@ void bgp_reads_on(struct peer *peer) void bgp_reads_off(struct peer *peer) { - assert(bgp_io_thread_run); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_read, NULL); THREAD_OFF(peer->t_process_packet); @@ -206,9 +113,9 @@ void bgp_reads_off(struct peer *peer) UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } -/* Internal functions ------------------------------------------------------- */ +/* Thread internal functions ----------------------------------------------- */ -/** +/* * Called from I/O pthread when a file descriptor has become ready for writing. */ static int bgp_process_writes(struct thread *thread) @@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread) } pthread_mutex_unlock(&peer->io_mtx); - if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ + /* no problem */ + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { } + /* problem */ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { - reschedule = false; /* problem */ + reschedule = false; fatal = true; } @@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread) return 0; } -/** +/* * Called from I/O pthread when a file descriptor has become ready for reading, * or has hung up. * @@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread) /* if this fails we are seriously screwed */ assert(pktsize <= BGP_MAX_PACKET_SIZE); - /* If we have that much data, chuck it into its own - * stream and append to input queue for processing. */ + /* + * If we have that much data, chuck it into its own + * stream and append to input queue for processing. + */ if (ringbuf_remain(ibw) >= pktsize) { struct stream *pkt = stream_new(pktsize); assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize); @@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread) return 0; } -/** +/* * Flush peer output buffer. * * This function pops packets off of peer->obuf and writes them to peer->fd. @@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer) uint16_t status = 0; uint32_t wpkt_quanta_old; - // cache current write quanta wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed); @@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer) } goto done; - } else if (num != writenum) // incomplete write + } else if (num != writenum) stream_forward_getp(s, num); } while (num != writenum); @@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer) if (peer->v_start >= (60 * 2)) peer->v_start = (60 * 2); - /* Handle Graceful Restart case where the state changes - * to Connect instead of Idle */ + /* + * Handle Graceful Restart case where the state changes + * to Connect instead of Idle. + */ BGP_EVENT_ADD(peer, BGP_Stop); goto done; @@ -472,7 +384,7 @@ done : { return status; } -/** +/* * Reads a chunk of data from peer->fd into peer->ibuf_work. * * @return status flag (see top-of-file) diff --git a/bgpd/bgp_io.h b/bgpd/bgp_io.h index 7cfd1db710..14a12d3705 100644 --- a/bgpd/bgp_io.h +++ b/bgpd/bgp_io.h @@ -28,14 +28,6 @@ #include "bgpd/bgpd.h" #include "frr_pthread.h" -/** - * Initializes data structures and flags for the write thread. - * - * This function must be called from the main thread before - * bgp_writes_start() is invoked. - */ -extern void bgp_io_init(void); - /** * Start function for write thread. * @@ -43,15 +35,6 @@ extern void bgp_io_init(void); */ extern void *bgp_io_start(void *arg); -/** - * Wait until the IO thread is ready to accept jobs. - * - * This function must be called immediately after the thread has been created - * for running. Use of other functions before calling this one will result in - * undefined behavior. - */ -extern void bgp_io_wait_running(void); - /** * Start function for write thread. * diff --git a/bgpd/bgp_keepalives.c b/bgpd/bgp_keepalives.c index afa280a799..5a48c7013e 100644 --- a/bgpd/bgp_keepalives.c +++ b/bgpd/bgp_keepalives.c @@ -36,14 +36,14 @@ #include "bgpd/bgp_keepalives.h" /* clang-format on */ -/** +/* * Peer KeepAlive Timer. * Associates a peer with the time of its last keepalive. */ struct pkat { - // the peer to send keepalives to + /* the peer to send keepalives to */ struct peer *peer; - // absolute time of last keepalive sent + /* absolute time of last keepalive sent */ struct timeval last; }; @@ -52,9 +52,6 @@ static pthread_mutex_t *peerhash_mtx; static pthread_cond_t *peerhash_cond; static struct hash *peerhash; -/* Thread control flag. */ -bool bgp_keepalives_thread_run = false; - static struct pkat *pkat_new(struct peer *peer) { struct pkat *pkat = XMALLOC(MTYPE_TMP, sizeof(struct pkat)); @@ -100,10 +97,10 @@ static void peer_process(struct hash_backet *hb, void *arg) static struct timeval tolerance = {0, 100000}; - // calculate elapsed time since last keepalive + /* calculate elapsed time since last keepalive */ monotime_since(&pkat->last, &elapsed); - // calculate difference between elapsed time and configured time + /* calculate difference between elapsed time and configured time */ ka.tv_sec = pkat->peer->v_keepalive; timersub(&ka, &elapsed, &diff); @@ -118,10 +115,10 @@ static void peer_process(struct hash_backet *hb, void *arg) bgp_keepalive_send(pkat->peer); monotime(&pkat->last); memset(&elapsed, 0x00, sizeof(struct timeval)); - diff = ka; // time until next keepalive == peer keepalive time + diff = ka; } - // if calculated next update for this peer < current delay, use it + /* if calculated next update for this peer < current delay, use it */ if (next_update->tv_sec <= 0 || timercmp(&diff, next_update, <)) *next_update = diff; } @@ -139,29 +136,9 @@ static unsigned int peer_hash_key(void *arg) return (uintptr_t)pkat->peer; } -void bgp_keepalives_init() -{ - peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); - peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t)); - - // initialize mutex - pthread_mutex_init(peerhash_mtx, NULL); - - // use monotonic clock with condition variable - pthread_condattr_t attrs; - pthread_condattr_init(&attrs); - pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); - pthread_cond_init(peerhash_cond, &attrs); - pthread_condattr_destroy(&attrs); - - // initialize peer hashtable - peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL); -} - +/* Cleanup handler / deinitializer. */ static void bgp_keepalives_finish(void *arg) { - bgp_keepalives_thread_run = false; - if (peerhash) { hash_clean(peerhash, pkat_del); hash_free(peerhash); @@ -177,32 +154,50 @@ static void bgp_keepalives_finish(void *arg) XFREE(MTYPE_TMP, peerhash_cond); } -/** +/* * Entry function for peer keepalive generation pthread. - * - * bgp_keepalives_init() must be called prior to this. */ void *bgp_keepalives_start(void *arg) { + struct frr_pthread *fpt = arg; + fpt->master->owner = pthread_self(); + struct timeval currtime = {0, 0}; struct timeval aftertime = {0, 0}; struct timeval next_update = {0, 0}; struct timespec next_update_ts = {0, 0}; + peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); + peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t)); + + /* initialize mutex */ + pthread_mutex_init(peerhash_mtx, NULL); + + /* use monotonic clock with condition variable */ + pthread_condattr_t attrs; + pthread_condattr_init(&attrs); + pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); + pthread_cond_init(peerhash_cond, &attrs); + pthread_condattr_destroy(&attrs); + + /* initialize peer hashtable */ + peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL); pthread_mutex_lock(peerhash_mtx); - // register cleanup handler + /* register cleanup handler */ pthread_cleanup_push(&bgp_keepalives_finish, NULL); - bgp_keepalives_thread_run = true; + /* notify anybody waiting on us that we are done starting up */ + frr_pthread_notify_running(fpt); - while (bgp_keepalives_thread_run) { + while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) { if (peerhash->count > 0) pthread_cond_timedwait(peerhash_cond, peerhash_mtx, &next_update_ts); else while (peerhash->count == 0 - && bgp_keepalives_thread_run) + && atomic_load_explicit(&fpt->running, + memory_order_relaxed)) pthread_cond_wait(peerhash_cond, peerhash_mtx); monotime(&currtime); @@ -219,7 +214,7 @@ void *bgp_keepalives_start(void *arg) TIMEVAL_TO_TIMESPEC(&next_update, &next_update_ts); } - // clean up + /* clean up */ pthread_cleanup_pop(1); return NULL; @@ -229,6 +224,12 @@ void *bgp_keepalives_start(void *arg) void bgp_keepalives_on(struct peer *peer) { + if (CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON)) + return; + + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); + assert(fpt->running); + /* placeholder bucket data to use for fast key lookups */ static struct pkat holder = {0}; @@ -253,6 +254,12 @@ void bgp_keepalives_on(struct peer *peer) void bgp_keepalives_off(struct peer *peer) { + if (!CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON)) + return; + + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); + assert(fpt->running); + /* placeholder bucket data to use for fast key lookups */ static struct pkat holder = {0}; @@ -283,10 +290,13 @@ void bgp_keepalives_wake() pthread_mutex_unlock(peerhash_mtx); } -int bgp_keepalives_stop(void **result, struct frr_pthread *fpt) +int bgp_keepalives_stop(struct frr_pthread *fpt, void **result) { - bgp_keepalives_thread_run = false; + assert(fpt->running); + + atomic_store_explicit(&fpt->running, false, memory_order_relaxed); bgp_keepalives_wake(); + pthread_join(fpt->thread, result); return 0; } diff --git a/bgpd/bgp_keepalives.h b/bgpd/bgp_keepalives.h index 1fbd035b9e..d1cb7d2462 100644 --- a/bgpd/bgp_keepalives.h +++ b/bgpd/bgp_keepalives.h @@ -88,6 +88,6 @@ extern void bgp_keepalives_wake(void); /** * Stops the thread and blocks until it terminates. */ -int bgp_keepalives_stop(void **result, struct frr_pthread *fpt); +int bgp_keepalives_stop(struct frr_pthread *fpt, void **result); #endif /* _FRR_BGP_KEEPALIVES_H */ diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 7db73043cc..19f0c8cabf 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -7470,30 +7470,33 @@ static void bgp_pthreads_init() { frr_pthread_init(); - frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start, - bgp_io_stop); - frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES, - bgp_keepalives_start, bgp_keepalives_stop); - - /* pre-run initialization */ - bgp_keepalives_init(); - bgp_io_init(); + struct frr_pthread_attr io = { + .id = PTHREAD_IO, + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop, + .name = "BGP I/O thread", + }; + struct frr_pthread_attr ka = { + .id = PTHREAD_KEEPALIVES, + .start = bgp_keepalives_start, + .stop = bgp_keepalives_stop, + .name = "BGP Keepalives thread", + }; + frr_pthread_new(&io); + frr_pthread_new(&ka); } void bgp_pthreads_run() { - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + struct frr_pthread *io = frr_pthread_get(PTHREAD_IO); + struct frr_pthread *ka = frr_pthread_get(PTHREAD_KEEPALIVES); - /* - * I/O related code assumes the thread is ready for work at all times, - * so we wait until it is. - */ - frr_pthread_run(PTHREAD_IO, &attr, NULL); - bgp_io_wait_running(); + frr_pthread_run(io, NULL); + frr_pthread_run(ka, NULL); - frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL); + /* Wait until threads are ready. */ + frr_pthread_wait_running(io); + frr_pthread_wait_running(ka); } void bgp_pthreads_finish() diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c index de522e5ef9..72b47ae5c3 100644 --- a/lib/frr_pthread.c +++ b/lib/frr_pthread.c @@ -1,5 +1,5 @@ /* - * Utilities and interfaces for managing POSIX threads + * Utilities and interfaces for managing POSIX threads within FRR. * Copyright (C) 2017 Cumulus Networks * * This program is free software; you can redistribute it and/or modify @@ -25,79 +25,99 @@ #include "memory.h" #include "hash.h" -DEFINE_MTYPE_STATIC(LIB, FRR_PTHREAD, "FRR POSIX Thread"); +DEFINE_MTYPE(LIB, FRR_PTHREAD, "FRR POSIX Thread"); DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives"); +/* id for next created pthread */ static unsigned int next_id = 0; -/* Hash table of all frr_pthreads along with synchronization primitive(s) and - * hash table callbacks. - * ------------------------------------------------------------------------ */ -static struct hash *pthread_table; -static pthread_mutex_t pthread_table_mtx = PTHREAD_MUTEX_INITIALIZER; +/* default frr_pthread start/stop routine prototypes */ +static void *fpt_run(void *arg); +static int fpt_halt(struct frr_pthread *fpt, void **res); -/* pthread_table->hash_cmp */ -static int pthread_table_hash_cmp(const void *value1, const void *value2) +/* default frr_pthread attributes */ +struct frr_pthread_attr frr_pthread_attr_default = { + .id = 0, + .start = fpt_run, + .stop = fpt_halt, + .name = "Anonymous", +}; + +/* hash table to keep track of all frr_pthreads */ +static struct hash *frr_pthread_hash; +static pthread_mutex_t frr_pthread_hash_mtx = PTHREAD_MUTEX_INITIALIZER; + +/* frr_pthread_hash->hash_cmp */ +static int frr_pthread_hash_cmp(const void *value1, const void *value2) { const struct frr_pthread *tq1 = value1; const struct frr_pthread *tq2 = value2; - return (tq1->id == tq2->id); + return (tq1->attr.id == tq2->attr.id); } -/* pthread_table->hash_key */ -static unsigned int pthread_table_hash_key(void *value) +/* frr_pthread_hash->hash_key */ +static unsigned int frr_pthread_hash_key(void *value) { - return ((struct frr_pthread *)value)->id; + return ((struct frr_pthread *)value)->attr.id; } + /* ------------------------------------------------------------------------ */ void frr_pthread_init() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - pthread_table = hash_create(pthread_table_hash_key, - pthread_table_hash_cmp, NULL); + frr_pthread_hash = hash_create(frr_pthread_hash_key, + frr_pthread_hash_cmp, NULL); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } void frr_pthread_finish() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - hash_clean(pthread_table, + hash_clean(frr_pthread_hash, (void (*)(void *))frr_pthread_destroy); - hash_free(pthread_table); + hash_free(frr_pthread_hash); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } -struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, - void *(*start_routine)(void *), - int (*stop_routine)(void **, - struct frr_pthread *)) +struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr) { static struct frr_pthread holder = {0}; struct frr_pthread *fpt = NULL; - pthread_mutex_lock(&pthread_table_mtx); + attr = attr ? attr : &frr_pthread_attr_default; + + pthread_mutex_lock(&frr_pthread_hash_mtx); { - holder.id = id; + holder.attr.id = attr->id; - if (!hash_lookup(pthread_table, &holder)) { - struct frr_pthread *fpt = XCALLOC( - MTYPE_FRR_PTHREAD, sizeof(struct frr_pthread)); - fpt->id = id; - fpt->master = thread_master_create(name); - fpt->start_routine = start_routine; - fpt->stop_routine = stop_routine; - fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name); + if (!hash_lookup(frr_pthread_hash, &holder)) { + fpt = XCALLOC(MTYPE_FRR_PTHREAD, + sizeof(struct frr_pthread)); + /* create new thread master */ + fpt->master = thread_master_create(attr->name); + /* set attributes */ + fpt->attr = *attr; + if (attr == &frr_pthread_attr_default) + fpt->attr.id = frr_pthread_get_id(); + /* initialize startup synchronization primitives */ + fpt->running_cond_mtx = XCALLOC( + MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t)); + fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, + sizeof(pthread_cond_t)); + pthread_mutex_init(fpt->running_cond_mtx, NULL); + pthread_cond_init(fpt->running_cond, NULL); - hash_get(pthread_table, fpt, hash_alloc_intern); + /* insert into global thread hash */ + hash_get(frr_pthread_hash, fpt, hash_alloc_intern); } } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); return fpt; } @@ -105,7 +125,11 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, void frr_pthread_destroy(struct frr_pthread *fpt) { thread_master_free(fpt->master); - XFREE(MTYPE_FRR_PTHREAD, fpt->name); + + pthread_mutex_destroy(fpt->running_cond_mtx); + pthread_cond_destroy(fpt->running_cond); + XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond_mtx); + XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond); XFREE(MTYPE_FRR_PTHREAD, fpt); } @@ -114,74 +138,82 @@ struct frr_pthread *frr_pthread_get(unsigned int id) static struct frr_pthread holder = {0}; struct frr_pthread *fpt; - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - holder.id = id; - fpt = hash_lookup(pthread_table, &holder); + holder.attr.id = id; + fpt = hash_lookup(frr_pthread_hash, &holder); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); return fpt; } -int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg) +int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr) { - struct frr_pthread *fpt = frr_pthread_get(id); int ret; - if (!fpt) - return -1; + ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt); - ret = pthread_create(&fpt->thread, attr, fpt->start_routine, arg); - - /* Per pthread_create(3), the contents of fpt->thread are undefined if - * pthread_create() did not succeed. Reset this value to zero. */ + /* + * Per pthread_create(3), the contents of fpt->thread are undefined if + * pthread_create() did not succeed. Reset this value to zero. + */ if (ret < 0) memset(&fpt->thread, 0x00, sizeof(fpt->thread)); return ret; } -/** - * Calls the stop routine for the frr_pthread and resets any relevant fields. - * - * @param fpt - the frr_pthread to stop - * @param result - pointer to result pointer - * @return the return code from the stop routine - */ -static int frr_pthread_stop_actual(struct frr_pthread *fpt, void **result) +void frr_pthread_wait_running(struct frr_pthread *fpt) { - int ret = (*fpt->stop_routine)(result, fpt); + pthread_mutex_lock(fpt->running_cond_mtx); + { + while (!fpt->running) + pthread_cond_wait(fpt->running_cond, + fpt->running_cond_mtx); + } + pthread_mutex_unlock(fpt->running_cond_mtx); +} + +void frr_pthread_notify_running(struct frr_pthread *fpt) +{ + pthread_mutex_lock(fpt->running_cond_mtx); + { + fpt->running = true; + pthread_cond_signal(fpt->running_cond); + } + pthread_mutex_unlock(fpt->running_cond_mtx); +} + +int frr_pthread_stop(struct frr_pthread *fpt, void **result) +{ + int ret = (*fpt->attr.stop)(fpt, result); memset(&fpt->thread, 0x00, sizeof(fpt->thread)); return ret; } -int frr_pthread_stop(unsigned int id, void **result) -{ - struct frr_pthread *fpt = frr_pthread_get(id); - return frr_pthread_stop_actual(fpt, result); -} - -/** +/* * Callback for hash_iterate to stop all frr_pthread's. */ static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg) { struct frr_pthread *fpt = hb->data; - frr_pthread_stop_actual(fpt, NULL); + frr_pthread_stop(fpt, NULL); } void frr_pthread_stop_all() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - hash_iterate(pthread_table, frr_pthread_stop_all_iter, NULL); + hash_iterate(frr_pthread_hash, frr_pthread_stop_all_iter, NULL); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } unsigned int frr_pthread_get_id() { + /* just a sanity check, this should never happen */ + assert(next_id <= INT_MAX - 1); return next_id++; } @@ -189,3 +221,60 @@ void frr_pthread_yield(void) { (void)sched_yield(); } + +/* + * ---------------------------------------------------------------------------- + * Default Event Loop + * ---------------------------------------------------------------------------- + */ + +/* dummy task for sleeper pipe */ +static int fpt_dummy(struct thread *thread) +{ + return 0; +} + +/* poison pill task to end event loop */ +static int fpt_finish(struct thread *thread) +{ + struct frr_pthread *fpt = THREAD_ARG(thread); + atomic_store_explicit(&fpt->running, false, memory_order_relaxed); + return 0; +} + +/* stop function, called from other threads to halt this one */ +static int fpt_halt(struct frr_pthread *fpt, void **res) +{ + thread_add_event(fpt->master, &fpt_finish, fpt, 0, NULL); + pthread_join(fpt->thread, res); + fpt = NULL; + + return 0; +} + +/* entry pthread function & main event loop */ +static void *fpt_run(void *arg) +{ + struct frr_pthread *fpt = arg; + fpt->master->owner = pthread_self(); + + int sleeper[2]; + pipe(sleeper); + thread_add_read(fpt->master, &fpt_dummy, NULL, sleeper[0], NULL); + + fpt->master->handle_signals = false; + + frr_pthread_notify_running(fpt); + + struct thread task; + while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) { + if (thread_fetch(fpt->master, &task)) { + thread_call(&task); + } + } + + close(sleeper[1]); + close(sleeper[0]); + + return NULL; +} diff --git a/lib/frr_pthread.h b/lib/frr_pthread.h index 7915b43a46..2cc50196a8 100644 --- a/lib/frr_pthread.h +++ b/lib/frr_pthread.h @@ -1,5 +1,5 @@ /* - * Utilities and interfaces for managing POSIX threads + * Utilities and interfaces for managing POSIX threads within FRR. * Copyright (C) 2017 Cumulus Networks * * This program is free software; you can redistribute it and/or modify @@ -21,39 +21,73 @@ #define _FRR_PTHREAD_H #include +#include "frratomic.h" #include "memory.h" #include "thread.h" +DECLARE_MTYPE(FRR_PTHREAD); DECLARE_MTYPE(PTHREAD_PRIM); +struct frr_pthread; +struct frr_pthread_attr; + +struct frr_pthread_attr { + int id; + void *(*start)(void *); + int (*stop)(struct frr_pthread *, void **); + const char *name; +}; + struct frr_pthread { /* pthread id */ pthread_t thread; - /* frr thread identifier */ - unsigned int id; - /* thread master for this pthread's thread.c event loop */ struct thread_master *master; - /* start routine */ - void *(*start_routine)(void *); + /* caller-specified data; start & stop funcs, name, id */ + struct frr_pthread_attr attr; - /* stop routine */ - int (*stop_routine)(void **, struct frr_pthread *); + /* + * Notification mechanism for allowing pthreads to notify their parents + * when they are ready to do work. This mechanism has two associated + * functions: + * + * - frr_pthread_wait_running() + * This function should be called by the spawning thread after + * frr_pthread_run(). It safely waits until the spawned thread + * indicates that is ready to do work by posting to the condition + * variable. + * + * - frr_pthread_notify_running() + * This function should be called by the spawned thread when it is + * ready to do work. It will wake up any threads waiting on the + * previously described condition. + */ + pthread_cond_t *running_cond; + pthread_mutex_t *running_cond_mtx; + _Atomic bool running; - /* the (hopefully descriptive) name of this thread */ - char *name; + /* + * Fake thread-specific storage. No constraints on usage. Helpful when + * creating reentrant pthread implementations. Can be used to pass + * argument to pthread entry function. + */ + void *data; }; -/* Initializes this module. +extern struct frr_pthread_attr frr_pthread_attr_default; + +/* + * Initializes this module. * * Must be called before using any of the other functions. */ void frr_pthread_init(void); -/* Uninitializes this module. +/* + * Uninitializes this module. * * Destroys all registered frr_pthread's and internal data structures. * @@ -62,34 +96,23 @@ void frr_pthread_init(void); */ void frr_pthread_finish(void); -/* Creates a new frr_pthread. +/* + * Creates a new frr_pthread with the given attributes. * - * If the provided ID is already assigned to an existing frr_pthread, the - * return value will be NULL. - * - * @param name - the name of the thread. Doesn't have to be unique, but it - * probably should be. This value is copied and may be safely free'd upon - * return. - * - * @param id - the integral ID of the thread. MUST be unique. The caller may - * use this id to retrieve the thread. - * - * @param start_routine - start routine for the pthread, will be passed to - * pthread_create (see those docs for details) - * - * @param stop_routine - stop routine for the pthread, called to terminate the - * thread. This function should gracefully stop the pthread and clean up any - * thread-specific resources. The passed pointer is used to return a data - * result. + * The 'attr' argument should be filled out with the desired attributes, + * including ID, start and stop functions and the desired name. Alternatively, + * if attr is NULL, the default attributes will be used. The pthread will be + * set up to run a basic threadmaster loop and the name will be "Anonymous". + * Scheduling tasks onto the threadmaster in the 'master' field of the returned + * frr_pthread will cause them to run on that pthread. * + * @param attr - the thread attributes * @return the created frr_pthread upon success, or NULL upon failure */ -struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, - void *(*start_routine)(void *), - int (*stop_routine)(void **, - struct frr_pthread *)); +struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr); -/* Destroys an frr_pthread. +/* + * Destroys an frr_pthread. * * Assumes that the associated pthread, if any, has already terminated. * @@ -97,38 +120,66 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, */ void frr_pthread_destroy(struct frr_pthread *fpt); -/* Gets an existing frr_pthread by its id. +/* + * Gets an existing frr_pthread by its id. * * @return frr_thread associated with the provided id, or NULL on error */ struct frr_pthread *frr_pthread_get(unsigned int id); -/* Creates a new pthread and binds it to a frr_pthread. +/* + * Creates a new pthread and binds it to a frr_pthread. * * This function is a wrapper for pthread_create. The first parameter is the * frr_pthread to bind the created pthread to. All subsequent arguments are - * passed unmodified to pthread_create(). + * passed unmodified to pthread_create(). The frr_pthread * provided will be + * used as the argument to the pthread entry function. If it is necessary to + * pass additional data, the 'data' field in the frr_pthread may be used. * * This function returns the same code as pthread_create(). If the value is * zero, the provided frr_pthread is bound to a running POSIX thread. If the * value is less than zero, the provided frr_pthread is guaranteed to be a * clean instance that may be susbsequently passed to frr_pthread_run(). * - * @param id - frr_pthread to bind the created pthread to + * @param fpt - frr_pthread * to run * @param attr - see pthread_create(3) - * @param arg - see pthread_create(3) * * @return see pthread_create(3) */ -int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg); +int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr); -/* Stops an frr_pthread with a result. +/* + * Waits until the specified pthread has finished setting up and is ready to + * begin work. * - * @param id - frr_pthread to stop + * If the pthread's code makes use of the startup synchronization mechanism, + * this function should be called before attempting to use the functionality + * exposed by the pthread. It waits until the 'running' condition is satisfied + * (see struct definition of frr_pthread). + * + * @param fpt - the frr_pthread * to wait on + */ +void frr_pthread_wait_running(struct frr_pthread *fpt); + +/* + * Notifies other pthreads that the calling thread has finished setting up and + * is ready to begin work. + * + * This will allow any other pthreads waiting in 'frr_pthread_wait_running' to + * proceed. + * + * @param fpt - the frr_pthread * that has finished setting up + */ +void frr_pthread_notify_running(struct frr_pthread *fpt); + +/* + * Stops a frr_pthread with a result. + * + * @param fpt - frr_pthread * to stop * @param result - where to store the thread's result, if any. May be NULL if a * result is not needed. */ -int frr_pthread_stop(unsigned int id, void **result); +int frr_pthread_stop(struct frr_pthread *fpt, void **result); /* Stops all frr_pthread's. */ void frr_pthread_stop_all(void); @@ -136,7 +187,8 @@ void frr_pthread_stop_all(void); /* Yields the current thread of execution */ void frr_pthread_yield(void); -/* Returns a unique identifier for use with frr_pthread_new(). +/* + * Returns a unique identifier for use with frr_pthread_new(). * * Internally, this is an integer that increments after each call to this * function. Because the number of pthreads created should never exceed INT_MAX diff --git a/tests/bgpd/test_aspath.c b/tests/bgpd/test_aspath.c index 56808bc8ad..9e5cb7fe54 100644 --- a/tests/bgpd/test_aspath.c +++ b/tests/bgpd/test_aspath.c @@ -25,8 +25,9 @@ #include "privs.h" #include "queue.h" #include "filter.h" +#include "frr_pthread.h" -#include "bgpd/bgpd.h" +#include "bgpd/bgpd.c" #include "bgpd/bgp_aspath.h" #include "bgpd/bgp_attr.h" #include "bgpd/bgp_packet.h" @@ -1272,6 +1273,9 @@ static int handle_attr_test(struct aspath_tests *t) struct aspath *asp; size_t datalen; + bgp_pthreads_init(); + frr_pthread_get(PTHREAD_KEEPALIVES)->running = true; + asp = make_aspath(t->segment->asdata, t->segment->len, 0); peer.curr = stream_new(BGP_MAX_PACKET_SIZE); diff --git a/tests/bgpd/test_capability.c b/tests/bgpd/test_capability.c index a5092708e2..3d5518f3b7 100644 --- a/tests/bgpd/test_capability.c +++ b/tests/bgpd/test_capability.c @@ -27,8 +27,9 @@ #include "memory.h" #include "queue.h" #include "filter.h" +#include "frr_pthread.h" -#include "bgpd/bgpd.h" +#include "bgpd/bgpd.c" #include "bgpd/bgp_open.h" #include "bgpd/bgp_debug.h" #include "bgpd/bgp_packet.h" @@ -915,6 +916,9 @@ int main(void) vrf_init(NULL, NULL, NULL, NULL); bgp_option_set(BGP_OPT_NO_LISTEN); + bgp_pthreads_init(); + frr_pthread_get(PTHREAD_KEEPALIVES)->running = true; + if (fileno(stdout) >= 0) tty = isatty(fileno(stdout));