Merge pull request #1672 from qlyoung/frr-pthread-improvements

FRR pthread improvements
This commit is contained in:
Renato Westphal 2018-01-26 10:47:55 -02:00 committed by GitHub
commit cbbb31b25b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 364 additions and 307 deletions

View File

@ -51,100 +51,12 @@ static bool validate_header(struct peer *);
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred #define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error #define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error
/* Plumbing & control variables for thread lifecycle /* Thread external API ----------------------------------------------------- */
* ------------------------------------------------------------------------ */
bool bgp_io_thread_run;
pthread_mutex_t *running_cond_mtx;
pthread_cond_t *running_cond;
/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }
/* Poison pill task */
static int bgp_io_finish(struct thread *thread)
{
bgp_io_thread_run = false;
return 0;
}
/* Extern lifecycle control functions. init -> start -> stop
* ------------------------------------------------------------------------ */
void bgp_io_init()
{
bgp_io_thread_run = false;
running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t));
pthread_mutex_init(running_cond_mtx, NULL);
pthread_cond_init(running_cond, NULL);
/* unlocked in bgp_io_wait_running() */
pthread_mutex_lock(running_cond_mtx);
}
void *bgp_io_start(void *arg)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
fpt->master->owner = pthread_self();
// fd so we can sleep in poll()
int sleeper[2];
pipe(sleeper);
thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);
// we definitely don't want to handle signals
fpt->master->handle_signals = false;
struct thread task;
pthread_mutex_lock(running_cond_mtx);
{
bgp_io_thread_run = true;
pthread_cond_signal(running_cond);
}
pthread_mutex_unlock(running_cond_mtx);
while (bgp_io_thread_run) {
if (thread_fetch(fpt->master, &task)) {
thread_call(&task);
}
}
close(sleeper[1]);
close(sleeper[0]);
return NULL;
}
void bgp_io_wait_running()
{
while (!bgp_io_thread_run)
pthread_cond_wait(running_cond, running_cond_mtx);
/* locked in bgp_io_init() */
pthread_mutex_unlock(running_cond_mtx);
}
int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
pthread_join(fpt->thread, result);
pthread_mutex_destroy(running_cond_mtx);
pthread_cond_destroy(running_cond);
XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx);
XFREE(MTYPE_PTHREAD_PRIM, running_cond);
return 0;
}
/* Extern API -------------------------------------------------------------- */
void bgp_writes_on(struct peer *peer) void bgp_writes_on(struct peer *peer)
{ {
assert(bgp_io_thread_run); struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);
assert(peer->status != Deleted); assert(peer->status != Deleted);
assert(peer->obuf); assert(peer->obuf);
@ -154,8 +66,6 @@ void bgp_writes_on(struct peer *peer)
assert(!peer->t_connect_check_w); assert(!peer->t_connect_check_w);
assert(peer->fd); assert(peer->fd);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd, thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
&peer->t_write); &peer->t_write);
SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON); SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
@ -163,9 +73,8 @@ void bgp_writes_on(struct peer *peer)
void bgp_writes_off(struct peer *peer) void bgp_writes_off(struct peer *peer)
{ {
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);
thread_cancel_async(fpt->master, &peer->t_write, NULL); thread_cancel_async(fpt->master, &peer->t_write, NULL);
THREAD_OFF(peer->t_generate_updgrp_packets); THREAD_OFF(peer->t_generate_updgrp_packets);
@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer)
void bgp_reads_on(struct peer *peer) void bgp_reads_on(struct peer *peer)
{ {
assert(bgp_io_thread_run); struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);
assert(peer->status != Deleted); assert(peer->status != Deleted);
assert(peer->ibuf); assert(peer->ibuf);
@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer)
assert(!peer->t_connect_check_w); assert(!peer->t_connect_check_w);
assert(peer->fd); assert(peer->fd);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd, thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read); &peer->t_read);
@ -196,9 +104,8 @@ void bgp_reads_on(struct peer *peer)
void bgp_reads_off(struct peer *peer) void bgp_reads_off(struct peer *peer)
{ {
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO); struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);
thread_cancel_async(fpt->master, &peer->t_read, NULL); thread_cancel_async(fpt->master, &peer->t_read, NULL);
THREAD_OFF(peer->t_process_packet); THREAD_OFF(peer->t_process_packet);
@ -206,9 +113,9 @@ void bgp_reads_off(struct peer *peer)
UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON); UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
} }
/* Internal functions ------------------------------------------------------- */ /* Thread internal functions ----------------------------------------------- */
/** /*
* Called from I/O pthread when a file descriptor has become ready for writing. * Called from I/O pthread when a file descriptor has become ready for writing.
*/ */
static int bgp_process_writes(struct thread *thread) static int bgp_process_writes(struct thread *thread)
@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread)
} }
pthread_mutex_unlock(&peer->io_mtx); pthread_mutex_unlock(&peer->io_mtx);
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */ /* no problem */
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
} }
/* problem */
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) { if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
reschedule = false; /* problem */ reschedule = false;
fatal = true; fatal = true;
} }
@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread)
return 0; return 0;
} }
/** /*
* Called from I/O pthread when a file descriptor has become ready for reading, * Called from I/O pthread when a file descriptor has become ready for reading,
* or has hung up. * or has hung up.
* *
@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread)
/* if this fails we are seriously screwed */ /* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE); assert(pktsize <= BGP_MAX_PACKET_SIZE);
/* If we have that much data, chuck it into its own /*
* stream and append to input queue for processing. */ * If we have that much data, chuck it into its own
* stream and append to input queue for processing.
*/
if (ringbuf_remain(ibw) >= pktsize) { if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize); struct stream *pkt = stream_new(pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize); assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread)
return 0; return 0;
} }
/** /*
* Flush peer output buffer. * Flush peer output buffer.
* *
* This function pops packets off of peer->obuf and writes them to peer->fd. * This function pops packets off of peer->obuf and writes them to peer->fd.
@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer)
uint16_t status = 0; uint16_t status = 0;
uint32_t wpkt_quanta_old; uint32_t wpkt_quanta_old;
// cache current write quanta
wpkt_quanta_old = wpkt_quanta_old =
atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed); atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);
@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer)
} }
goto done; goto done;
} else if (num != writenum) // incomplete write } else if (num != writenum)
stream_forward_getp(s, num); stream_forward_getp(s, num);
} while (num != writenum); } while (num != writenum);
@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer)
if (peer->v_start >= (60 * 2)) if (peer->v_start >= (60 * 2))
peer->v_start = (60 * 2); peer->v_start = (60 * 2);
/* Handle Graceful Restart case where the state changes /*
* to Connect instead of Idle */ * Handle Graceful Restart case where the state changes
* to Connect instead of Idle.
*/
BGP_EVENT_ADD(peer, BGP_Stop); BGP_EVENT_ADD(peer, BGP_Stop);
goto done; goto done;
@ -472,7 +384,7 @@ done : {
return status; return status;
} }
/** /*
* Reads a chunk of data from peer->fd into peer->ibuf_work. * Reads a chunk of data from peer->fd into peer->ibuf_work.
* *
* @return status flag (see top-of-file) * @return status flag (see top-of-file)

View File

@ -28,14 +28,6 @@
#include "bgpd/bgpd.h" #include "bgpd/bgpd.h"
#include "frr_pthread.h" #include "frr_pthread.h"
/**
* Initializes data structures and flags for the write thread.
*
* This function must be called from the main thread before
* bgp_writes_start() is invoked.
*/
extern void bgp_io_init(void);
/** /**
* Start function for write thread. * Start function for write thread.
* *
@ -43,15 +35,6 @@ extern void bgp_io_init(void);
*/ */
extern void *bgp_io_start(void *arg); extern void *bgp_io_start(void *arg);
/**
* Wait until the IO thread is ready to accept jobs.
*
* This function must be called immediately after the thread has been created
* for running. Use of other functions before calling this one will result in
* undefined behavior.
*/
extern void bgp_io_wait_running(void);
/** /**
* Start function for write thread. * Start function for write thread.
* *

View File

@ -36,14 +36,14 @@
#include "bgpd/bgp_keepalives.h" #include "bgpd/bgp_keepalives.h"
/* clang-format on */ /* clang-format on */
/** /*
* Peer KeepAlive Timer. * Peer KeepAlive Timer.
* Associates a peer with the time of its last keepalive. * Associates a peer with the time of its last keepalive.
*/ */
struct pkat { struct pkat {
// the peer to send keepalives to /* the peer to send keepalives to */
struct peer *peer; struct peer *peer;
// absolute time of last keepalive sent /* absolute time of last keepalive sent */
struct timeval last; struct timeval last;
}; };
@ -52,9 +52,6 @@ static pthread_mutex_t *peerhash_mtx;
static pthread_cond_t *peerhash_cond; static pthread_cond_t *peerhash_cond;
static struct hash *peerhash; static struct hash *peerhash;
/* Thread control flag. */
bool bgp_keepalives_thread_run = false;
static struct pkat *pkat_new(struct peer *peer) static struct pkat *pkat_new(struct peer *peer)
{ {
struct pkat *pkat = XMALLOC(MTYPE_TMP, sizeof(struct pkat)); struct pkat *pkat = XMALLOC(MTYPE_TMP, sizeof(struct pkat));
@ -100,10 +97,10 @@ static void peer_process(struct hash_backet *hb, void *arg)
static struct timeval tolerance = {0, 100000}; static struct timeval tolerance = {0, 100000};
// calculate elapsed time since last keepalive /* calculate elapsed time since last keepalive */
monotime_since(&pkat->last, &elapsed); monotime_since(&pkat->last, &elapsed);
// calculate difference between elapsed time and configured time /* calculate difference between elapsed time and configured time */
ka.tv_sec = pkat->peer->v_keepalive; ka.tv_sec = pkat->peer->v_keepalive;
timersub(&ka, &elapsed, &diff); timersub(&ka, &elapsed, &diff);
@ -118,10 +115,10 @@ static void peer_process(struct hash_backet *hb, void *arg)
bgp_keepalive_send(pkat->peer); bgp_keepalive_send(pkat->peer);
monotime(&pkat->last); monotime(&pkat->last);
memset(&elapsed, 0x00, sizeof(struct timeval)); memset(&elapsed, 0x00, sizeof(struct timeval));
diff = ka; // time until next keepalive == peer keepalive time diff = ka;
} }
// if calculated next update for this peer < current delay, use it /* if calculated next update for this peer < current delay, use it */
if (next_update->tv_sec <= 0 || timercmp(&diff, next_update, <)) if (next_update->tv_sec <= 0 || timercmp(&diff, next_update, <))
*next_update = diff; *next_update = diff;
} }
@ -139,29 +136,9 @@ static unsigned int peer_hash_key(void *arg)
return (uintptr_t)pkat->peer; return (uintptr_t)pkat->peer;
} }
void bgp_keepalives_init() /* Cleanup handler / deinitializer. */
{
peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t));
// initialize mutex
pthread_mutex_init(peerhash_mtx, NULL);
// use monotonic clock with condition variable
pthread_condattr_t attrs;
pthread_condattr_init(&attrs);
pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
pthread_cond_init(peerhash_cond, &attrs);
pthread_condattr_destroy(&attrs);
// initialize peer hashtable
peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL);
}
static void bgp_keepalives_finish(void *arg) static void bgp_keepalives_finish(void *arg)
{ {
bgp_keepalives_thread_run = false;
if (peerhash) { if (peerhash) {
hash_clean(peerhash, pkat_del); hash_clean(peerhash, pkat_del);
hash_free(peerhash); hash_free(peerhash);
@ -177,32 +154,50 @@ static void bgp_keepalives_finish(void *arg)
XFREE(MTYPE_TMP, peerhash_cond); XFREE(MTYPE_TMP, peerhash_cond);
} }
/** /*
* Entry function for peer keepalive generation pthread. * Entry function for peer keepalive generation pthread.
*
* bgp_keepalives_init() must be called prior to this.
*/ */
void *bgp_keepalives_start(void *arg) void *bgp_keepalives_start(void *arg)
{ {
struct frr_pthread *fpt = arg;
fpt->master->owner = pthread_self();
struct timeval currtime = {0, 0}; struct timeval currtime = {0, 0};
struct timeval aftertime = {0, 0}; struct timeval aftertime = {0, 0};
struct timeval next_update = {0, 0}; struct timeval next_update = {0, 0};
struct timespec next_update_ts = {0, 0}; struct timespec next_update_ts = {0, 0};
peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t));
/* initialize mutex */
pthread_mutex_init(peerhash_mtx, NULL);
/* use monotonic clock with condition variable */
pthread_condattr_t attrs;
pthread_condattr_init(&attrs);
pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
pthread_cond_init(peerhash_cond, &attrs);
pthread_condattr_destroy(&attrs);
/* initialize peer hashtable */
peerhash = hash_create_size(2048, peer_hash_key, peer_hash_cmp, NULL);
pthread_mutex_lock(peerhash_mtx); pthread_mutex_lock(peerhash_mtx);
// register cleanup handler /* register cleanup handler */
pthread_cleanup_push(&bgp_keepalives_finish, NULL); pthread_cleanup_push(&bgp_keepalives_finish, NULL);
bgp_keepalives_thread_run = true; /* notify anybody waiting on us that we are done starting up */
frr_pthread_notify_running(fpt);
while (bgp_keepalives_thread_run) { while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) {
if (peerhash->count > 0) if (peerhash->count > 0)
pthread_cond_timedwait(peerhash_cond, peerhash_mtx, pthread_cond_timedwait(peerhash_cond, peerhash_mtx,
&next_update_ts); &next_update_ts);
else else
while (peerhash->count == 0 while (peerhash->count == 0
&& bgp_keepalives_thread_run) && atomic_load_explicit(&fpt->running,
memory_order_relaxed))
pthread_cond_wait(peerhash_cond, peerhash_mtx); pthread_cond_wait(peerhash_cond, peerhash_mtx);
monotime(&currtime); monotime(&currtime);
@ -219,7 +214,7 @@ void *bgp_keepalives_start(void *arg)
TIMEVAL_TO_TIMESPEC(&next_update, &next_update_ts); TIMEVAL_TO_TIMESPEC(&next_update, &next_update_ts);
} }
// clean up /* clean up */
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
return NULL; return NULL;
@ -229,6 +224,12 @@ void *bgp_keepalives_start(void *arg)
void bgp_keepalives_on(struct peer *peer) void bgp_keepalives_on(struct peer *peer)
{ {
if (CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON))
return;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
assert(fpt->running);
/* placeholder bucket data to use for fast key lookups */ /* placeholder bucket data to use for fast key lookups */
static struct pkat holder = {0}; static struct pkat holder = {0};
@ -253,6 +254,12 @@ void bgp_keepalives_on(struct peer *peer)
void bgp_keepalives_off(struct peer *peer) void bgp_keepalives_off(struct peer *peer)
{ {
if (!CHECK_FLAG(peer->thread_flags, PEER_THREAD_KEEPALIVES_ON))
return;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
assert(fpt->running);
/* placeholder bucket data to use for fast key lookups */ /* placeholder bucket data to use for fast key lookups */
static struct pkat holder = {0}; static struct pkat holder = {0};
@ -283,10 +290,13 @@ void bgp_keepalives_wake()
pthread_mutex_unlock(peerhash_mtx); pthread_mutex_unlock(peerhash_mtx);
} }
int bgp_keepalives_stop(void **result, struct frr_pthread *fpt) int bgp_keepalives_stop(struct frr_pthread *fpt, void **result)
{ {
bgp_keepalives_thread_run = false; assert(fpt->running);
atomic_store_explicit(&fpt->running, false, memory_order_relaxed);
bgp_keepalives_wake(); bgp_keepalives_wake();
pthread_join(fpt->thread, result); pthread_join(fpt->thread, result);
return 0; return 0;
} }

View File

@ -88,6 +88,6 @@ extern void bgp_keepalives_wake(void);
/** /**
* Stops the thread and blocks until it terminates. * Stops the thread and blocks until it terminates.
*/ */
int bgp_keepalives_stop(void **result, struct frr_pthread *fpt); int bgp_keepalives_stop(struct frr_pthread *fpt, void **result);
#endif /* _FRR_BGP_KEEPALIVES_H */ #endif /* _FRR_BGP_KEEPALIVES_H */

View File

@ -7470,30 +7470,33 @@ static void bgp_pthreads_init()
{ {
frr_pthread_init(); frr_pthread_init();
frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start, struct frr_pthread_attr io = {
bgp_io_stop); .id = PTHREAD_IO,
frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES, .start = frr_pthread_attr_default.start,
bgp_keepalives_start, bgp_keepalives_stop); .stop = frr_pthread_attr_default.stop,
.name = "BGP I/O thread",
/* pre-run initialization */ };
bgp_keepalives_init(); struct frr_pthread_attr ka = {
bgp_io_init(); .id = PTHREAD_KEEPALIVES,
.start = bgp_keepalives_start,
.stop = bgp_keepalives_stop,
.name = "BGP Keepalives thread",
};
frr_pthread_new(&io);
frr_pthread_new(&ka);
} }
void bgp_pthreads_run() void bgp_pthreads_run()
{ {
pthread_attr_t attr; struct frr_pthread *io = frr_pthread_get(PTHREAD_IO);
pthread_attr_init(&attr); struct frr_pthread *ka = frr_pthread_get(PTHREAD_KEEPALIVES);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
/* frr_pthread_run(io, NULL);
* I/O related code assumes the thread is ready for work at all times, frr_pthread_run(ka, NULL);
* so we wait until it is.
*/
frr_pthread_run(PTHREAD_IO, &attr, NULL);
bgp_io_wait_running();
frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL); /* Wait until threads are ready. */
frr_pthread_wait_running(io);
frr_pthread_wait_running(ka);
} }
void bgp_pthreads_finish() void bgp_pthreads_finish()

