lib: use heap to manage timers

Signed-off-by: Christian Franke <chris@opensourcerouting.org>
Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
This commit is contained in:
Christian Franke 2013-11-19 14:11:42 +00:00 committed by David Lamparter
parent 514991c763
commit 4becea724c
4 changed files with 110 additions and 53 deletions

View File

@ -168,3 +168,20 @@ pqueue_dequeue (struct pqueue *queue)
trickle_down (0, queue); trickle_down (0, queue);
return data; return data;
} }
void
pqueue_remove_at (int index, struct pqueue *queue)
{
queue->array[index] = queue->array[--queue->size];
if (index > 0
&& (*queue->cmp) (queue->array[index],
queue->array[PARENT_OF(index)]) < 0)
{
trickle_up (index, queue);
}
else
{
trickle_down (index, queue);
}
}

View File

@ -38,6 +38,7 @@ extern void pqueue_delete (struct pqueue *queue);
extern void pqueue_enqueue (void *data, struct pqueue *queue); extern void pqueue_enqueue (void *data, struct pqueue *queue);
extern void *pqueue_dequeue (struct pqueue *queue); extern void *pqueue_dequeue (struct pqueue *queue);
extern void pqueue_remove_at (int index, struct pqueue *queue);
extern void trickle_down (int index, struct pqueue *queue); extern void trickle_down (int index, struct pqueue *queue);
extern void trickle_up (int index, struct pqueue *queue); extern void trickle_up (int index, struct pqueue *queue);

View File

