lib: add mt-safe variants for stream_fifo ops

stream_fifo is used as our standard internal message queue. Message
queues are useful in multithreaded environments. Up until now I have
been doing my own synchronization when using stream_fifo in this way;
this patch gets rid of the need for that boilerplate and decreases the
risk of locking mistakes when working with this datastructure.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2018-04-21 16:51:54 -04:00
parent e8f95403e4
commit 363e24c651
2 changed files with 147 additions and 6 deletions

View File

@ -21,6 +21,7 @@
#include <zebra.h>
#include <stddef.h>
#include <pthread.h>
#include "stream.h"
#include "memory.h"
@ -1101,6 +1102,7 @@ struct stream_fifo *stream_fifo_new(void)
struct stream_fifo *new;
new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
pthread_mutex_init(&new->mtx, NULL);
return new;
}
@ -1115,7 +1117,16 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
fifo->tail = s;
fifo->tail->next = NULL;
fifo->count++;
atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
}
void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
{
pthread_mutex_lock(&fifo->mtx);
{
stream_fifo_push(fifo, s);
}
pthread_mutex_unlock(&fifo->mtx);
}
/* Delete first stream from fifo. */
@ -1131,7 +1142,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
if (fifo->head == NULL)
fifo->tail = NULL;
fifo->count--;
atomic_fetch_sub_explicit(&fifo->count, 1,
memory_order_release);
/* ensure stream is scrubbed of references to this fifo */
s->next = NULL;
@ -1140,12 +1152,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
return s;
}
/* Return first fifo entry. */
struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
{
struct stream *ret;
pthread_mutex_lock(&fifo->mtx);
{
ret = stream_fifo_pop(fifo);
}
pthread_mutex_unlock(&fifo->mtx);
return ret;
}
struct stream *stream_fifo_head(struct stream_fifo *fifo)
{
return fifo->head;
}
struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
{
struct stream *ret;
pthread_mutex_lock(&fifo->mtx);
{
ret = stream_fifo_head(fifo);
}
pthread_mutex_unlock(&fifo->mtx);
return ret;
}
void stream_fifo_clean(struct stream_fifo *fifo)
{
struct stream *s;
@ -1156,11 +1193,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
stream_free(s);
}
fifo->head = fifo->tail = NULL;
fifo->count = 0;
atomic_store_explicit(&fifo->count, 0, memory_order_release);
}
void stream_fifo_clean_safe(struct stream_fifo *fifo)
{
pthread_mutex_lock(&fifo->mtx);
{
stream_fifo_clean(fifo);
}
pthread_mutex_unlock(&fifo->mtx);
}
size_t stream_fifo_count_safe(struct stream_fifo *fifo)
{
return atomic_load_explicit(&fifo->count, memory_order_acquire);
}
void stream_fifo_free(struct stream_fifo *fifo)
{
stream_fifo_clean(fifo);
pthread_mutex_destroy(&fifo->mtx);
XFREE(MTYPE_STREAM_FIFO, fifo);
}

View File

@ -22,6 +22,9 @@
#ifndef _ZEBRA_STREAM_H
#define _ZEBRA_STREAM_H
#include <pthread.h>
#include "frratomic.h"
#include "mpls.h"
#include "prefix.h"
@ -107,7 +110,11 @@ struct stream {
/* First in first out queue structure. */
struct stream_fifo {
size_t count;
/* lock for mt-safe operations */
pthread_mutex_t mtx;
/* number of streams in this fifo */
_Atomic size_t count;
struct stream *head;
struct stream *tail;
@ -240,12 +247,94 @@ extern int stream_empty(struct stream *); /* is the stream empty? */
/* deprecated */
extern uint8_t *stream_pnt(struct stream *);
/* Stream fifo. */
/*
* Operations on struct stream_fifo.
*
* Each function has a safe variant, which ensures that the operation performed
* is atomic with respect to the operations performed by all other safe
* variants. In other words, the safe variants lock the stream_fifo's mutex
* before performing their action. These are provided for convenience when
* using stream_fifo in a multithreaded context, to alleviate the need for the
* caller to implement their own synchronization around the stream_fifo.
*
* The following functions do not have safe variants. The caller must ensure
* that these operations are performed safely in a multithreaded context:
* - stream_fifo_new
* - stream_fifo_free
*/
/*
* Create a new stream_fifo.
*
* Returns:
* newly created stream_fifo
*/
extern struct stream_fifo *stream_fifo_new(void);
/*
* Push a stream onto a stream_fifo.
*
* fifo
* the stream_fifo to push onto
*
* s
* the stream to push onto the stream_fifo
*/
extern void stream_fifo_push(struct stream_fifo *fifo, struct stream *s);
extern void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s);
/*
* Pop a stream off a stream_fifo.
*
* fifo
* the stream_fifo to pop from
*
* Returns:
* the next stream in the stream_fifo
*/
extern struct stream *stream_fifo_pop(struct stream_fifo *fifo);
extern struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo);
/*
* Retrieve the next stream from a stream_fifo without popping it.
*
* fifo
* the stream_fifo to operate on
*
* Returns:
* the next stream that would be returned from stream_fifo_pop
*/
extern struct stream *stream_fifo_head(struct stream_fifo *fifo);
extern struct stream *stream_fifo_head_safe(struct stream_fifo *fifo);
/*
* Remove all streams from a stream_fifo.
*
* fifo
* the stream_fifo to clean
*/
extern void stream_fifo_clean(struct stream_fifo *fifo);
extern void stream_fifo_clean_safe(struct stream_fifo *fifo);
/*
* Retrieve number of streams on a stream_fifo.
*
* fifo
* the stream_fifo to retrieve the count for
*
* Returns:
* the number of streams on the stream_fifo
*/
extern size_t stream_fifo_count_safe(struct stream_fifo *fifo);
/*
* Free a stream_fifo.
*
* Calls stream_fifo_clean, then deinitializes the stream_fifo and frees it.
*
* fifo
* the stream_fifo to free
*/
extern void stream_fifo_free(struct stream_fifo *fifo);
/* This is here because "<< 24" is particularly problematic in C.