From 6b1164effce50d92e57bdeb2fa9e2f6b2ac63bcf Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Mon, 18 Oct 2010 21:35:27 +1100 Subject: [PATCH] RB: remove locking from ringbuffer. make ref_count atomic Signed-off-by: Angus Salkeld --- lib/ringbuffer.c | 147 ++++++-------------------------------- lib/ringbuffer_helper.c | 151 ---------------------------------------- lib/ringbuffer_int.h | 11 --- 3 files changed, 21 insertions(+), 288 deletions(-) diff --git a/lib/ringbuffer.c b/lib/ringbuffer.c index 1a2abdf..6226472 100644 --- a/lib/ringbuffer.c +++ b/lib/ringbuffer.c @@ -20,6 +20,7 @@ */ #include "ringbuffer_int.h" #include +#include //#define CRAZY_DEBUG_PRINTFS 1 #ifdef CRAZY_DEBUG_PRINTFS @@ -77,10 +78,7 @@ do { \ } while (0) #endif -static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb); -static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb); -static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer); -static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb); +static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer); qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, size_t shared_user_data_size) @@ -134,12 +132,6 @@ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, rb->shared_hdr->count = 0; strncpy(rb->shared_hdr->hdr_path, path, PATH_MAX); } - if (qb_rb_lock_create(rb, flags) < 0) { - qb_util_log(LOG_ERR, "couldn't get a shared lock %s", - strerror(errno)); - goto cleanup_hdr; - } - if (qb_rb_sem_create(rb, flags) < 0) { qb_util_log(LOG_ERR, "couldn't get a semaphore %s", strerror(errno)); @@ -179,9 +171,7 @@ qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, rb->shared_data[rb->shared_hdr->size] = 5; rb->shared_hdr->ref_count = 1; } else { - rb->lock_fn(rb); - rb->shared_hdr->ref_count++; - rb->unlock_fn(rb); + qb_atomic_int_inc(&rb->shared_hdr->ref_count); } close(fd_hdr); @@ -198,7 +188,6 @@ cleanup_hdr: close(fd_hdr); if (flags & QB_RB_FLAG_CREATE) { unlink(rb->shared_hdr->hdr_path); - rb->lock_destroy_fn(rb); rb->sem_destroy_fn(rb); } if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) { @@ -210,19 +199,11 @@ cleanup_hdr: void qb_rb_close(qb_ringbuffer_t * rb, int32_t force_it) { - int32_t destroy_it = 0; - - rb->lock_fn(rb); - rb->shared_hdr->ref_count--; - qb_util_log(LOG_DEBUG, "ref_count:%d", rb->shared_hdr->ref_count); - if (rb->shared_hdr->ref_count == 0) { - destroy_it = 1; - } - rb->unlock_fn(rb); + int32_t destroy_it = QB_FALSE; + destroy_it = qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); if (destroy_it || force_it) { qb_util_log(LOG_DEBUG, "Destroying ringbuffer"); - rb->lock_destroy_fn(rb); rb->sem_destroy_fn(rb); unlink(rb->shared_hdr->data_path); @@ -244,7 +225,7 @@ void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb) return rb->shared_hdr->user_data; } -static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb) +ssize_t qb_rb_space_free(qb_ringbuffer_t * rb) { uint32_t write_size; uint32_t read_size; @@ -267,27 +248,7 @@ static size_t _qb_rb_space_free_locked_(qb_ringbuffer_t * rb) return (space_free * sizeof(uint32_t)); } -ssize_t qb_rb_space_free(qb_ringbuffer_t * rb) -{ - size_t space_free; - int32_t res = 0; - - res = rb->lock_fn(rb); - if (res < 0) { - return res; - } - space_free = _qb_rb_space_free_locked_(rb); - res = rb->unlock_fn(rb); - if (res < 0) { - /* aarg stuck locked! */ - qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s", - strerror(errno)); - return res; - } - return space_free; -} - -static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb) +ssize_t qb_rb_space_used(qb_ringbuffer_t * rb) { uint32_t write_size; uint32_t read_size; @@ -308,66 +269,26 @@ static size_t _qb_rb_space_used_locked_(qb_ringbuffer_t * rb) return (space_used * sizeof(uint32_t)); } -ssize_t qb_rb_space_used(qb_ringbuffer_t * rb) -{ - ssize_t used = 0; - int32_t res = 0; - - res = rb->lock_fn(rb); - if (res < 0) { - return res; - } - used = _qb_rb_space_used_locked_(rb); - res = rb->unlock_fn(rb); - if (res < 0) { - /* aarg stuck locked! */ - qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s", - strerror(errno)); - return res; - } - return used; -} - ssize_t qb_rb_chunks_used(qb_ringbuffer_t * rb) { - ssize_t count = -1; - int32_t res = 0; - - res = rb->lock_fn(rb); - if (res < 0) { - return res; - } - count = rb->shared_hdr->count; - res = rb->unlock_fn(rb); - if (res < 0) { - /* aarg stuck locked! */ - qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s", - strerror(errno)); - return res; - } - return count; + return rb->shared_hdr->count; } void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len) { uint32_t write_pt; - if (rb->lock_fn(rb) < 0) { - return NULL; - } - /* * Reclaim data if we are over writing and we need space */ if (rb->flags & QB_RB_FLAG_OVERWRITE) { - while (_qb_rb_space_free_locked_(rb) < + while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_HEADER_SIZE + 4)) { - _qb_rb_chunk_reclaim_locked_(rb); + qb_rb_chunk_reclaim(rb); } } else { - if (_qb_rb_space_free_locked_(rb) < + if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_HEADER_SIZE + 4)) { - rb->unlock_fn(rb); errno = EAGAIN; return NULL; } @@ -389,8 +310,7 @@ void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len) } -static uint32_t -_qb_rb_chunk_step_locked_(qb_ringbuffer_t * rb, uint32_t pointer) +static uint32_t qb_rb_chunk_step(qb_ringbuffer_t * rb, uint32_t pointer) { uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); /* @@ -415,7 +335,6 @@ _qb_rb_chunk_step_locked_(qb_ringbuffer_t * rb, uint32_t pointer) int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len) { uint32_t old_write_pt = rb->shared_hdr->write_pt; - int32_t res = 0; /* * commit the magic & chunk_size @@ -427,19 +346,12 @@ int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len) /* * commit the new write pointer */ - rb->shared_hdr->write_pt = _qb_rb_chunk_step_locked_(rb, old_write_pt); + rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt); DEBUG_PRINTF("%s: read: %u, write: %u (was:%u)\n", __func__, rb->shared_hdr->read_pt, rb->shared_hdr->write_pt, old_write_pt); - res = rb->unlock_fn(rb); - if (res < 0) { - qb_util_log(LOG_ERR, "failed to unlock ringbuffer lock %s", - strerror(-res)); - return res; - } - /* * post the notification to the reader */ @@ -465,17 +377,17 @@ ssize_t qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t len) return len; } -static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb) +void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb) { uint32_t old_read_pt = rb->shared_hdr->read_pt; - if (_qb_rb_space_used_locked_(rb) == 0) { + if (qb_rb_space_used(rb) == 0) { return; } - _qb_rb_chunk_check_locked_(rb, old_read_pt); + qb_rb_chunk_check(rb, old_read_pt); - rb->shared_hdr->read_pt = _qb_rb_chunk_step_locked_(rb, old_read_pt); + rb->shared_hdr->read_pt = qb_rb_chunk_step(rb, old_read_pt); rb->shared_hdr->count--; /* @@ -489,13 +401,6 @@ static void _qb_rb_chunk_reclaim_locked_(qb_ringbuffer_t * rb) rb->shared_hdr->write_pt); } -void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb) -{ - rb->lock_fn(rb); - _qb_rb_chunk_reclaim_locked_(rb); - rb->unlock_fn(rb); -} - ssize_t qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t timeout) { uint32_t read_pt; @@ -540,23 +445,15 @@ qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, return res; } - res = rb->lock_fn(rb); - if (res < 0) { - qb_util_log(LOG_ERR, "could not lock ringbuffer %s", - strerror(-res)); - return res; - } - if (_qb_rb_space_used_locked_(rb) == 0) { - rb->unlock_fn(rb); + if (qb_rb_space_used(rb) == 0) { return -ENOMSG; } read_pt = rb->shared_hdr->read_pt; - _qb_rb_chunk_check_locked_(rb, read_pt); + qb_rb_chunk_check(rb, read_pt); chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); if (len < chunk_size) { - rb->unlock_fn(rb); return -ENOBUFS; } @@ -564,8 +461,7 @@ qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, &rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS], chunk_size); - _qb_rb_chunk_reclaim_locked_(rb); - rb->unlock_fn(rb); + qb_rb_chunk_reclaim(rb); return chunk_size; } @@ -586,7 +482,7 @@ static void print_header(qb_ringbuffer_t * rb) printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb)); } -static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer) +static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer) { uint32_t chunk_size; uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, pointer); @@ -596,7 +492,6 @@ static void _qb_rb_chunk_check_locked_(qb_ringbuffer_t * rb, uint32_t pointer) printf("size: %x\n", chunk_size); printf("magic: %x\n", chunk_magic); print_header(rb); - rb->unlock_fn(rb); assert(0); } } diff --git a/lib/ringbuffer_helper.c b/lib/ringbuffer_helper.c index 407cadf..5780d6d 100644 --- a/lib/ringbuffer_helper.c +++ b/lib/ringbuffer_helper.c @@ -170,46 +170,6 @@ static int32_t my_sysv_sem_destroy(qb_ringbuffer_t * rb) } } -static int32_t my_sysv_lock_it_create(qb_ringbuffer_t * rb, uint32_t flags) -{ - union semun options; - int32_t res; - key_t sem_key; - - sem_key = ftok(rb->shared_hdr->hdr_path, rb->shared_hdr->size); - - if (sem_key == -1) { - res = -errno; - qb_util_log(LOG_ERR, "couldn't get a sem id %s", - strerror(errno)); - return res; - } - - if (flags & QB_RB_FLAG_CREATE) { - rb->lock_id = semget(sem_key, 1, IPC_CREAT | IPC_EXCL | 0600); - if (rb->lock_id == -1) { - res = -errno; - qb_util_log(LOG_ERR, "couldn't create a semaphore %s", - strerror(errno)); - return res; - } - options.val = 0; - res = semctl(rb->lock_id, 0, SETVAL, options); - } else { - rb->lock_id = semget(sem_key, 0, 0600); - if (rb->lock_id == -1) { - res = -errno; - qb_util_log(LOG_ERR, "couldn't get a sem id %s", - strerror(errno)); - return res; - } - res = 0; - } - qb_util_log(LOG_INFO, "sem key:%d, id:%d, value:%d", - sem_key, rb->lock_id, semctl(rb->lock_id, 0, GETVAL, 0)); - - return res; -} static int32_t my_posix_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags) { @@ -296,114 +256,3 @@ int32_t qb_rb_sem_create(struct qb_ringbuffer_s * rb, uint32_t flags) } } -static int32_t my_posix_lock_it(qb_ringbuffer_t * rb) -{ - return -pthread_spin_lock(&rb->shared_hdr->spinlock); -} - -static int32_t my_sysv_lock_it(qb_ringbuffer_t * rb) -{ - struct sembuf sops[2]; - int32_t res = 0; - - /* - * atomically wait for sem to get to 0 and then incr. - */ - sops[0].sem_num = 0; - sops[0].sem_op = 0; - sops[0].sem_flg = 0; - - sops[1].sem_num = 0; - sops[1].sem_op = 1; - sops[1].sem_flg = 0; - -semop_again: - if (semop(rb->lock_id, sops, 2) == -1) { - if (errno == EINTR) { - goto semop_again; - } - res = -errno; - qb_util_log(LOG_ERR, "could not lock it : %s", strerror(errno)); - return res; - } - return res; -} - -static int32_t my_posix_unlock_it(qb_ringbuffer_t * rb) -{ - return -pthread_spin_unlock(&rb->shared_hdr->spinlock); -} - -static int32_t my_sysv_unlock_it(qb_ringbuffer_t * rb) -{ - struct sembuf lock_it; - int32_t res = 0; - - lock_it.sem_num = 0; - lock_it.sem_op = -1; - lock_it.sem_flg = IPC_NOWAIT; - -semop_again: - if (semop(rb->lock_id, &lock_it, 1) == -1) { - if (errno == EINTR) { - goto semop_again; - } - res = -errno; - qb_util_log(LOG_ERR, "could not unlock it : %s", - strerror(errno)); - return res; - } - return res; -} - -static int32_t my_posix_lock_it_destroy(qb_ringbuffer_t * rb) -{ - return -pthread_spin_destroy(&rb->shared_hdr->spinlock); -} - -static int32_t my_sysv_lock_it_destroy(qb_ringbuffer_t * rb) -{ - if (semctl(rb->lock_id, 0, IPC_RMID, 0) == -1) { - return -errno; - } else { - return 0; - } -} - -int32_t qb_rb_lock_create(struct qb_ringbuffer_s * rb, uint32_t flags) -{ - int32_t pshared = 0; - int32_t can_use_shared_posix = 0; -#if _POSIX_THREAD_PROCESS_SHARED > 0 - can_use_shared_posix = 1; -#endif - if (((rb->flags & QB_RB_FLAG_SHARED_PROCESS) == 0) && - (rb->flags & QB_RB_FLAG_SHARED_THREAD) == 0) { - rb->lock_fn = my_null_fn; - rb->unlock_fn = my_null_fn; - rb->lock_destroy_fn = my_null_fn; - return 0; - } else if ((can_use_shared_posix && - (rb->flags & QB_RB_FLAG_SHARED_PROCESS)) || - rb->flags & QB_RB_FLAG_SHARED_THREAD) { - rb->lock_fn = my_posix_lock_it; - rb->unlock_fn = my_posix_unlock_it; - rb->lock_destroy_fn = my_posix_lock_it_destroy; - - if (rb->flags & QB_RB_FLAG_SHARED_PROCESS) { - pshared = PTHREAD_PROCESS_SHARED; - } - - if (flags & QB_RB_FLAG_CREATE) { - return -pthread_spin_init(&rb->shared_hdr->spinlock, - pshared); - } else { - return 0; - } - } else { - rb->lock_fn = my_sysv_lock_it; - rb->unlock_fn = my_sysv_unlock_it; - rb->lock_destroy_fn = my_sysv_lock_it_destroy; - return my_sysv_lock_it_create(rb, flags); - } -} diff --git a/lib/ringbuffer_int.h b/lib/ringbuffer_int.h index af94f8c..59d675b 100644 --- a/lib/ringbuffer_int.h +++ b/lib/ringbuffer_int.h @@ -35,11 +35,6 @@ struct qb_ringbuffer_s; -int32_t qb_rb_lock_create(struct qb_ringbuffer_s *rb, uint32_t flags); -typedef int32_t(*qb_rb_lock_fn_t) (struct qb_ringbuffer_s * rb); -typedef int32_t(*qb_rb_unlock_fn_t) (struct qb_ringbuffer_s * rb); -typedef int32_t(*qb_rb_lock_destroy_fn_t) (struct qb_ringbuffer_s * rb); - int32_t qb_rb_sem_create(struct qb_ringbuffer_s *rb, uint32_t flags); typedef int32_t(*qb_rb_sem_post_fn_t) (struct qb_ringbuffer_s * rb); typedef int32_t(*qb_rb_sem_timedwait_fn_t) (struct qb_ringbuffer_s * rb, @@ -55,21 +50,15 @@ struct qb_ringbuffer_shared_s { char data_path[PATH_MAX]; int32_t ref_count; sem_t posix_sem; - pthread_spinlock_t spinlock; char user_data[1]; }; struct qb_ringbuffer_s { uint32_t flags; - int32_t lock_id; int32_t sem_id; struct qb_ringbuffer_shared_s *shared_hdr; uint32_t *shared_data; - qb_rb_lock_fn_t lock_fn; - qb_rb_unlock_fn_t unlock_fn; - qb_rb_lock_destroy_fn_t lock_destroy_fn; - qb_rb_sem_post_fn_t sem_post_fn; qb_rb_sem_timedwait_fn_t sem_timedwait_fn; qb_rb_sem_destroy_fn_t sem_destroy_fn;