@ -27,6 +27,7 @@
#include "memory.h" #include "memory.h"
#include "log.h" #include "log.h"
#include "hash.h" #include "hash.h"
#include "pqueue.h"
#include "command.h" #include "command.h"
#include "sigevent.h" #include "sigevent.h"
@ -496,17 +497,49 @@ DEFUN(clear_thread_cpu,
return CMD_SUCCESS; return CMD_SUCCESS;
} }
static int
thread_timer_cmp(void *a, void *b)
{
struct thread *thread_a = a;
struct thread *thread_b = b;
long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands);
if (cmp < 0)
return -1;
if (cmp > 0)
return 1;
return 0;
}
static void
thread_timer_update(void *node, int actual_position)
{
struct thread *thread = node;
thread->index = actual_position;
}
/* Allocate new thread master. */ /* Allocate new thread master. */
struct thread_master * struct thread_master *
thread_master_create () thread_master_create ()
{ {
struct thread_master *rv;
if (cpu_record == NULL) if (cpu_record == NULL)
cpu_record cpu_record
= hash_create ((unsigned int (*) (void *))cpu_record_hash_key, = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
(int (*) (const void *, const void *))cpu_record_hash_cmp); (int (*) (const void *, const void *))cpu_record_hash_cmp);
return (struct thread_master *) XCALLOC (MTYPE_THREAD_MASTER, rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
sizeof (struct thread_master));
/* Initialize the timer queues */
rv->timer = pqueue_create();
rv->background = pqueue_create();
rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
rv->timer->update = rv->background->update = thread_timer_update;
return rv;
} }
/* Add a new thread to the list. */ /* Add a new thread to the list. */
@ -523,22 +556,6 @@ thread_list_add (struct thread_list *list, struct thread *thread)
list->count++; list->count++;
} }
/* Add a new thread just before the point. */
static void
thread_list_add_before (struct thread_list *list,
struct thread *point,
struct thread *thread)
{
thread->next = point;
thread->prev = point->prev;
if (point->prev)
point->prev->next = thread;
else
list->head = thread;
point->prev = thread;
list->count++;
}
/* Delete a thread from the list. */ /* Delete a thread from the list. */
static struct thread * static struct thread *
thread_list_delete (struct thread_list *list, struct thread *thread) thread_list_delete (struct thread_list *list, struct thread *thread)
@ -584,17 +601,29 @@ thread_list_free (struct thread_master *m, struct thread_list *list)
} }
} }
static void
thread_queue_free (struct thread_master *m, struct pqueue *queue)
{
int i;
for (i = 0; i < queue->size; i++)
XFREE(MTYPE_THREAD, queue->array[i]);
m->alloc -= queue->size;
pqueue_delete(queue);
}
/* Stop thread scheduler. */ /* Stop thread scheduler. */
void void
thread_master_free (struct thread_master *m) thread_master_free (struct thread_master *m)
{ {
thread_list_free (m, &m->read); thread_list_free (m, &m->read);
thread_list_free (m, &m->write); thread_list_free (m, &m->write);
thread_list_free (m, &m->timer); thread_queue_free (m, m->timer);
thread_list_free (m, &m->event); thread_list_free (m, &m->event);
thread_list_free (m, &m->ready); thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse); thread_list_free (m, &m->unuse);
thread_list_free (m, &m->background); thread_queue_free (m, m->background);
XFREE (MTYPE_THREAD_MASTER, m); XFREE (MTYPE_THREAD_MASTER, m);
@ -676,7 +705,8 @@ thread_get (struct thread_master *m, u_char type,
thread->master = m; thread->master = m;
thread->func = func; thread->func = func;
thread->arg = arg; thread->arg = arg;
thread->index = -1;
strip_funcname (thread->funcname, funcname); strip_funcname (thread->funcname, funcname);
return thread; return thread;
@ -737,16 +767,15 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
const char* funcname) const char* funcname)
{ {
struct thread *thread; struct thread *thread;
struct thread_list *list; struct pqueue *queue;
struct timeval alarm_time; struct timeval alarm_time;
struct thread *tt;
assert (m != NULL); assert (m != NULL);
assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
assert (time_relative); assert (time_relative);
list = ((type == THREAD_TIMER) ? &m->timer : &m->background); queue = ((type == THREAD_TIMER) ? m->timer : m->background);
thread = thread_get (m, type, func, arg, funcname); thread = thread_get (m, type, func, arg, funcname);
/* Do we need jitter here? */ /* Do we need jitter here? */
@ -755,16 +784,7 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec; alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
thread->u.sands = timeval_adjust(alarm_time); thread->u.sands = timeval_adjust(alarm_time);
/* Sort by timeval. */ pqueue_enqueue(thread, queue);
for (tt = list->head; tt; tt = tt->next)
if (timeval_cmp (thread->u.sands, tt->u.sands) <= 0)
break;
if (tt)
thread_list_add_before (list, tt, thread);
else
thread_list_add (list, thread);
return thread; return thread;
} }
@ -849,7 +869,8 @@ funcname_thread_add_event (struct thread_master *m,
void void
thread_cancel (struct thread *thread) thread_cancel (struct thread *thread)
{ {
struct thread_list *list; struct thread_list *list = NULL;
struct pqueue *queue = NULL;
switch (thread->type) switch (thread->type)
{ {
@ -864,7 +885,7 @@ thread_cancel (struct thread *thread)
list = &thread->master->write; list = &thread->master->write;
break; break;
case THREAD_TIMER: case THREAD_TIMER:
list = &thread->master->timer; queue = thread->master->timer;
break; break;
case THREAD_EVENT: case THREAD_EVENT:
list = &thread->master->event; list = &thread->master->event;
@ -873,13 +894,28 @@ thread_cancel (struct thread *thread)
list = &thread->master->ready; list = &thread->master->ready;
break; break;
case THREAD_BACKGROUND: case THREAD_BACKGROUND:
list = &thread->master->background; queue = thread->master->background;
break; break;
default: default:
return; return;
break; break;
} }
thread_list_delete (list, thread);
if (queue)
{
assert(thread->index >= 0);
assert(thread == queue->array[thread->index]);
pqueue_remove_at(thread->index, queue);
}
else if (list)
{
thread_list_delete (list, thread);
}
else
{
assert(!"Thread should be either in queue or list!");
}
thread->type = THREAD_UNUSED; thread->type = THREAD_UNUSED;
thread_add_unuse (thread->master, thread); thread_add_unuse (thread->master, thread);
} }
@ -929,11 +965,12 @@ thread_cancel_event (struct thread_master *m, void *arg)
} }
static struct timeval * static struct timeval *
thread_timer_wait (struct thread_list *tlist, struct timeval *timer_val) thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
{ {
if (!thread_empty (tlist)) if (queue->size)
{ {
*timer_val = timeval_subtract (tlist->head->u.sands, relative_time); struct thread *next_timer = queue->array[0];
*timer_val = timeval_subtract (next_timer->u.sands, relative_time);
return timer_val; return timer_val;
} }
return NULL; return NULL;
@ -977,18 +1014,17 @@ thread_process_fd (struct thread_list *list, fd_set *fdset, fd_set *mfdset)
/* Add all timers that have popped to the ready list. */ /* Add all timers that have popped to the ready list. */
static unsigned int static unsigned int
thread_timer_process (struct thread_list *list, struct timeval *timenow) thread_timer_process (struct pqueue *queue, struct timeval *timenow)
{ {
struct thread *thread; struct thread *thread;
struct thread *next;
unsigned int ready = 0; unsigned int ready = 0;
for (thread = list->head; thread; thread = next) while (queue->size)
{ {
next = thread->next; thread = queue->array[0];
if (timeval_cmp (*timenow, thread->u.sands) < 0) if (timeval_cmp (*timenow, thread->u.sands) < 0)
return ready; return ready;
thread_list_delete (list, thread); pqueue_dequeue(queue);
thread->type = THREAD_READY; thread->type = THREAD_READY;
thread_list_add (&thread->master->ready, thread); thread_list_add (&thread->master->ready, thread);
ready++; ready++;
@ -1064,8 +1100,8 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
if (m->ready.count == 0) if (m->ready.count == 0)
{ {
quagga_get_relative (NULL); quagga_get_relative (NULL);
timer_wait = thread_timer_wait (&m->timer, &timer_val); timer_wait = thread_timer_wait (m->timer, &timer_val);
timer_wait_bg = thread_timer_wait (&m->background, &timer_val_bg); timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
if (timer_wait_bg && if (timer_wait_bg &&
(!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0))) (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
@ -1121,7 +1157,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
priority than I/O threads, so let's push them onto the ready priority than I/O threads, so let's push them onto the ready
list in front of the I/O threads. */ list in front of the I/O threads. */
quagga_get_relative (NULL); quagga_get_relative (NULL);
thread_timer_process (&m->timer, &relative_time); thread_timer_process (m->timer, &relative_time);
/* Got IO, process it */ /* Got IO, process it */
if (num > 0) if (num > 0)
@ -1142,7 +1178,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
#endif #endif
/* Background timer/events, lowest priority */ /* Background timer/events, lowest priority */
thread_timer_process (&m->background, &relative_time); thread_timer_process (m->background, &relative_time);
if ((thread = thread_trim_head (&m->ready)) != NULL) if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch); return thread_run (m, thread, fetch);

View File

@ -44,16 +44,18 @@ struct thread_list
int count; int count;
}; };
struct pqueue;
/* Master of the theads. */ /* Master of the theads. */
struct thread_master struct thread_master
{ {
struct thread_list read; struct thread_list read;
struct thread_list write; struct thread_list write;
struct thread_list timer; struct pqueue *timer;
struct thread_list event; struct thread_list event;
struct thread_list ready; struct thread_list ready;
struct thread_list unuse; struct thread_list unuse;
struct thread_list background; struct pqueue *background;
fd_set readfd; fd_set readfd;
fd_set writefd; fd_set writefd;
fd_set exceptfd; fd_set exceptfd;
@ -80,6 +82,7 @@ struct thread
int fd; /* file descriptor in case of read/write. */ int fd; /* file descriptor in case of read/write. */
struct timeval sands; /* rest of time sands value. */ struct timeval sands; /* rest of time sands value. */
} u; } u;
int index; /* used for timers to store position in queue */
struct timeval real; struct timeval real;
struct cpu_thread_history *hist; /* cache pointer to cpu_history */ struct cpu_thread_history *hist; /* cache pointer to cpu_history */
char funcname[FUNCNAME_LEN]; char funcname[FUNCNAME_LEN];