RB: remove locking from ringbuffer.

make ref_count atomic

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
Angus Salkeld 2010-10-18 21:35:27 +11:00
parent c4614afa79
commit 6b1164effc
3 changed files with 21 additions and 288 deletions

View File

@ -20,6 +20,7 @@
*/
#include "ringbuffer_int.h"
#include <qb/qbdefs.h>
#include <qb/qbatomic.h>
//#define CRAZY_DEBUG_PRINTFS 1
#ifdef CRAZY_DEBUG_PRINTFS
@ -77,10 +78,7 @@ do { \
} while (0)
#endif
static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb);
static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb);
static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer);
static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb);
static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer);
qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags,
size_t shared_user_data_size)
@ -134,12 +132,6 @@ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags,
rb->shared_hdr->count = 0;
strncpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
}
if (qb_rb_lock_create(rb, flags) < 0) {
qb_util_log(LOG_ERR, "couldn't get a shared lock %s",
strerror(errno));
goto cleanup_hdr;
}
if (qb_rb_sem_create(rb, flags) < 0) {
qb_util_log(LOG_ERR, "couldn't get a semaphore %s",
strerror(errno));
@ -179,9 +171,7 @@ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags,
rb->shared_data[rb->shared_hdr->size] = 5;
rb->shared_hdr->ref_count = 1;
} else {
rb->lock_fn(rb);
rb->shared_hdr->ref_count++;
rb->unlock_fn(rb);
qb_atomic_int_inc(&rb->shared_hdr->ref_count);
}
close(fd_hdr);
@ -198,7 +188,6 @@ cleanup_hdr:
close(fd_hdr);
if (flags & QB_RB_FLAG_CREATE) {
unlink(rb->shared_hdr->hdr_path);
rb->lock_destroy_fn(rb);
rb->sem_destroy_fn(rb);
}
if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) {
@ -210,19 +199,11 @@ cleanup_hdr:
void qb_rb_close(qb_ringbuffer_t * rb, int32_t force_it)
{
int32_t destroy_it = 0;
rb->lock_fn(rb);
rb->shared_hdr->ref_count--;
qb_util_log(LOG_DEBUG, "ref_count:%d", rb->shared_hdr->ref_count);
if (rb->shared_hdr->ref_count == 0) {
destroy_it = 1;
}
rb->unlock_fn(rb);
int32_t destroy_it = QB_FALSE;
destroy_it = qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
if (destroy_it || force_it) {
qb_util_log(LOG_DEBUG, "Destroying ringbuffer");
rb->lock_destroy_fn(rb);
rb->sem_destroy_fn(rb);
unlink(rb->shared_hdr->data_path);
@ -244,7 +225,7 @@ void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb)
return rb->shared_hdr->user_data;
}
static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb)
ssize_t qb_rb_space_free(qb_ringbuffer_t * rb)
{
uint32_t write_size;
uint32_t read_size;
@ -267,27 +248,7 @@ static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb)
return (space_free * sizeof(uint32_t));
}
ssize_t qb_rb_space_free(qb_ringbuffer_t * rb)
{
size_t space_free;
int32_t res = 0;
res = rb->lock_fn(rb);
if (res < 0) {
return res;
}
space_free = _qb_rb_space_free_locked_(rb);
res = rb->unlock_fn(rb);
if (res < 0) {
/* aarg stuck locked! */
qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s",
strerror(errno));
return res;
}
return space_free;
}
static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb)
ssize_t qb_rb_space_used(qb_ringbuffer_t * rb)
{
uint32_t write_size;
uint32_t read_size;
@ -308,66 +269,26 @@ static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb)
return (space_used * sizeof(uint32_t));
}
ssize_t qb_rb_space_used(qb_ringbuffer_t * rb)
{
ssize_t used = 0;
int32_t res = 0;
res = rb->lock_fn(rb);
if (res < 0) {
return res;
}
used = _qb_rb_space_used_locked_(rb);
res = rb->unlock_fn(rb);
if (res < 0) {
/* aarg stuck locked! */
qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s",
strerror(errno));
return res;
}
return used;
}
ssize_t qb_rb_chunks_used(qb_ringbuffer_t * rb)
{
ssize_t count = -1;
int32_t res = 0;
res = rb->lock_fn(rb);
if (res < 0) {
return res;
}
count = rb->shared_hdr->count;
res = rb->unlock_fn(rb);
if (res < 0) {
/* aarg stuck locked! */
qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s",
strerror(errno));
return res;
}
return count;
return rb->shared_hdr->count;
}
void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len)
{
uint32_t write_pt;
if (rb->lock_fn(rb) < 0) {
return NULL;
}
/*
* Reclaim data if we are over writing and we need space
*/
if (rb->flags & QB_RB_FLAG_OVERWRITE) {
while (_qb_rb_space_free_locked_(rb) <
while (qb_rb_space_free(rb) <
(len + QB_RB_CHUNK_HEADER_SIZE + 4)) {
_qb_rb_chunk_reclaim_locked_(rb);
qb_rb_chunk_reclaim(rb);
}
} else {
if (_qb_rb_space_free_locked_(rb) <
if (qb_rb_space_free(rb) <
(len + QB_RB_CHUNK_HEADER_SIZE + 4)) {
rb->unlock_fn(rb);
errno = EAGAIN;
return NULL;
}
@ -389,8 +310,7 @@ void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len)
}
static uint32_t
_qb_rb_chunk_step_locked_(qb_ringbuffer_t * rb, uint32_t pointer)
static uint32_t qb_rb_chunk_step(qb_ringbuffer_t * rb, uint32_t pointer)
{
uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
/*
@ -415,7 +335,6 @@ _qb_rb_chunk_step_locked_(qb_ringbuffer_t * rb, uint32_t pointer)
int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len)
{
uint32_t old_write_pt = rb->shared_hdr->write_pt;
int32_t res = 0;
/*
* commit the magic & chunk_size
@ -427,19 +346,12 @@ int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len)
/*
* commit the new write pointer
*/
rb->shared_hdr->write_pt = _qb_rb_chunk_step_locked_(rb, old_write_pt);
rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt);
DEBUG_PRINTF("%s: read: %u, write: %u (was:%u)\n", __func__,
rb->shared_hdr->read_pt, rb->shared_hdr->write_pt,
old_write_pt);
res = rb->unlock_fn(rb);
if (res < 0) {
qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s",
strerror(-res));
return res;
}
/*
* post the notification to the reader
*/
@ -465,17 +377,17 @@ ssize_t qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t len)
return len;
}
static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb)
void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb)
{
uint32_t old_read_pt = rb->shared_hdr->read_pt;
if (_qb_rb_space_used_locked_(rb) == 0) {
if (qb_rb_space_used(rb) == 0) {
return;
}
_qb_rb_chunk_check_locked_(rb, old_read_pt);
qb_rb_chunk_check(rb, old_read_pt);
rb->shared_hdr->read_pt = _qb_rb_chunk_step_locked_(rb, old_read_pt);
rb->shared_hdr->read_pt = qb_rb_chunk_step(rb, old_read_pt);
rb->shared_hdr->count--;
/*
@ -489,13 +401,6 @@ static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb)
rb->shared_hdr->write_pt);
}
void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb)
{
rb->lock_fn(rb);
_qb_rb_chunk_reclaim_locked_(rb);
rb->unlock_fn(rb);
}
ssize_t qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t timeout)
{
uint32_t read_pt;
@ -540,23 +445,15 @@ qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len,
return res;
}
res = rb->lock_fn(rb);
if (res < 0) {
qb_util_log(LOG_ERR, "could not lock ringbuffer %s",
strerror(-res));
return res;
}
if (_qb_rb_space_used_locked_(rb) == 0) {
rb->unlock_fn(rb);
if (qb_rb_space_used(rb) == 0) {
return -ENOMSG;
}
read_pt = rb->shared_hdr->read_pt;
_qb_rb_chunk_check_locked_(rb, read_pt);
qb_rb_chunk_check(rb, read_pt);
chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
if (len < chunk_size) {
rb->unlock_fn(rb);
return -ENOBUFS;
}
@ -564,8 +461,7 @@ qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len,
&rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS],
chunk_size);
_qb_rb_chunk_reclaim_locked_(rb);
rb->unlock_fn(rb);
qb_rb_chunk_reclaim(rb);
return chunk_size;
}
@ -586,7 +482,7 @@ static void print_header(qb_ringbuffer_t * rb)
printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb));
}
static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer)
static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer)
{
uint32_t chunk_size;
uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, pointer);
@ -596,7 +492,6 @@ static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer)
printf("size: %x\n", chunk_size);
printf("magic: %x\n", chunk_magic);
print_header(rb);
rb->unlock_fn(rb);
assert(0);
}
}

View File

@ -170,46 +170,6 @@ static int32_t my_sysv_sem_destroy(qb_ringbuffer_t * rb)
}
}
static int32_t my_sysv_lock_it_create(qb_ringbuffer_t * rb, uint32_t flags)
{
union semun options;
int32_t res;
key_t sem_key;
sem_key = ftok(rb->shared_hdr->hdr_path, rb->shared_hdr->size);
if (sem_key == -1) {
res = -errno;
qb_util_log(LOG_ERR, "couldn't get a sem id %s",
strerror(errno));
return res;
}
if (flags & QB_RB_FLAG_CREATE) {
rb->lock_id = semget(sem_key, 1, IPC_CREAT | IPC_EXCL | 0600);
if (rb->lock_id == -1) {
res = -errno;
qb_util_log(LOG_ERR, "couldn't create a semaphore %s",
strerror(errno));
return res;
}
options.val = 0;
res = semctl(rb->lock_id, 0, SETVAL, options);
} else {
rb->lock_id = semget(sem_key, 0, 0600);
if (rb->lock_id == -1) {
res = -errno;
qb_util_log(LOG_ERR, "couldn't get a sem id %s",
strerror(errno));
return res;
}
res = 0;
}
qb_util_log(LOG_INFO, "sem key:%d, id:%d, value:%d",
sem_key, rb->lock_id, semctl(rb->lock_id, 0, GETVAL, 0));
return res;
}
static int32_t my_posix_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags)
{
@ -296,114 +256,3 @@ int32_t qb_rb_sem_create(struct qb_ringbuffer_s * rb, uint32_t flags)
}
}
static int32_t my_posix_lock_it(qb_ringbuffer_t * rb)
{
return -pthread_spin_lock(&rb->shared_hdr->spinlock);
}
static int32_t my_sysv_lock_it(qb_ringbuffer_t * rb)
{
struct sembuf sops[2];
int32_t res = 0;
/*
* atomically wait for sem to get to 0 and then incr.
*/
sops[0].sem_num = 0;
sops[0].sem_op = 0;
sops[0].sem_flg = 0;
sops[1].sem_num = 0;
sops[1].sem_op = 1;
sops[1].sem_flg = 0;
semop_again:
if (semop(rb->lock_id, sops, 2) == -1) {
if (errno == EINTR) {
goto semop_again;
}
res = -errno;
qb_util_log(LOG_ERR, "could not lock it : %s", strerror(errno));
return res;
}
return res;
}
static int32_t my_posix_unlock_it(qb_ringbuffer_t * rb)
{
return -pthread_spin_unlock(&rb->shared_hdr->spinlock);
}
static int32_t my_sysv_unlock_it(qb_ringbuffer_t * rb)
{
struct sembuf lock_it;
int32_t res = 0;
lock_it.sem_num = 0;
lock_it.sem_op = -1;
lock_it.sem_flg = IPC_NOWAIT;
semop_again:
if (semop(rb->lock_id, &lock_it, 1) == -1) {
if (errno == EINTR) {
goto semop_again;
}
res = -errno;
qb_util_log(LOG_ERR, "could not unlock it : %s",
strerror(errno));
return res;
}
return res;
}
static int32_t my_posix_lock_it_destroy(qb_ringbuffer_t * rb)
{
return -pthread_spin_destroy(&rb->shared_hdr->spinlock);
}
static int32_t my_sysv_lock_it_destroy(qb_ringbuffer_t * rb)
{
if (semctl(rb->lock_id, 0, IPC_RMID, 0) == -1) {
return -errno;
} else {
return 0;
}
}
int32_t qb_rb_lock_create(struct qb_ringbuffer_s * rb, uint32_t flags)
{
int32_t pshared = 0;
int32_t can_use_shared_posix = 0;
#if _POSIX_THREAD_PROCESS_SHARED > 0
can_use_shared_posix = 1;
#endif
if (((rb->flags & QB_RB_FLAG_SHARED_PROCESS) == 0) &&
(rb->flags & QB_RB_FLAG_SHARED_THREAD) == 0) {
rb->lock_fn = my_null_fn;
rb->unlock_fn = my_null_fn;
rb->lock_destroy_fn = my_null_fn;
return 0;
} else if ((can_use_shared_posix &&
(rb->flags & QB_RB_FLAG_SHARED_PROCESS)) ||
rb->flags & QB_RB_FLAG_SHARED_THREAD) {
rb->lock_fn = my_posix_lock_it;
rb->unlock_fn = my_posix_unlock_it;
rb->lock_destroy_fn = my_posix_lock_it_destroy;
if (rb->flags & QB_RB_FLAG_SHARED_PROCESS) {
pshared = PTHREAD_PROCESS_SHARED;
}
if (flags & QB_RB_FLAG_CREATE) {
return -pthread_spin_init(&rb->shared_hdr->spinlock,
pshared);
} else {
return 0;
}
} else {
rb->lock_fn = my_sysv_lock_it;
rb->unlock_fn = my_sysv_unlock_it;
rb->lock_destroy_fn = my_sysv_lock_it_destroy;
return my_sysv_lock_it_create(rb, flags);
}
}

View File

@ -35,11 +35,6 @@
struct qb_ringbuffer_s;
int32_t qb_rb_lock_create(struct qb_ringbuffer_s *rb, uint32_t flags);
typedef int32_t(*qb_rb_lock_fn_t) (struct qb_ringbuffer_s * rb);
typedef int32_t(*qb_rb_unlock_fn_t) (struct qb_ringbuffer_s * rb);
typedef int32_t(*qb_rb_lock_destroy_fn_t) (struct qb_ringbuffer_s * rb);
int32_t qb_rb_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags);
typedef int32_t(*qb_rb_sem_post_fn_t) (struct qb_ringbuffer_s * rb);
typedef int32_t(*qb_rb_sem_timedwait_fn_t) (struct qb_ringbuffer_s * rb,
@ -55,21 +50,15 @@ struct qb_ringbuffer_shared_s {
char data_path[PATH_MAX];
int32_t ref_count;
sem_t posix_sem;
pthread_spinlock_t spinlock;
char user_data[1];
};
struct qb_ringbuffer_s {
uint32_t flags;
int32_t lock_id;
int32_t sem_id;
struct qb_ringbuffer_shared_s *shared_hdr;
uint32_t *shared_data;
qb_rb_lock_fn_t lock_fn;
qb_rb_unlock_fn_t unlock_fn;
qb_rb_lock_destroy_fn_t lock_destroy_fn;
qb_rb_sem_post_fn_t sem_post_fn;
qb_rb_sem_timedwait_fn_t sem_timedwait_fn;
qb_rb_sem_destroy_fn_t sem_destroy_fn;