View File

@ -1,5 +1,5 @@
/* /*
* Utilities and interfaces for managing POSIX threads * Utilities and interfaces for managing POSIX threads within FRR.
* Copyright (C) 2017 Cumulus Networks * Copyright (C) 2017 Cumulus Networks
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
@ -25,79 +25,99 @@
#include "memory.h" #include "memory.h"
#include "hash.h" #include "hash.h"
DEFINE_MTYPE_STATIC(LIB, FRR_PTHREAD, "FRR POSIX Thread"); DEFINE_MTYPE(LIB, FRR_PTHREAD, "FRR POSIX Thread");
DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives"); DEFINE_MTYPE(LIB, PTHREAD_PRIM, "POSIX synchronization primitives");
/* id for next created pthread */
static unsigned int next_id = 0; static unsigned int next_id = 0;
/* Hash table of all frr_pthreads along with synchronization primitive(s) and /* default frr_pthread start/stop routine prototypes */
* hash table callbacks. static void *fpt_run(void *arg);
* ------------------------------------------------------------------------ */ static int fpt_halt(struct frr_pthread *fpt, void **res);
static struct hash *pthread_table;
static pthread_mutex_t pthread_table_mtx = PTHREAD_MUTEX_INITIALIZER;
/* pthread_table->hash_cmp */ /* default frr_pthread attributes */
static int pthread_table_hash_cmp(const void *value1, const void *value2) struct frr_pthread_attr frr_pthread_attr_default = {
.id = 0,
.start = fpt_run,
.stop = fpt_halt,
.name = "Anonymous",
};
/* hash table to keep track of all frr_pthreads */
static struct hash *frr_pthread_hash;
static pthread_mutex_t frr_pthread_hash_mtx = PTHREAD_MUTEX_INITIALIZER;
/* frr_pthread_hash->hash_cmp */
static int frr_pthread_hash_cmp(const void *value1, const void *value2)
{ {
const struct frr_pthread *tq1 = value1; const struct frr_pthread *tq1 = value1;
const struct frr_pthread *tq2 = value2; const struct frr_pthread *tq2 = value2;
return (tq1->id == tq2->id); return (tq1->attr.id == tq2->attr.id);
} }
/* pthread_table->hash_key */ /* frr_pthread_hash->hash_key */
static unsigned int pthread_table_hash_key(void *value) static unsigned int frr_pthread_hash_key(void *value)
{ {
return ((struct frr_pthread *)value)->id; return ((struct frr_pthread *)value)->attr.id;
} }
/* ------------------------------------------------------------------------ */ /* ------------------------------------------------------------------------ */
void frr_pthread_init() void frr_pthread_init()
{ {
pthread_mutex_lock(&pthread_table_mtx); pthread_mutex_lock(&frr_pthread_hash_mtx);
{ {
pthread_table = hash_create(pthread_table_hash_key, frr_pthread_hash = hash_create(frr_pthread_hash_key,
pthread_table_hash_cmp, NULL); frr_pthread_hash_cmp, NULL);
} }
pthread_mutex_unlock(&pthread_table_mtx); pthread_mutex_unlock(&frr_pthread_hash_mtx);
} }
void frr_pthread_finish() void frr_pthread_finish()
{ {
pthread_mutex_lock(&pthread_table_mtx); pthread_mutex_lock(&frr_pthread_hash_mtx);
{ {
hash_clean(pthread_table, hash_clean(frr_pthread_hash,
(void (*)(void *))frr_pthread_destroy); (void (*)(void *))frr_pthread_destroy);
hash_free(pthread_table); hash_free(frr_pthread_hash);
} }
pthread_mutex_unlock(&pthread_table_mtx); pthread_mutex_unlock(&frr_pthread_hash_mtx);
} }
struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr)
void *(*start_routine)(void *),
int (*stop_routine)(void **,
struct frr_pthread *))
{ {
static struct frr_pthread holder = {0}; static struct frr_pthread holder = {0};
struct frr_pthread *fpt = NULL; struct frr_pthread *fpt = NULL;
pthread_mutex_lock(&pthread_table_mtx); attr = attr ? attr : &frr_pthread_attr_default;
pthread_mutex_lock(&frr_pthread_hash_mtx);
{ {
holder.id = id; holder.attr.id = attr->id;
if (!hash_lookup(pthread_table, &holder)) { if (!hash_lookup(frr_pthread_hash, &holder)) {
struct frr_pthread *fpt = XCALLOC( fpt = XCALLOC(MTYPE_FRR_PTHREAD,
MTYPE_FRR_PTHREAD, sizeof(struct frr_pthread)); sizeof(struct frr_pthread));
fpt->id = id; /* create new thread master */
fpt->master = thread_master_create(name); fpt->master = thread_master_create(attr->name);
fpt->start_routine = start_routine; /* set attributes */
fpt->stop_routine = stop_routine; fpt->attr = *attr;
fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name); if (attr == &frr_pthread_attr_default)
fpt->attr.id = frr_pthread_get_id();
/* initialize startup synchronization primitives */
fpt->running_cond_mtx = XCALLOC(
MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
fpt->running_cond = XCALLOC(MTYPE_PTHREAD_PRIM,
sizeof(pthread_cond_t));
pthread_mutex_init(fpt->running_cond_mtx, NULL);
pthread_cond_init(fpt->running_cond, NULL);
hash_get(pthread_table, fpt, hash_alloc_intern); /* insert into global thread hash */
hash_get(frr_pthread_hash, fpt, hash_alloc_intern);
} }
} }
pthread_mutex_unlock(&pthread_table_mtx); pthread_mutex_unlock(&frr_pthread_hash_mtx);
return fpt; return fpt;
} }
@ -105,7 +125,11 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id,
void frr_pthread_destroy(struct frr_pthread *fpt) void frr_pthread_destroy(struct frr_pthread *fpt)
{ {
thread_master_free(fpt->master); thread_master_free(fpt->master);
XFREE(MTYPE_FRR_PTHREAD, fpt->name);
pthread_mutex_destroy(fpt->running_cond_mtx);
pthread_cond_destroy(fpt->running_cond);
XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond_mtx);
XFREE(MTYPE_PTHREAD_PRIM, fpt->running_cond);
XFREE(MTYPE_FRR_PTHREAD, fpt); XFREE(MTYPE_FRR_PTHREAD, fpt);
} }
@ -114,74 +138,82 @@ struct frr_pthread *frr_pthread_get(unsigned int id)
static struct frr_pthread holder = {0}; static struct frr_pthread holder = {0};
struct frr_pthread *fpt; struct frr_pthread *fpt;
pthread_mutex_lock(&pthread_table_mtx); pthread_mutex_lock(&frr_pthread_hash_mtx);
{ {
holder.id = id; holder.attr.id = id;
fpt = hash_lookup(pthread_table, &holder); fpt = hash_lookup(frr_pthread_hash, &holder);
} }
pthread_mutex_unlock(&pthread_table_mtx); pthread_mutex_unlock(&frr_pthread_hash_mtx);
return fpt; return fpt;
} }
int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg) int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr)
{ {
struct frr_pthread *fpt = frr_pthread_get(id);
int ret; int ret;
if (!fpt) ret = pthread_create(&fpt->thread, attr, fpt->attr.start, fpt);
return -1;
ret = pthread_create(&fpt->thread, attr, fpt->start_routine, arg); /*
* Per pthread_create(3), the contents of fpt->thread are undefined if
/* Per pthread_create(3), the contents of fpt->thread are undefined if * pthread_create() did not succeed. Reset this value to zero.
* pthread_create() did not succeed. Reset this value to zero. */ */
if (ret < 0) if (ret < 0)
memset(&fpt->thread, 0x00, sizeof(fpt->thread)); memset(&fpt->thread, 0x00, sizeof(fpt->thread));
return ret; return ret;
} }
/** void frr_pthread_wait_running(struct frr_pthread *fpt)
* Calls the stop routine for the frr_pthread and resets any relevant fields.
*
* @param fpt - the frr_pthread to stop
* @param result - pointer to result pointer
* @return the return code from the stop routine
*/
static int frr_pthread_stop_actual(struct frr_pthread *fpt, void **result)
{ {
int ret = (*fpt->stop_routine)(result, fpt); pthread_mutex_lock(fpt->running_cond_mtx);
{
while (!fpt->running)
pthread_cond_wait(fpt->running_cond,
fpt->running_cond_mtx);
}
pthread_mutex_unlock(fpt->running_cond_mtx);
}
void frr_pthread_notify_running(struct frr_pthread *fpt)
{
pthread_mutex_lock(fpt->running_cond_mtx);
{
fpt->running = true;
pthread_cond_signal(fpt->running_cond);
}
pthread_mutex_unlock(fpt->running_cond_mtx);
}
int frr_pthread_stop(struct frr_pthread *fpt, void **result)
{
int ret = (*fpt->attr.stop)(fpt, result);
memset(&fpt->thread, 0x00, sizeof(fpt->thread)); memset(&fpt->thread, 0x00, sizeof(fpt->thread));
return ret; return ret;
} }
int frr_pthread_stop(unsigned int id, void **result) /*
{
struct frr_pthread *fpt = frr_pthread_get(id);
return frr_pthread_stop_actual(fpt, result);
}
/**
* Callback for hash_iterate to stop all frr_pthread's. * Callback for hash_iterate to stop all frr_pthread's.
*/ */
static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg) static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg)
{ {
struct frr_pthread *fpt = hb->data; struct frr_pthread *fpt = hb->data;
frr_pthread_stop_actual(fpt, NULL); frr_pthread_stop(fpt, NULL);
} }
void frr_pthread_stop_all() void frr_pthread_stop_all()
{ {
pthread_mutex_lock(&pthread_table_mtx); pthread_mutex_lock(&frr_pthread_hash_mtx);
{ {
hash_iterate(pthread_table, frr_pthread_stop_all_iter, NULL); hash_iterate(frr_pthread_hash, frr_pthread_stop_all_iter, NULL);
} }
pthread_mutex_unlock(&pthread_table_mtx); pthread_mutex_unlock(&frr_pthread_hash_mtx);
} }
unsigned int frr_pthread_get_id() unsigned int frr_pthread_get_id()
{ {
/* just a sanity check, this should never happen */
assert(next_id <= INT_MAX - 1);
return next_id++; return next_id++;
} }
@ -189,3 +221,60 @@ void frr_pthread_yield(void)
{ {
(void)sched_yield(); (void)sched_yield();
} }
/*
* ----------------------------------------------------------------------------
* Default Event Loop
* ----------------------------------------------------------------------------
*/
/* dummy task for sleeper pipe */
static int fpt_dummy(struct thread *thread)
{
return 0;
}
/* poison pill task to end event loop */
static int fpt_finish(struct thread *thread)
{
struct frr_pthread *fpt = THREAD_ARG(thread);
atomic_store_explicit(&fpt->running, false, memory_order_relaxed);
return 0;
}
/* stop function, called from other threads to halt this one */
static int fpt_halt(struct frr_pthread *fpt, void **res)
{
thread_add_event(fpt->master, &fpt_finish, fpt, 0, NULL);
pthread_join(fpt->thread, res);
fpt = NULL;
return 0;
}
/* entry pthread function & main event loop */
static void *fpt_run(void *arg)
{
struct frr_pthread *fpt = arg;
fpt->master->owner = pthread_self();
int sleeper[2];
pipe(sleeper);
thread_add_read(fpt->master, &fpt_dummy, NULL, sleeper[0], NULL);
fpt->master->handle_signals = false;
frr_pthread_notify_running(fpt);
struct thread task;
while (atomic_load_explicit(&fpt->running, memory_order_relaxed)) {
if (thread_fetch(fpt->master, &task)) {
thread_call(&task);
}
}
close(sleeper[1]);
close(sleeper[0]);
return NULL;
}

