From a45dc9742c3e4e504f66bf18b7bc9da3c218233a Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Mon, 22 Jan 2018 14:23:55 -0500 Subject: [PATCH 1/4] lib: streamline frr_pthreads, add default loop Some work on FRR's pthread wrapper. * Provide a built-in way to synchronize thread startup * Make utility functions take frr_pthread * instead of its integer ID * Pass frr_pthread * as pthread start function argument * Correct some comment styling * Rename some variables to match naming conventions in the file * Change parameter ordering in stop function prototype to follow the convention in the other functions * Default new frr_pthreads to using a vanilla event loop For the last point, the original goal when designing the implementation of pthreads into FRR was to be able to use the thread.c event based system inside pthreads. This code essentially encapuslates all the thread.c functionality into an easy to use pthread out of the box. Creating a new frr_pthread with a null attributes field will cause the created frr_pthread to run a thread.c event loop. The upshot of this is that it is now possible to safely run existing functions in a pthread in roughly 3 lines of code. It also serves as an example / starting point for others. Signed-off-by: Quentin Young --- lib/frr_pthread.c | 229 ++++++++++++++++++++++++++++++++-------------- lib/frr_pthread.h | 142 +++++++++++++++++++--------- 2 files changed, 256 insertions(+), 115 deletions(-) diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c index de522e5ef9..72b47ae5c3 100644 --- a/lib/frr_pthread.c +++ b/lib/frr_pthread.c @@ -1,5 +1,5 @@ /* - * Utilities and interfaces for managing POSIX threads + * Utilities and interfaces for managing POSIX threads within FRR. * Copyright (C) 2017 Cumulus Networks * * This program is free software; you can redistribute it and/or modify @@ -25,79 +25,99 @@ #include "memory.h" #include "hash.h" -DEFINE_MTYPE_STATIC(LIB, FRR_PTHREAD, "FRR POSIX Thread"); +DEFINE_MTYPE(LIB, FRR_PTHREAD, "FRR POSIX Thread"); DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives"); +/* id for next created pthread */ static unsigned int next_id = 0; -/* Hash table of all frr_pthreads along with synchronization primitive(s) and - * hash table callbacks. - * ------------------------------------------------------------------------ */ -static struct hash *pthread_table; -static pthread_mutex_t pthread_table_mtx = PTHREAD_MUTEX_INITIALIZER; +/* default frr_pthread start/stop routine prototypes */ +static void *fpt_run(void *arg); +static int fpt_halt(struct frr_pthread *fpt, void **res); -/* pthread_table->hash_cmp */ -static int pthread_table_hash_cmp(const void *value1, const void *value2) +/* default frr_pthread attributes */ +struct frr_pthread_attr frr_pthread_attr_default = { + .id = 0, + .start = fpt_run, + .stop = fpt_halt, + .name = "Anonymous", +}; + +/* hash table to keep track of all frr_pthreads */ +static struct hash *frr_pthread_hash; +static pthread_mutex_t frr_pthread_hash_mtx = PTHREAD_MUTEX_INITIALIZER; + +/* frr_pthread_hash->hash_cmp */ +static int frr_pthread_hash_cmp(const void *value1, const void *value2) { const struct frr_pthread *tq1 = value1; const struct frr_pthread *tq2 = value2; - return (tq1->id == tq2->id); + return (tq1->attr.id == tq2->attr.id); } -/* pthread_table->hash_key */ -static unsigned int pthread_table_hash_key(void *value) +/* frr_pthread_hash->hash_key */ +static unsigned int frr_pthread_hash_key(void *value) { - return ((struct frr_pthread *)value)->id; + return ((struct frr_pthread *)value)->attr.id; } + /* ------------------------------------------------------------------------ */ void frr_pthread_init() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - pthread_table = hash_create(pthread_table_hash_key, - pthread_table_hash_cmp, NULL); + frr_pthread_hash = hash_create(frr_pthread_hash_key, + frr_pthread_hash_cmp, NULL); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } void frr_pthread_finish() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - hash_clean(pthread_table, + hash_clean(frr_pthread_hash, (void (*)(void *))frr_pthread_destroy); - hash_free(pthread_table); + hash_free(frr_pthread_hash); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } -struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, - void *(*start_routine)(void *), - int (*stop_routine)(void **, - struct frr_pthread *)) +struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr) { static struct frr_pthread holder = {0}; struct frr_pthread *fpt = NULL; - pthread_mutex_lock(&pthread_table_mtx); + attr = attr ? attr : &frr_pthread_attr_default; + + pthread_mutex_lock(&frr_pthread_hash_mtx); { - holder.id = id; + holder.attr.id = attr->id; - if (!hash_lookup(pthread_table, &holder)) { - struct frr_pthread *fpt = XCALLOC( - MTYPE_FRR_PTHREAD, sizeof(struct frr_pthread)); - fpt->id = id; - fpt->master = thread_master_create(name); - fpt->start_routine = start_routine; - fpt->stop_routine = stop_routine; - fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name); + if (!hash_lookup(frr_pthread_hash, &holder)) { + fpt = XCALLOC(MTYPE_FRR_PTHREAD, + sizeof(struct frr_pthread)); + /* create new thread master */ + fpt->master = thread_master_create(attr->name); + /* set attributes */ + fpt->attr = *attr; + if (attr == &frr_pthread_attr_default) + fpt->attr.id = frr_pthread_get_id(); + /* initialize startup synchronization primitives */ + fpt->running_cond_mtx = XCALLOC( + MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t)); + fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, + sizeof(pthread_cond_t)); + pthread_mutex_init(fpt->running_cond_mtx, NULL); + pthread_cond_init(fpt->running_cond, NULL); - hash_get(pthread_table, fpt, hash_alloc_intern); + /* insert into global thread hash */ + hash_get(frr_pthread_hash, fpt, hash_alloc_intern); } } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); return fpt; } @@ -105,7 +125,11 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, void frr_pthread_destroy(struct frr_pthread *fpt) { thread_master_free(fpt->master); - XFREE(MTYPE_FRR_PTHREAD, fpt->name); + + pthread_mutex_destroy(fpt->running_cond_mtx); + pthread_cond_destroy(fpt->running_cond); + XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond_mtx); + XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond); XFREE(MTYPE_FRR_PTHREAD, fpt); } @@ -114,74 +138,82 @@ struct frr_pthread *frr_pthread_get(unsigned int id) static struct frr_pthread holder = {0}; struct frr_pthread *fpt; - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - holder.id = id; - fpt = hash_lookup(pthread_table, &holder); + holder.attr.id = id; + fpt = hash_lookup(frr_pthread_hash, &holder); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); return fpt; } -int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg) +int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr) { - struct frr_pthread *fpt = frr_pthread_get(id); int ret; - if (!fpt) - return -1; + ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt); - ret = pthread_create(&fpt->thread, attr, fpt->start_routine, arg); - - /* Per pthread_create(3), the contents of fpt->thread are undefined if - * pthread_create() did not succeed. Reset this value to zero. */ + /* + * Per pthread_create(3), the contents of fpt->thread are undefined if + * pthread_create() did not succeed. Reset this value to zero. + */ if (ret < 0) memset(&fpt->thread, 0x00, sizeof(fpt->thread)); return ret; } -/** - * Calls the stop routine for the frr_pthread and resets any relevant fields. - * - * @param fpt - the frr_pthread to stop - * @param result - pointer to result pointer - * @return the return code from the stop routine - */ -static int frr_pthread_stop_actual(struct frr_pthread *fpt, void **result) +void frr_pthread_wait_running(struct frr_pthread *fpt) { - int ret = (*fpt->stop_routine)(result, fpt); + pthread_mutex_lock(fpt->running_cond_mtx); + { + while (!fpt->running) + pthread_cond_wait(fpt->running_cond, + fpt->running_cond_mtx); + } + pthread_mutex_unlock(fpt->running_cond_mtx); +} + +void frr_pthread_notify_running(struct frr_pthread *fpt) +{ + pthread_mutex_lock(fpt->running_cond_mtx); + { + fpt->running = true; + pthread_cond_signal(fpt->running_cond); + } + pthread_mutex_unlock(fpt->running_cond_mtx); +} + +int frr_pthread_stop(struct frr_pthread *fpt, void **result) +{ + int ret = (*fpt->attr.stop)(fpt, result); memset(&fpt->thread, 0x00, sizeof(fpt->thread)); return ret; } -int frr_pthread_stop(unsigned int id, void **result) -{ - struct frr_pthread *fpt = frr_pthread_get(id); - return frr_pthread_stop_actual(fpt, result); -} - -/** +/* * Callback for hash_iterate to stop all frr_pthread's. */ static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg) { struct frr_pthread *fpt = hb->data; - frr_pthread_stop_actual(fpt, NULL); + frr_pthread_stop(fpt, NULL); } void frr_pthread_stop_all() { - pthread_mutex_lock(&pthread_table_mtx); + pthread_mutex_lock(&frr_pthread_hash_mtx); { - hash_iterate(pthread_table, frr_pthread_stop_all_iter, NULL); + hash_iterate(frr_pthread_hash, frr_pthread_stop_all_iter, NULL); } - pthread_mutex_unlock(&pthread_table_mtx); + pthread_mutex_unlock(&frr_pthread_hash_mtx); } unsigned int frr_pthread_get_id() { + /* just a sanity check, this should never happen */ + assert(next_id <= INT_MAX - 1); return next_id++; } @@ -189,3 +221,60 @@ void frr_pthread_yield(void) { (void)sched_yield(); } + +/* + * ---------------------------------------------------------------------------- + * Default Event Loop + * ---------------------------------------------------------------------------- + */ + +/* dummy task for sleeper pipe */ +static int fpt_dummy(struct thread *thread) +{ + return 0; +} + +/* poison pill task to end event loop */ +static int fpt_finish(struct thread *thread) +{ + struct frr_pthread *fpt = THREAD_ARG(thread); + atomic_store_explicit(&fpt->running, false, memory_order_relaxed); + return 0; +} + +/* stop function, called from other threads to halt this one */ +static int fpt_halt(struct frr_pthread *fpt, void **res) +{ + thread_add_event(fpt->master, &fpt_finish, fpt, 0, NULL); + pthread_join(fpt->thread, res); + fpt = NULL; + + return 0; +} + +/* entry pthread function & main event loop */ +static void *fpt_run(void *arg) +{ + struct frr_pthread *fpt = arg; + fpt->master->owner = pthread_self(); + + int sleeper[2]; + pipe(sleeper); + thread_add_read(fpt->master, &fpt_dummy, NULL, sleeper[0], NULL); + + fpt->master->handle_signals = false; + + frr_pthread_notify_running(fpt); + + struct thread task; + while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) { + if (thread_fetch(fpt->master, &task)) { + thread_call(&task); + } + } + + close(sleeper[1]); + close(sleeper[0]); + + return NULL; +} diff --git a/lib/frr_pthread.h b/lib/frr_pthread.h index 7915b43a46..2cc50196a8 100644 --- a/lib/frr_pthread.h +++ b/lib/frr_pthread.h @@ -1,5 +1,5 @@ /* - * Utilities and interfaces for managing POSIX threads + * Utilities and interfaces for managing POSIX threads within FRR. * Copyright (C) 2017 Cumulus Networks * * This program is free software; you can redistribute it and/or modify @@ -21,39 +21,73 @@ #define _FRR_PTHREAD_H #include +#include "frratomic.h" #include "memory.h" #include "thread.h" +DECLARE_MTYPE(FRR_PTHREAD); DECLARE_MTYPE(PTHREAD_PRIM); +struct frr_pthread; +struct frr_pthread_attr; + +struct frr_pthread_attr { + int id; + void *(*start)(void *); + int (*stop)(struct frr_pthread *, void **); + const char *name; +}; + struct frr_pthread { /* pthread id */ pthread_t thread; - /* frr thread identifier */ - unsigned int id; - /* thread master for this pthread's thread.c event loop */ struct thread_master *master; - /* start routine */ - void *(*start_routine)(void *); + /* caller-specified data; start & stop funcs, name, id */ + struct frr_pthread_attr attr; - /* stop routine */ - int (*stop_routine)(void **, struct frr_pthread *); + /* + * Notification mechanism for allowing pthreads to notify their parents + * when they are ready to do work. This mechanism has two associated + * functions: + * + * - frr_pthread_wait_running() + * This function should be called by the spawning thread after + * frr_pthread_run(). It safely waits until the spawned thread + * indicates that is ready to do work by posting to the condition + * variable. + * + * - frr_pthread_notify_running() + * This function should be called by the spawned thread when it is + * ready to do work. It will wake up any threads waiting on the + * previously described condition. + */ + pthread_cond_t *running_cond; + pthread_mutex_t *running_cond_mtx; + _Atomic bool running; - /* the (hopefully descriptive) name of this thread */ - char *name; + /* + * Fake thread-specific storage. No constraints on usage. Helpful when + * creating reentrant pthread implementations. Can be used to pass + * argument to pthread entry function. + */ + void *data; }; -/* Initializes this module. +extern struct frr_pthread_attr frr_pthread_attr_default; + +/* + * Initializes this module. * * Must be called before using any of the other functions. */ void frr_pthread_init(void); -/* Uninitializes this module. +/* + * Uninitializes this module. * * Destroys all registered frr_pthread's and internal data structures. * @@ -62,34 +96,23 @@ void frr_pthread_init(void); */ void frr_pthread_finish(void); -/* Creates a new frr_pthread. +/* + * Creates a new frr_pthread with the given attributes. * - * If the provided ID is already assigned to an existing frr_pthread, the - * return value will be NULL. - * - * @param name - the name of the thread. Doesn't have to be unique, but it - * probably should be. This value is copied and may be safely free'd upon - * return. - * - * @param id - the integral ID of the thread. MUST be unique. The caller may - * use this id to retrieve the thread. - * - * @param start_routine - start routine for the pthread, will be passed to - * pthread_create (see those docs for details) - * - * @param stop_routine - stop routine for the pthread, called to terminate the - * thread. This function should gracefully stop the pthread and clean up any - * thread-specific resources. The passed pointer is used to return a data - * result. + * The 'attr' argument should be filled out with the desired attributes, + * including ID, start and stop functions and the desired name. Alternatively, + * if attr is NULL, the default attributes will be used. The pthread will be + * set up to run a basic threadmaster loop and the name will be "Anonymous". + * Scheduling tasks onto the threadmaster in the 'master' field of the returned + * frr_pthread will cause them to run on that pthread. * + * @param attr - the thread attributes * @return the created frr_pthread upon success, or NULL upon failure */ -struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, - void *(*start_routine)(void *), - int (*stop_routine)(void **, - struct frr_pthread *)); +struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr); -/* Destroys an frr_pthread. +/* + * Destroys an frr_pthread. * * Assumes that the associated pthread, if any, has already terminated. * @@ -97,38 +120,66 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, */ void frr_pthread_destroy(struct frr_pthread *fpt); -/* Gets an existing frr_pthread by its id. +/* + * Gets an existing frr_pthread by its id. * * @return frr_thread associated with the provided id, or NULL on error */ struct frr_pthread *frr_pthread_get(unsigned int id); -/* Creates a new pthread and binds it to a frr_pthread. +/* + * Creates a new pthread and binds it to a frr_pthread. * * This function is a wrapper for pthread_create. The first parameter is the * frr_pthread to bind the created pthread to. All subsequent arguments are - * passed unmodified to pthread_create(). + * passed unmodified to pthread_create(). The frr_pthread * provided will be + * used as the argument to the pthread entry function. If it is necessary to + * pass additional data, the 'data' field in the frr_pthread may be used. * * This function returns the same code as pthread_create(). If the value is * zero, the provided frr_pthread is bound to a running POSIX thread. If the * value is less than zero, the provided frr_pthread is guaranteed to be a * clean instance that may be susbsequently passed to frr_pthread_run(). * - * @param id - frr_pthread to bind the created pthread to + * @param fpt - frr_pthread * to run * @param attr - see pthread_create(3) - * @param arg - see pthread_create(3) * * @return see pthread_create(3) */ -int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg); +int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr); -/* Stops an frr_pthread with a result. +/* + * Waits until the specified pthread has finished setting up and is ready to + * begin work. * - * @param id - frr_pthread to stop + * If the pthread's code makes use of the startup synchronization mechanism, + * this function should be called before attempting to use the functionality + * exposed by the pthread. It waits until the 'running' condition is satisfied + * (see struct definition of frr_pthread). + * + * @param fpt - the frr_pthread * to wait on + */ +void frr_pthread_wait_running(struct frr_pthread *fpt); + +/* + * Notifies other pthreads that the calling thread has finished setting up and + * is ready to begin work. + * + * This will allow any other pthreads waiting in 'frr_pthread_wait_running' to + * proceed. + * + * @param fpt - the frr_pthread * that has finished setting up + */ +void frr_pthread_notify_running(struct frr_pthread *fpt); + +/* + * Stops a frr_pthread with a result. + * + * @param fpt - frr_pthread * to stop * @param result - where to store the thread's result, if any. May be NULL if a * result is not needed. */ -int frr_pthread_stop(unsigned int id, void **result); +int frr_pthread_stop(struct frr_pthread *fpt, void **result); /* Stops all frr_pthread's. */ void frr_pthread_stop_all(void); @@ -136,7 +187,8 @@ void frr_pthread_stop_all(void); /* Yields the current thread of execution */ void frr_pthread_yield(void); -/* Returns a unique identifier for use with frr_pthread_new(). +/* + * Returns a unique identifier for use with frr_pthread_new(). * * Internally, this is an integer that increments after each call to this * function. Because the number of pthreads created should never exceed INT_MAX From a715eab3ce7b3aedcd8f809fc7f7f512f29cceb9 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 24 Jan 2018 11:07:27 -0500 Subject: [PATCH 2/4] bgpd: update pthreads to use lib changes Use the new threading facilities provided in lib/ to streamline the threads used in bgpd. In particular, all of the lifecycle code has been removed from the I/O thread and replaced with the default loop. Did not do the same to the keepalives thread as it is much smaller (doesn't need the event system). Also cleaned up some comments to match the style guide. Signed-off-by: Quentin Young --- bgpd/bgp_io.c | 138 ++++++++---------------------------------- bgpd/bgp_io.h | 17 ------ bgpd/bgp_keepalives.c | 86 +++++++++++++------------- bgpd/bgp_keepalives.h | 2 +- bgpd/bgpd.c | 39 ++++++------ 5 files changed, 92 insertions(+), 190 deletions(-) diff --git a/bgpd/bgp_io.c b/bgpd/bgp_io.c index f4bfc90b7e..59b2d1cdaa 100644 --- a/bgpd/bgp_io.c +++ b/bgpd/bgp_io.c @@ -51,100 +51,12 @@ static bool validate_header(struct peer *); #define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred #define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error -/* Plumbing & control variables for thread lifecycle - * ------------------------------------------------------------------------ */ -bool bgp_io_thread_run; -pthread_mutex_t *running_cond_mtx; -pthread_cond_t *running_cond; - -/* Unused callback for thread_add_read() */ -static int bgp_io_dummy(struct thread *thread) { return 0; } - -/* Poison pill task */ -static int bgp_io_finish(struct thread *thread) -{ - bgp_io_thread_run = false; - return 0; -} - -/* Extern lifecycle control functions. init -> start -> stop - * ------------------------------------------------------------------------ */ -void bgp_io_init() -{ - bgp_io_thread_run = false; - - running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t)); - running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t)); - - pthread_mutex_init(running_cond_mtx, NULL); - pthread_cond_init(running_cond, NULL); - - /* unlocked in bgp_io_wait_running() */ - pthread_mutex_lock(running_cond_mtx); -} - -void *bgp_io_start(void *arg) -{ - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - fpt->master->owner = pthread_self(); - - // fd so we can sleep in poll() - int sleeper[2]; - pipe(sleeper); - thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL); - - // we definitely don't want to handle signals - fpt->master->handle_signals = false; - - struct thread task; - - pthread_mutex_lock(running_cond_mtx); - { - bgp_io_thread_run = true; - pthread_cond_signal(running_cond); - } - pthread_mutex_unlock(running_cond_mtx); - - while (bgp_io_thread_run) { - if (thread_fetch(fpt->master, &task)) { - thread_call(&task); - } - } - - close(sleeper[1]); - close(sleeper[0]); - - return NULL; -} - -void bgp_io_wait_running() -{ - while (!bgp_io_thread_run) - pthread_cond_wait(running_cond, running_cond_mtx); - - /* locked in bgp_io_init() */ - pthread_mutex_unlock(running_cond_mtx); -} - -int bgp_io_stop(void **result, struct frr_pthread *fpt) -{ - thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL); - pthread_join(fpt->thread, result); - - pthread_mutex_destroy(running_cond_mtx); - pthread_cond_destroy(running_cond); - - XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx); - XFREE(MTYPE_PTHREAD_PRIM, running_cond); - - return 0; -} - -/* Extern API -------------------------------------------------------------- */ +/* Thread external API ----------------------------------------------------- */ void bgp_writes_on(struct peer *peer) { - assert(bgp_io_thread_run); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); assert(peer->status != Deleted); assert(peer->obuf); @@ -154,8 +66,6 @@ void bgp_writes_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd, &peer->t_write); SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); @@ -163,9 +73,8 @@ void bgp_writes_on(struct peer *peer) void bgp_writes_off(struct peer *peer) { - assert(bgp_io_thread_run); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_write, NULL); THREAD_OFF(peer->t_generate_updgrp_packets); @@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer) void bgp_reads_on(struct peer *peer) { - assert(bgp_io_thread_run); + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); assert(peer->status != Deleted); assert(peer->ibuf); @@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer) assert(!peer->t_connect_check_w); assert(peer->fd); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); - thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, &peer->t_read); @@ -196,9 +104,8 @@ void bgp_reads_on(struct peer *peer) void bgp_reads_off(struct peer *peer) { - assert(bgp_io_thread_run); - struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); + assert(fpt->running); thread_cancel_async(fpt->master, &peer->t_read, NULL); THREAD_OFF(peer->t_process_packet); @@ -206,9 +113,9 @@ void bgp_reads_off(struct peer *peer) UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); } -/* Internal functions ------------------------------------------------------- */ +/* Thread internal functions ----------------------------------------------- */ -/** +/* * Called from I/O pthread when a file descriptor has become ready for writing. */ static int bgp_process_writes(struct thread *thread) @@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread) } pthread_mutex_unlock(&peer->io_mtx); - if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ + /* no problem */ + if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { } + /* problem */ if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { - reschedule = false; /* problem */ + reschedule = false; fatal = true; } @@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread) return 0; } -/** +/* * Called from I/O pthread when a file descriptor has become ready for reading, * or has hung up. * @@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread) /* if this fails we are seriously screwed */ assert(pktsize <= BGP_MAX_PACKET_SIZE); - /* If we have that much data, chuck it into its own - * stream and append to input queue for processing. */ + /* + * If we have that much data, chuck it into its own + * stream and append to input queue for processing. + */ if (ringbuf_remain(ibw) >= pktsize) { struct stream *pkt = stream_new(pktsize); assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize); @@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread) return 0; } -/** +/* * Flush peer output buffer. * * This function pops packets off of peer->obuf and writes them to peer->fd. @@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer) uint16_t status = 0; uint32_t wpkt_quanta_old; - // cache current write quanta wpkt_quanta_old = atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed); @@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer) } goto done; - } else if (num != writenum) // incomplete write + } else if (num != writenum) stream_forward_getp(s, num); } while (num != writenum); @@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer) if (peer->v_start >= (60 * 2)) peer->v_start = (60 * 2); - /* Handle Graceful Restart case where the state changes - * to Connect instead of Idle */ + /* + * Handle Graceful Restart case where the state changes + * to Connect instead of Idle. + */ BGP_EVENT_ADD(peer, BGP_Stop); goto done; @@ -472,7 +384,7 @@ done : { return status; } -/** +/* * Reads a chunk of data from peer->fd into peer->ibuf_work. * * @return status flag (see top-of-file) diff --git a/bgpd/bgp_io.h b/bgpd/bgp_io.h index 7cfd1db710..14a12d3705 100644 --- a/bgpd/bgp_io.h +++ b/bgpd/bgp_io.h @@ -28,14 +28,6 @@ #include "bgpd/bgpd.h" #include "frr_pthread.h" -/** - * Initializes data structures and flags for the write thread. - * - * This function must be called from the main thread before - * bgp_writes_start() is invoked. - */ -extern void bgp_io_init(void); - /** * Start function for write thread. * @@ -43,15 +35,6 @@ extern void bgp_io_init(void); */ extern void *bgp_io_start(void *arg); -/** - * Wait until the IO thread is ready to accept jobs. - * - * This function must be called immediately after the thread has been created - * for running. Use of other functions before calling this one will result in - * undefined behavior. - */ -extern void bgp_io_wait_running(void); - /** * Start function for write thread. * diff --git a/bgpd/bgp_keepalives.c b/bgpd/bgp_keepalives.c index afa280a799..a6976109a8 100644 --- a/bgpd/bgp_keepalives.c +++ b/bgpd/bgp_keepalives.c @@ -36,14 +36,14 @@ #include "bgpd/bgp_keepalives.h" /* clang-format on */ -/** +/* * Peer KeepAlive Timer. * Associates a peer with the time of its last keepalive. */ struct pkat { - // the peer to send keepalives to + /* the peer to send keepalives to */ struct peer *peer; - // absolute time of last keepalive sent + /* absolute time of last keepalive sent */ struct timeval last; }; @@ -52,9 +52,6 @@ static pthread_mutex_t *peerhash_mtx; static pthread_cond_t *peerhash_cond; static struct hash *peerhash; -/* Thread control flag. */ -bool bgp_keepalives_thread_run = false; - static struct pkat *pkat_new(struct peer *peer) { struct pkat *pkat = XMALLOC(MTYPE_TMP, sizeof(struct pkat)); @@ -100,10 +97,10 @@ static void peer_process(struct hash_backet *hb, void *arg) static struct timeval tolerance = {0, 100000}; - // calculate elapsed time since last keepalive + /* calculate elapsed time since last keepalive */ monotime_since(&pkat->last, &elapsed); - // calculate difference between elapsed time and configured time + /* calculate difference between elapsed time and configured time */ ka.tv_sec = pkat->peer->v_keepalive; timersub(&ka, &elapsed, &diff); @@ -118,10 +115,10 @@ static void peer_process(struct hash_backet *hb, void *arg) bgp_keepalive_send(pkat->peer); monotime(&pkat->last); memset(&elapsed, 0x00, sizeof(struct timeval)); - diff = ka; // time until next keepalive == peer keepalive time + diff = ka; } - // if calculated next update for this peer < current delay, use it + /* if calculated next update for this peer < current delay, use it */ if (next_update->tv_sec <= 0 || timercmp(&diff, next_update, <)) *next_update = diff; } @@ -139,29 +136,9 @@ static unsigned int peer_hash_key(void *arg) return (uintptr_t)pkat->peer; } -void bgp_keepalives_init() -{ - peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); - peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t)); - - // initialize mutex - pthread_mutex_init(peerhash_mtx, NULL); - - // use monotonic clock with condition variable - pthread_condattr_t attrs; - pthread_condattr_init(&attrs); - pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); - pthread_cond_init(peerhash_cond, &attrs); - pthread_condattr_destroy(&attrs); - - // initialize peer hashtable - peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL); -} - +/* Cleanup handler / deinitializer. */ static void bgp_keepalives_finish(void *arg) { - bgp_keepalives_thread_run = false; - if (peerhash) { hash_clean(peerhash, pkat_del); hash_free(peerhash); @@ -177,32 +154,50 @@ static void bgp_keepalives_finish(void *arg) XFREE(MTYPE_TMP, peerhash_cond); } -/** +/* * Entry function for peer keepalive generation pthread. - * - * bgp_keepalives_init() must be called prior to this. */ void *bgp_keepalives_start(void *arg) { + struct frr_pthread *fpt = arg; + fpt->master->owner = pthread_self(); + struct timeval currtime = {0, 0}; struct timeval aftertime = {0, 0}; struct timeval next_update = {0, 0}; struct timespec next_update_ts = {0, 0}; + peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t)); + peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t)); + + /* initialize mutex */ + pthread_mutex_init(peerhash_mtx, NULL); + + /* use monotonic clock with condition variable */ + pthread_condattr_t attrs; + pthread_condattr_init(&attrs); + pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); + pthread_cond_init(peerhash_cond, &attrs); + pthread_condattr_destroy(&attrs); + + /* initialize peer hashtable */ + peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL); pthread_mutex_lock(peerhash_mtx); - // register cleanup handler + /* register cleanup handler */ pthread_cleanup_push(&bgp_keepalives_finish, NULL); - bgp_keepalives_thread_run = true; + /* notify anybody waiting on us that we are done starting up */ + frr_pthread_notify_running(fpt); - while (bgp_keepalives_thread_run) { + while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) { if (peerhash->count > 0) pthread_cond_timedwait(peerhash_cond, peerhash_mtx, &next_update_ts); else while (peerhash->count == 0 - && bgp_keepalives_thread_run) + && atomic_load_explicit(&fpt->running, + memory_order_relaxed)) pthread_cond_wait(peerhash_cond, peerhash_mtx); monotime(&currtime); @@ -219,7 +214,7 @@ void *bgp_keepalives_start(void *arg) TIMEVAL_TO_TIMESPEC(&next_update, &next_update_ts); } - // clean up + /* clean up */ pthread_cleanup_pop(1); return NULL; @@ -229,6 +224,9 @@ void *bgp_keepalives_start(void *arg) void bgp_keepalives_on(struct peer *peer) { + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); + assert(fpt->running); + /* placeholder bucket data to use for fast key lookups */ static struct pkat holder = {0}; @@ -253,6 +251,9 @@ void bgp_keepalives_on(struct peer *peer) void bgp_keepalives_off(struct peer *peer) { + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); + assert(fpt->running); + /* placeholder bucket data to use for fast key lookups */ static struct pkat holder = {0}; @@ -283,10 +284,13 @@ void bgp_keepalives_wake() pthread_mutex_unlock(peerhash_mtx); } -int bgp_keepalives_stop(void **result, struct frr_pthread *fpt) +int bgp_keepalives_stop(struct frr_pthread *fpt, void **result) { - bgp_keepalives_thread_run = false; + assert(fpt->running); + + atomic_store_explicit(&fpt->running, false, memory_order_relaxed); bgp_keepalives_wake(); + pthread_join(fpt->thread, result); return 0; } diff --git a/bgpd/bgp_keepalives.h b/bgpd/bgp_keepalives.h index 1fbd035b9e..d1cb7d2462 100644 --- a/bgpd/bgp_keepalives.h +++ b/bgpd/bgp_keepalives.h @@ -88,6 +88,6 @@ extern void bgp_keepalives_wake(void); /** * Stops the thread and blocks until it terminates. */ -int bgp_keepalives_stop(void **result, struct frr_pthread *fpt); +int bgp_keepalives_stop(struct frr_pthread *fpt, void **result); #endif /* _FRR_BGP_KEEPALIVES_H */ diff --git a/bgpd/bgpd.c b/bgpd/bgpd.c index 7db73043cc..19f0c8cabf 100644 --- a/bgpd/bgpd.c +++ b/bgpd/bgpd.c @@ -7470,30 +7470,33 @@ static void bgp_pthreads_init() { frr_pthread_init(); - frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start, - bgp_io_stop); - frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES, - bgp_keepalives_start, bgp_keepalives_stop); - - /* pre-run initialization */ - bgp_keepalives_init(); - bgp_io_init(); + struct frr_pthread_attr io = { + .id = PTHREAD_IO, + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop, + .name = "BGP I/O thread", + }; + struct frr_pthread_attr ka = { + .id = PTHREAD_KEEPALIVES, + .start = bgp_keepalives_start, + .stop = bgp_keepalives_stop, + .name = "BGP Keepalives thread", + }; + frr_pthread_new(&io); + frr_pthread_new(&ka); } void bgp_pthreads_run() { - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + struct frr_pthread *io = frr_pthread_get(PTHREAD_IO); + struct frr_pthread *ka = frr_pthread_get(PTHREAD_KEEPALIVES); - /* - * I/O related code assumes the thread is ready for work at all times, - * so we wait until it is. - */ - frr_pthread_run(PTHREAD_IO, &attr, NULL); - bgp_io_wait_running(); + frr_pthread_run(io, NULL); + frr_pthread_run(ka, NULL); - frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL); + /* Wait until threads are ready. */ + frr_pthread_wait_running(io); + frr_pthread_wait_running(ka); } void bgp_pthreads_finish() From cadc5f3377eec33c17e5cc230b0671c884c3fcca Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 24 Jan 2018 15:53:07 -0500 Subject: [PATCH 3/4] tests: make tests happy for pthread changes Signed-off-by: Quentin Young --- tests/bgpd/test_aspath.c | 6 +++++- tests/bgpd/test_capability.c | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/bgpd/test_aspath.c b/tests/bgpd/test_aspath.c index 56808bc8ad..9e5cb7fe54 100644 --- a/tests/bgpd/test_aspath.c +++ b/tests/bgpd/test_aspath.c @@ -25,8 +25,9 @@ #include "privs.h" #include "queue.h" #include "filter.h" +#include "frr_pthread.h" -#include "bgpd/bgpd.h" +#include "bgpd/bgpd.c" #include "bgpd/bgp_aspath.h" #include "bgpd/bgp_attr.h" #include "bgpd/bgp_packet.h" @@ -1272,6 +1273,9 @@ static int handle_attr_test(struct aspath_tests *t) struct aspath *asp; size_t datalen; + bgp_pthreads_init(); + frr_pthread_get(PTHREAD_KEEPALIVES)->running = true; + asp = make_aspath(t->segment->asdata, t->segment->len, 0); peer.curr = stream_new(BGP_MAX_PACKET_SIZE); diff --git a/tests/bgpd/test_capability.c b/tests/bgpd/test_capability.c index a5092708e2..3d5518f3b7 100644 --- a/tests/bgpd/test_capability.c +++ b/tests/bgpd/test_capability.c @@ -27,8 +27,9 @@ #include "memory.h" #include "queue.h" #include "filter.h" +#include "frr_pthread.h" -#include "bgpd/bgpd.h" +#include "bgpd/bgpd.c" #include "bgpd/bgp_open.h" #include "bgpd/bgp_debug.h" #include "bgpd/bgp_packet.h" @@ -915,6 +916,9 @@ int main(void) vrf_init(NULL, NULL, NULL, NULL); bgp_option_set(BGP_OPT_NO_LISTEN); + bgp_pthreads_init(); + frr_pthread_get(PTHREAD_KEEPALIVES)->running = true; + if (fileno(stdout) >= 0) tty = isatty(fileno(stdout)); From 096476ddb09fd7bf8946c5429bd9903dd33f3d78 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 24 Jan 2018 17:47:17 -0500 Subject: [PATCH 4/4] bgpd: check flags before attempting keepalive ops If a peer already has keepalives turned on when asking to turn them on, return immediately. Same thing for turning them off. Signed-off-by: Quentin Young --- bgpd/bgp_keepalives.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bgpd/bgp_keepalives.c b/bgpd/bgp_keepalives.c index a6976109a8..5a48c7013e 100644 --- a/bgpd/bgp_keepalives.c +++ b/bgpd/bgp_keepalives.c @@ -224,6 +224,9 @@ void *bgp_keepalives_start(void *arg) void bgp_keepalives_on(struct peer *peer) { + if (CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON)) + return; + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); assert(fpt->running); @@ -251,6 +254,9 @@ void bgp_keepalives_on(struct peer *peer) void bgp_keepalives_off(struct peer *peer) { + if (!CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON)) + return; + struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES); assert(fpt->running);