View File

@ -1,5 +1,5 @@
/* /*
* Utilities and interfaces for managing POSIX threads * Utilities and interfaces for managing POSIX threads within FRR.
* Copyright (C) 2017 Cumulus Networks * Copyright (C) 2017 Cumulus Networks
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
@ -21,39 +21,73 @@
#define _FRR_PTHREAD_H #define _FRR_PTHREAD_H
#include <pthread.h> #include <pthread.h>
#include "frratomic.h"
#include "memory.h" #include "memory.h"
#include "thread.h" #include "thread.h"
DECLARE_MTYPE(FRR_PTHREAD);
DECLARE_MTYPE(PTHREAD_PRIM); DECLARE_MTYPE(PTHREAD_PRIM);
struct frr_pthread;
struct frr_pthread_attr;
struct frr_pthread_attr {
int id;
void *(*start)(void *);
int (*stop)(struct frr_pthread *, void **);
const char *name;
};
struct frr_pthread { struct frr_pthread {
/* pthread id */ /* pthread id */
pthread_t thread; pthread_t thread;
/* frr thread identifier */
unsigned int id;
/* thread master for this pthread's thread.c event loop */ /* thread master for this pthread's thread.c event loop */
struct thread_master *master; struct thread_master *master;
/* start routine */ /* caller-specified data; start & stop funcs, name, id */
void *(*start_routine)(void *); struct frr_pthread_attr attr;
/* stop routine */ /*
int (*stop_routine)(void **, struct frr_pthread *); * Notification mechanism for allowing pthreads to notify their parents
* when they are ready to do work. This mechanism has two associated
* functions:
*
* - frr_pthread_wait_running()
* This function should be called by the spawning thread after
* frr_pthread_run(). It safely waits until the spawned thread
* indicates that is ready to do work by posting to the condition
* variable.
*
* - frr_pthread_notify_running()
* This function should be called by the spawned thread when it is
* ready to do work. It will wake up any threads waiting on the
* previously described condition.
*/
pthread_cond_t *running_cond;
pthread_mutex_t *running_cond_mtx;
_Atomic bool running;
/* the (hopefully descriptive) name of this thread */ /*
char *name; * Fake thread-specific storage. No constraints on usage. Helpful when
* creating reentrant pthread implementations. Can be used to pass
* argument to pthread entry function.
*/
void *data;
}; };
/* Initializes this module. extern struct frr_pthread_attr frr_pthread_attr_default;
/*
* Initializes this module.
* *
* Must be called before using any of the other functions. * Must be called before using any of the other functions.
*/ */
void frr_pthread_init(void); void frr_pthread_init(void);
/* Uninitializes this module. /*
* Uninitializes this module.
* *
* Destroys all registered frr_pthread's and internal data structures. * Destroys all registered frr_pthread's and internal data structures.
* *
@ -62,34 +96,23 @@ void frr_pthread_init(void);
*/ */
void frr_pthread_finish(void); void frr_pthread_finish(void);
/* Creates a new frr_pthread. /*
* Creates a new frr_pthread with the given attributes.
* *
* If the provided ID is already assigned to an existing frr_pthread, the * The 'attr' argument should be filled out with the desired attributes,
* return value will be NULL. * including ID, start and stop functions and the desired name. Alternatively,
* * if attr is NULL, the default attributes will be used. The pthread will be
* @param name - the name of the thread. Doesn't have to be unique, but it * set up to run a basic threadmaster loop and the name will be "Anonymous".
* probably should be. This value is copied and may be safely free'd upon * Scheduling tasks onto the threadmaster in the 'master' field of the returned
* return. * frr_pthread will cause them to run on that pthread.
*
* @param id - the integral ID of the thread. MUST be unique. The caller may
* use this id to retrieve the thread.
*
* @param start_routine - start routine for the pthread, will be passed to
* pthread_create (see those docs for details)
*
* @param stop_routine - stop routine for the pthread, called to terminate the
* thread. This function should gracefully stop the pthread and clean up any
* thread-specific resources. The passed pointer is used to return a data
* result.
* *
* @param attr - the thread attributes
* @return the created frr_pthread upon success, or NULL upon failure * @return the created frr_pthread upon success, or NULL upon failure
*/ */
struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, struct frr_pthread *frr_pthread_new(struct frr_pthread_attr *attr);
void *(*start_routine)(void *),
int (*stop_routine)(void **,
struct frr_pthread *));
/* Destroys an frr_pthread. /*
* Destroys an frr_pthread.
* *
* Assumes that the associated pthread, if any, has already terminated. * Assumes that the associated pthread, if any, has already terminated.
* *
@ -97,38 +120,66 @@ struct frr_pthread *frr_pthread_new(const char *name, unsigned int id,
*/ */
void frr_pthread_destroy(struct frr_pthread *fpt); void frr_pthread_destroy(struct frr_pthread *fpt);
/* Gets an existing frr_pthread by its id. /*
* Gets an existing frr_pthread by its id.
* *
* @return frr_thread associated with the provided id, or NULL on error * @return frr_thread associated with the provided id, or NULL on error
*/ */
struct frr_pthread *frr_pthread_get(unsigned int id); struct frr_pthread *frr_pthread_get(unsigned int id);
/* Creates a new pthread and binds it to a frr_pthread. /*
* Creates a new pthread and binds it to a frr_pthread.
* *
* This function is a wrapper for pthread_create. The first parameter is the * This function is a wrapper for pthread_create. The first parameter is the
* frr_pthread to bind the created pthread to. All subsequent arguments are * frr_pthread to bind the created pthread to. All subsequent arguments are
* passed unmodified to pthread_create(). * passed unmodified to pthread_create(). The frr_pthread * provided will be
* used as the argument to the pthread entry function. If it is necessary to
* pass additional data, the 'data' field in the frr_pthread may be used.
* *
* This function returns the same code as pthread_create(). If the value is * This function returns the same code as pthread_create(). If the value is
* zero, the provided frr_pthread is bound to a running POSIX thread. If the * zero, the provided frr_pthread is bound to a running POSIX thread. If the
* value is less than zero, the provided frr_pthread is guaranteed to be a * value is less than zero, the provided frr_pthread is guaranteed to be a
* clean instance that may be susbsequently passed to frr_pthread_run(). * clean instance that may be susbsequently passed to frr_pthread_run().
* *
* @param id - frr_pthread to bind the created pthread to * @param fpt - frr_pthread * to run
* @param attr - see pthread_create(3) * @param attr - see pthread_create(3)
* @param arg - see pthread_create(3)
* *
* @return see pthread_create(3) * @return see pthread_create(3)
*/ */
int frr_pthread_run(unsigned int id, const pthread_attr_t *attr, void *arg); int frr_pthread_run(struct frr_pthread *fpt, const pthread_attr_t *attr);
/* Stops an frr_pthread with a result. /*
* Waits until the specified pthread has finished setting up and is ready to
* begin work.
* *
* @param id - frr_pthread to stop * If the pthread's code makes use of the startup synchronization mechanism,
* this function should be called before attempting to use the functionality
* exposed by the pthread. It waits until the 'running' condition is satisfied
* (see struct definition of frr_pthread).
*
* @param fpt - the frr_pthread * to wait on
*/
void frr_pthread_wait_running(struct frr_pthread *fpt);
/*
* Notifies other pthreads that the calling thread has finished setting up and
* is ready to begin work.
*
* This will allow any other pthreads waiting in 'frr_pthread_wait_running' to
* proceed.
*
* @param fpt - the frr_pthread * that has finished setting up
*/
void frr_pthread_notify_running(struct frr_pthread *fpt);
/*
* Stops a frr_pthread with a result.
*
* @param fpt - frr_pthread * to stop
* @param result - where to store the thread's result, if any. May be NULL if a * @param result - where to store the thread's result, if any. May be NULL if a
* result is not needed. * result is not needed.
*/ */
int frr_pthread_stop(unsigned int id, void **result); int frr_pthread_stop(struct frr_pthread *fpt, void **result);
/* Stops all frr_pthread's. */ /* Stops all frr_pthread's. */
void frr_pthread_stop_all(void); void frr_pthread_stop_all(void);
@ -136,7 +187,8 @@ void frr_pthread_stop_all(void);
/* Yields the current thread of execution */ /* Yields the current thread of execution */
void frr_pthread_yield(void); void frr_pthread_yield(void);
/* Returns a unique identifier for use with frr_pthread_new(). /*
* Returns a unique identifier for use with frr_pthread_new().
* *
* Internally, this is an integer that increments after each call to this * Internally, this is an integer that increments after each call to this
* function. Because the number of pthreads created should never exceed INT_MAX * function. Because the number of pthreads created should never exceed INT_MAX

View File

@ -25,8 +25,9 @@
#include "privs.h" #include "privs.h"
#include "queue.h" #include "queue.h"
#include "filter.h" #include "filter.h"
#include "frr_pthread.h"
#include "bgpd/bgpd.h" #include "bgpd/bgpd.c"
#include "bgpd/bgp_aspath.h" #include "bgpd/bgp_aspath.h"
#include "bgpd/bgp_attr.h" #include "bgpd/bgp_attr.h"
#include "bgpd/bgp_packet.h" #include "bgpd/bgp_packet.h"
@ -1272,6 +1273,9 @@ static int handle_attr_test(struct aspath_tests *t)
struct aspath *asp; struct aspath *asp;
size_t datalen; size_t datalen;
bgp_pthreads_init();
frr_pthread_get(PTHREAD_KEEPALIVES)->running = true;
asp = make_aspath(t->segment->asdata, t->segment->len, 0); asp = make_aspath(t->segment->asdata, t->segment->len, 0);
peer.curr = stream_new(BGP_MAX_PACKET_SIZE); peer.curr = stream_new(BGP_MAX_PACKET_SIZE);

View File

@ -27,8 +27,9 @@
#include "memory.h" #include "memory.h"
#include "queue.h" #include "queue.h"
#include "filter.h" #include "filter.h"
#include "frr_pthread.h"
#include "bgpd/bgpd.h" #include "bgpd/bgpd.c"
#include "bgpd/bgp_open.h" #include "bgpd/bgp_open.h"
#include "bgpd/bgp_debug.h" #include "bgpd/bgp_debug.h"
#include "bgpd/bgp_packet.h" #include "bgpd/bgp_packet.h"
@ -915,6 +916,9 @@ int main(void)
vrf_init(NULL, NULL, NULL, NULL); vrf_init(NULL, NULL, NULL, NULL);
bgp_option_set(BGP_OPT_NO_LISTEN); bgp_option_set(BGP_OPT_NO_LISTEN);
bgp_pthreads_init();
frr_pthread_get(PTHREAD_KEEPALIVES)->running = true;
if (fileno(stdout) >= 0) if (fileno(stdout) >= 0)
tty = isatty(fileno(stdout)); tty = isatty(fileno(stdout));