mirror of https://git.proxmox.com/git/mirror_frr
Merge pull request #701 from qlyoung/mt-safe-cancel

mt-safe thread_cancel()

Commit 9e3b206d7c

lib/thread.c (588 lines changed)
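
The docblocks in the diff below define the new cancellation contract. A minimal sketch of the intended call sites, assuming an initialized thread_master and an already-scheduled task; the names `master`, `t`, `t_ref`, and `eventobj` are illustrative, not from the diff:

    /* From the pthread that owns 'master' (both functions assert ownership): */
    thread_cancel (t);                        /* cancel one specific task */
    thread_cancel_event (master, eventobj);   /* cancel all events carrying eventobj */

    /* From any other pthread: post a request, then block until the owner
     * services it. The two tail arguments are mutually exclusive. */
    thread_cancel_async (master, &t_ref, NULL);   /* by task back-reference */
    thread_cancel_async (master, NULL, eventobj); /* by event argument */
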
@@ -328,6 +328,12 @@ thread_timer_update(void *node, int actual_position)
   thread->index = actual_position;
 }
 
+static void
+cancelreq_del (void *cr)
+{
+  XFREE (MTYPE_TMP, cr);
+}
+
 /* Allocate new thread master. */
 struct thread_master *
 thread_master_create (void)
@@ -351,6 +357,7 @@ thread_master_create (void)
     return NULL;
 
   pthread_mutex_init (&rv->mtx, NULL);
+  pthread_cond_init (&rv->cancel_cond, NULL);
 
   rv->fd_limit = (int)limit.rlim_cur;
   rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@@ -375,6 +382,9 @@ thread_master_create (void)
   rv->spin = true;
   rv->handle_signals = true;
   rv->owner = pthread_self();
+  rv->cancel_req = list_new ();
+  rv->cancel_req->del = cancelreq_del;
+  rv->canceled = true;
   pipe (rv->io_pipe);
   set_nonblocking (rv->io_pipe[0]);
   set_nonblocking (rv->io_pipe[1]);
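
The io_pipe set up in the context lines above is what lets AWAKEN() (referenced later in the diff) interrupt a poll() blocked in the owner pthread. The exact AWAKEN() body is not shown in this diff; a standalone sketch of the general self-pipe pattern these lines enable, in plain POSIX:

    #include <poll.h>
    #include <unistd.h>

    /* Writer side: wake the poller by making the pipe's read end readable. */
    static void awaken (int io_pipe[2])
    {
      static const char dummy = 0;
      write (io_pipe[1], &dummy, 1);      /* nonblocking fd; EAGAIN is harmless */
    }

    /* Poller side: include io_pipe[0] in the pollfd set; when it fires,
     * drain it and fall through to re-examine shared state. */
    static void drain (int io_pipe[2])
    {
      char buf[64];
      while (read (io_pipe[0], buf, sizeof (buf)) > 0)
        ;                                 /* nonblocking: stops at EAGAIN */
    }
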
@@ -533,6 +543,7 @@ thread_master_free (struct thread_master *m)
   pthread_mutex_destroy (&m->mtx);
   close (m->io_pipe[0]);
   close (m->io_pipe[1]);
+  list_delete (m->cancel_req);
 
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
   XFREE (MTYPE_THREAD_MASTER, m->handler.copy);
@@ -637,7 +648,7 @@ thread_get (struct thread_master *m, u_char type,
 
 static int
 fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
-         nfds_t count, struct timeval *timer_wait)
+         nfds_t count, const struct timeval *timer_wait)
 {
   /* If timer_wait is null here, that means poll() should block indefinitely,
    * unless the thread_master has overriden it by setting ->selectpoll_timeout.
@@ -864,151 +875,288 @@ funcname_thread_add_event (struct thread_master *m,
   return thread;
 }
 
-static void
-thread_cancel_read_or_write (struct thread *thread, short int state)
-{
-  for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
-    if (thread->master->handler.pfds[i].fd == thread->u.fd)
-      {
-        thread->master->handler.pfds[i].events &= ~(state);
-
-        /* remove thread fds from pfd list */
-        if (thread->master->handler.pfds[i].events == 0)
-          {
-            memmove(thread->master->handler.pfds+i,
-                    thread->master->handler.pfds+i+1,
-                    (thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
-            thread->master->handler.pfdcount--;
-            return;
-          }
-      }
-}
+/* Thread cancellation ------------------------------------------------------ */
+
+/**
+ * NOT's out the .events field of pollfd corresponding to the given file
+ * descriptor. The event to be NOT'd is passed in the 'state' parameter.
+ *
+ * This needs to happen for both copies of pollfd's. See 'thread_fetch'
+ * implementation for details.
+ *
+ * @param master
+ * @param fd
+ * @param state the event to cancel. One or more (OR'd together) of the
+ * following:
+ *   - POLLIN
+ *   - POLLOUT
+ */
+static void
+thread_cancel_rw (struct thread_master *master, int fd, short state)
+{
+  /* Cancel POLLHUP too just in case some bozo set it */
+  state |= POLLHUP;
+
+  /* find the index of corresponding pollfd */
+  nfds_t i;
+
+  for (i = 0; i < master->handler.pfdcount; i++)
+    if (master->handler.pfds[i].fd == fd)
+      break;
+
+  /* NOT out event. */
+  master->handler.pfds[i].events &= ~(state);
+
+  /* If all events are canceled, delete / resize the pollfd array. */
+  if (master->handler.pfds[i].events == 0)
+    {
+      memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
+              (master->handler.pfdcount - i - 1) * sizeof (struct pollfd));
+      master->handler.pfdcount--;
+    }
+
+  /* If we have the same pollfd in the copy, perform the same operations,
+   * otherwise return. */
+  if (i >= master->handler.copycount)
+    return;
+
+  master->handler.copy[i].events &= ~(state);
+
+  if (master->handler.copy[i].events == 0)
+    {
+      memmove(master->handler.copy + i, master->handler.copy + i + 1,
+              (master->handler.copycount - i - 1) * sizeof (struct pollfd));
+      master->handler.copycount--;
+    }
+}
 
 /**
- * Cancel thread from scheduler.
+ * Process cancellation requests.
  *
- * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
- * the one which owns thread->master. You will crash.
+ * This may only be run from the pthread which owns the thread_master.
+ *
+ * @param master the thread master to process
+ * @REQUIRE master->mtx
  */
-void
-thread_cancel (struct thread *thread)
+static void
+do_thread_cancel (struct thread_master *master)
 {
   struct thread_list *list = NULL;
   struct pqueue *queue = NULL;
   struct thread **thread_array = NULL;
+  struct thread *thread;
 
-  pthread_mutex_lock (&thread->mtx);
-  pthread_mutex_lock (&thread->master->mtx);
-
-  assert (pthread_self() == thread->master->owner);
-
-  switch (thread->type)
-    {
-    case THREAD_READ:
-      thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
-      thread_array = thread->master->read;
-      break;
-    case THREAD_WRITE:
-      thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
-      thread_array = thread->master->write;
-      break;
-    case THREAD_TIMER:
-      queue = thread->master->timer;
-      break;
-    case THREAD_EVENT:
-      list = &thread->master->event;
-      break;
-    case THREAD_READY:
-      list = &thread->master->ready;
-      break;
-    default:
-      goto done;
-      break;
-    }
-
-  if (queue)
-    {
-      assert(thread->index >= 0);
-      pqueue_remove (thread, queue);
-    }
-  else if (list)
-    {
-      thread_list_delete (list, thread);
-    }
-  else if (thread_array)
-    {
-      thread_array[thread->u.fd] = NULL;
-    }
-  else
-    {
-      assert(!"Thread should be either in queue or list or array!");
-    }
-
-  if (thread->ref)
-    *thread->ref = NULL;
-
-  thread_add_unuse (thread->master, thread);
-
-done:
-  pthread_mutex_unlock (&thread->master->mtx);
-  pthread_mutex_unlock (&thread->mtx);
-}
-
-/* Delete all events which has argument value arg. */
-unsigned int
-thread_cancel_event (struct thread_master *m, void *arg)
-{
-  unsigned int ret = 0;
-  struct thread *thread;
-  struct thread *t;
-
-  pthread_mutex_lock (&m->mtx);
-  {
-    thread = m->event.head;
-    while (thread)
-      {
-        t = thread;
-        pthread_mutex_lock (&t->mtx);
-        thread = t->next;
-
-        if (t->arg == arg)
-          {
-            ret++;
-            thread_list_delete (&m->event, t);
-            if (t->ref)
-              *t->ref = NULL;
-            thread_add_unuse (m, t);
-          }
-        pthread_mutex_unlock (&t->mtx);
-      }
-
-    /* thread can be on the ready list too */
-    thread = m->ready.head;
-    while (thread)
-      {
-        t = thread;
-        pthread_mutex_lock (&t->mtx);
-        thread = t->next;
-
-        if (t->arg == arg)
-          {
-            ret++;
-            thread_list_delete (&m->ready, t);
-            if (t->ref)
-              *t->ref = NULL;
-            thread_add_unuse (m, t);
-          }
-        pthread_mutex_unlock (&t->mtx);
-      }
-  }
-  pthread_mutex_unlock (&m->mtx);
-  return ret;
-}
+  struct cancel_req *cr;
+  struct listnode *ln;
+  for (ALL_LIST_ELEMENTS_RO (master->cancel_req, ln, cr))
+    {
+      /* If this is an event object cancellation, linear search through event
+       * list deleting any events which have the specified argument. We also
+       * need to check every thread in the ready queue. */
+      if (cr->eventobj)
+        {
+          struct thread *t;
+          thread = master->event.head;
+
+          while (thread)
+            {
+              t = thread;
+              thread = t->next;
+
+              if (t->arg == cr->eventobj)
+                {
+                  thread_list_delete (&master->event, t);
+                  if (t->ref)
+                    *t->ref = NULL;
+                  thread_add_unuse (master, t);
+                }
+            }
+
+          thread = master->ready.head;
+          while (thread)
+            {
+              t = thread;
+              thread = t->next;
+
+              if (t->arg == cr->eventobj)
+                {
+                  thread_list_delete (&master->ready, t);
+                  if (t->ref)
+                    *t->ref = NULL;
+                  thread_add_unuse (master, t);
+                }
+            }
+          continue;
+        }
+
+      /* The pointer varies depending on whether the cancellation request was
+       * made asynchronously or not. If it was, we need to check whether the
+       * thread even exists anymore before cancelling it. */
+      thread = (cr->thread) ? cr->thread : *cr->threadref;
+
+      if (!thread)
+        continue;
+
+      /* Determine the appropriate queue to cancel the thread from */
+      switch (thread->type)
+        {
+        case THREAD_READ:
+          thread_cancel_rw (master, thread->u.fd, POLLIN);
+          thread_array = master->read;
+          break;
+        case THREAD_WRITE:
+          thread_cancel_rw (master, thread->u.fd, POLLOUT);
+          thread_array = master->write;
+          break;
+        case THREAD_TIMER:
+          queue = master->timer;
+          break;
+        case THREAD_EVENT:
+          list = &master->event;
+          break;
+        case THREAD_READY:
+          list = &master->ready;
+          break;
+        default:
+          continue;
+          break;
+        }
+
+      if (queue)
+        {
+          assert(thread->index >= 0);
+          pqueue_remove (thread, queue);
+        }
+      else if (list)
+        {
+          thread_list_delete (list, thread);
+        }
+      else if (thread_array)
+        {
+          thread_array[thread->u.fd] = NULL;
+        }
+      else
+        {
+          assert(!"Thread should be either in queue or list or array!");
+        }
+
+      if (thread->ref)
+        *thread->ref = NULL;
+
+      thread_add_unuse (thread->master, thread);
+    }
+
+  /* Delete and free all cancellation requests */
+  list_delete_all_node (master->cancel_req);
+
+  /* Wake up any threads which may be blocked in thread_cancel_async() */
+  master->canceled = true;
+  pthread_cond_broadcast (&master->cancel_cond);
+}
+
+/**
+ * Cancel any events which have the specified argument.
+ *
+ * MT-Unsafe
+ *
+ * @param m the thread_master to cancel from
+ * @param arg the argument passed when creating the event
+ */
+void
+thread_cancel_event (struct thread_master *master, void *arg)
+{
+  assert (master->owner == pthread_self());
+
+  pthread_mutex_lock (&master->mtx);
+  {
+    struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+    cr->eventobj = arg;
+    listnode_add (master->cancel_req, cr);
+    do_thread_cancel(master);
+  }
+  pthread_mutex_unlock (&master->mtx);
+}
+
+/**
+ * Cancel a specific task.
+ *
+ * MT-Unsafe
+ *
+ * @param thread task to cancel
+ */
+void
+thread_cancel (struct thread *thread)
+{
+  assert (thread->master->owner == pthread_self());
+
+  pthread_mutex_lock (&thread->master->mtx);
+  {
+    struct cancel_req *cr = XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+    cr->thread = thread;
+    listnode_add (thread->master->cancel_req, cr);
+    do_thread_cancel (thread->master);
+  }
+  pthread_mutex_unlock (&thread->master->mtx);
+}
+
+/**
+ * Asynchronous cancellation.
+ *
+ * Called with either a struct thread ** or void * to an event argument,
+ * this function posts the correct cancellation request and blocks until it is
+ * serviced.
+ *
+ * If the thread is currently running, execution blocks until it completes.
+ *
+ * The last two parameters are mutually exclusive, i.e. if you pass one the
+ * other must be NULL.
+ *
+ * When the cancellation procedure executes on the target thread_master, the
+ * thread * provided is checked for nullity. If it is null, the thread is
+ * assumed to no longer exist and the cancellation request is a no-op. Thus
+ * users of this API must pass a back-reference when scheduling the original
+ * task.
+ *
+ * MT-Safe
+ *
+ * @param master the thread master with the relevant event / task
+ * @param thread pointer to thread to cancel
+ * @param eventobj the event
+ */
+void
+thread_cancel_async (struct thread_master *master, struct thread **thread,
+                     void *eventobj)
+{
+  assert (!(thread && eventobj) && (thread || eventobj));
+  assert (master->owner != pthread_self());
+
+  pthread_mutex_lock (&master->mtx);
+  {
+    master->canceled = false;
+
+    if (thread)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->threadref = thread;
+        listnode_add (master->cancel_req, cr);
+      }
+    else if (eventobj)
+      {
+        struct cancel_req *cr =
+          XCALLOC (MTYPE_TMP, sizeof (struct cancel_req));
+        cr->eventobj = eventobj;
+        listnode_add (master->cancel_req, cr);
+      }
+    AWAKEN (master);
+
+    while (!master->canceled)
+      pthread_cond_wait (&master->cancel_cond, &master->mtx);
+  }
+  pthread_mutex_unlock (&master->mtx);
+}
+/* ------------------------------------------------------------------------- */
 
 static struct timeval *
 thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
 {
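
As the docblock for thread_cancel_async() states, do_thread_cancel() only dereferences cr->threadref at service time and treats NULL as "already gone", so async callers must keep a back-reference. A hypothetical caller-side sketch of that contract; struct peer and its field are invented for illustration:

    struct peer {                 /* hypothetical user structure */
      struct thread *t_timer;     /* back-reference to a scheduled task */
    };

    void peer_stop (struct thread_master *master, struct peer *p)
    {
      /* Safe even if the task already ran: the scheduler clears p->t_timer
       * through thread->ref, and do_thread_cancel() skips NULL references. */
      thread_cancel_async (master, &p->t_timer, NULL);
    }
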
@@ -1053,13 +1201,22 @@ thread_process_io_helper (struct thread_master *m, struct thread *thread,
   return 1;
 }
 
+/**
+ * Process I/O events.
+ *
+ * Walks through file descriptor array looking for those pollfds whose .revents
+ * field has something interesting. Deletes any invalid file descriptors.
+ *
+ * @param m the thread master
+ * @param num the number of active file descriptors (return value of poll())
+ */
 static void
-thread_process_io (struct thread_master *m, struct pollfd *pfds,
-                   unsigned int num, unsigned int count)
+thread_process_io (struct thread_master *m, unsigned int num)
 {
   unsigned int ready = 0;
+  struct pollfd *pfds = m->handler.copy;
 
-  for (nfds_t i = 0; i < count && ready < num ; ++i)
+  for (nfds_t i = 0; i < m->handler.copycount && ready < num ; ++i)
     {
       /* no event for current fd? immediately continue */
       if (pfds[i].revents == 0)
@@ -1088,8 +1245,9 @@ thread_process_io (struct thread_master *m, struct pollfd *pfds,
           m->handler.pfdcount--;
 
           memmove (pfds + i, pfds + i + 1,
-                   (count - i - 1) * sizeof(struct pollfd));
-          count--;
+                   (m->handler.copycount - i - 1) * sizeof(struct pollfd));
+          m->handler.copycount--;
 
           i--;
         }
     }
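
The rewritten thread_process_io() walks m->handler.copy rather than the live array, so a handler that schedules or cancels I/O tasks may edit m->handler.pfds without invalidating the iteration; only invalid fds are spliced out of both arrays. A condensed sketch of that snapshot-iteration pattern, illustrative rather than a drop-in:

    /* Iterate a snapshot while the live pollfd set may change underneath. */
    static void walk_snapshot (struct pollfd *snap, nfds_t snapcount, unsigned int num)
    {
      unsigned int ready = 0;
      for (nfds_t i = 0; i < snapcount && ready < num; ++i)
        {
          if (snap[i].revents == 0)
            continue;
          ready++;
          /* dispatch handlers here; if they add or cancel tasks they mutate
           * the live array, not this snapshot, so 'i' stays valid */
        }
    }
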
@@ -1139,98 +1297,108 @@ thread_process (struct thread_list *list)
 struct thread *
 thread_fetch (struct thread_master *m, struct thread *fetch)
 {
-  struct thread *thread;
+  struct thread *thread = NULL;
   struct timeval now;
-  struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
-  struct timeval *timer_wait = &timer_val;
+  struct timeval zerotime = { 0, 0 };
+  struct timeval tv;
+  struct timeval *tw;
 
-  do
-    {
-      int num = 0;
+  int num = 0;
 
-      /* Signals pre-empt everything */
-      if (m->handle_signals)
-        quagga_sigevent_process ();
+  do {
+      /* Handle signals if any */
+      if (m->handle_signals)
+        quagga_sigevent_process ();
 
-      pthread_mutex_lock (&m->mtx);
-      /* Drain the ready queue of already scheduled jobs, before scheduling
-       * more.
-       */
-      if ((thread = thread_trim_head (&m->ready)) != NULL)
-        {
-          fetch = thread_run (m, thread, fetch);
-          if (fetch->ref)
-            *fetch->ref = NULL;
-          pthread_mutex_unlock (&m->mtx);
-          return fetch;
-        }
-
-      /* To be fair to all kinds of threads, and avoid starvation, we
-       * need to be careful to consider all thread types for scheduling
-       * in each quanta. I.e. we should not return early from here on.
-       */
-
-      /* Normal event are the next highest priority. */
-      thread_process (&m->event);
-
-      /* Calculate select wait timer if nothing else to do */
-      if (m->ready.count == 0)
-        {
-          timer_wait = thread_timer_wait (m->timer, &timer_val);
-        }
-
-      if (timer_wait && timer_wait->tv_sec < 0)
-        {
-          timerclear(&timer_val);
-          timer_wait = &timer_val;
-        }
-
-      unsigned int count = m->handler.pfdcount + m->handler.pfdcountsnmp;
-      memcpy (m->handler.copy, m->handler.pfds, count * sizeof (struct pollfd));
-
-      pthread_mutex_unlock (&m->mtx);
-      {
-        num = fd_poll (m, m->handler.copy, m->handler.pfdsize, count, timer_wait);
-      }
-      pthread_mutex_lock (&m->mtx);
-
-      /* Signals should get quick treatment */
-      if (num < 0)
-        {
-          if (errno == EINTR)
-            {
-              pthread_mutex_unlock (&m->mtx);
-              continue; /* signal received - process it */
-            }
-          zlog_warn ("poll() error: %s", safe_strerror (errno));
-          pthread_mutex_unlock (&m->mtx);
-          return NULL;
-        }
-
-      /* Check foreground timers. Historically, they have had higher
-       * priority than I/O threads, so let's push them onto the ready
-       * list in front of the I/O threads. */
-      monotime(&now);
-      thread_process_timers (m->timer, &now);
-
-      /* Got IO, process it */
-      if (num > 0)
-        thread_process_io (m, m->handler.copy, num, count);
-
-      if ((thread = thread_trim_head (&m->ready)) != NULL)
-        {
-          fetch = thread_run (m, thread, fetch);
-          if (fetch->ref)
-            *fetch->ref = NULL;
-          pthread_mutex_unlock (&m->mtx);
-          return fetch;
-        }
-
-      pthread_mutex_unlock (&m->mtx);
-
-    } while (m->spin);
-
-  return NULL;
+      pthread_mutex_lock (&m->mtx);
+
+      /* Process any pending cancellation requests */
+      do_thread_cancel (m);
+
+      /* Post events to ready queue. This must come before the following block
+       * since events should occur immediately */
+      thread_process (&m->event);
+
+      /* If there are no tasks on the ready queue, we will poll() until a timer
+       * expires or we receive I/O, whichever comes first. The strategy for doing
+       * this is:
+       *
+       * - If there are events pending, set the poll() timeout to zero
+       * - If there are no events pending, but there are timers pending, set the
+       *   timeout to the smallest remaining time on any timer
+       * - If there are neither timers nor events pending, but there are file
+       *   descriptors pending, block indefinitely in poll()
+       * - If nothing is pending, it's time for the application to die
+       *
+       * In every case except the last, we need to hit poll() at least once per
+       * loop to avoid starvation by events */
+
+      if (m->ready.count == 0)
+        tw = thread_timer_wait (m->timer, &tv);
+
+      if (m->ready.count != 0 || (tw && !timercmp (tw, &zerotime, >)))
+        tw = &zerotime;
+
+      if (!tw && m->handler.pfdcount == 0)
+        { /* die */
+          pthread_mutex_unlock (&m->mtx);
+          fetch = NULL;
+          break;
+        }
+
+      /* Copy pollfd array + # active pollfds in it. Not necessary to copy
+       * the array size as this is fixed. */
+      m->handler.copycount = m->handler.pfdcount;
+      memcpy (m->handler.copy, m->handler.pfds,
+              m->handler.copycount * sizeof (struct pollfd));
+
+      pthread_mutex_unlock (&m->mtx);
+      {
+        num = fd_poll (m, m->handler.copy, m->handler.pfdsize,
+                       m->handler.copycount, tw);
+      }
+      pthread_mutex_lock (&m->mtx);
+
+      /* Handle any errors received in poll() */
+      if (num < 0)
+        {
+          if (errno == EINTR)
+            {
+              pthread_mutex_unlock (&m->mtx);
+              continue; /* loop around to signal handler */
+            }
+
+          /* else die */
+          zlog_warn ("poll() error: %s", safe_strerror (errno));
+          pthread_mutex_unlock (&m->mtx);
+          fetch = NULL;
+          break;
+        }
+
+      /* Since we could have received more cancellation requests during poll(), process those */
+      do_thread_cancel (m);
+
+      /* Post timers to ready queue. */
+      monotime(&now);
+      thread_process_timers (m->timer, &now);
+
+      /* Post I/O to ready queue. */
+      if (num > 0)
+        thread_process_io (m, num);
+
+      /* If we have a ready task, break the loop and return it to the caller */
+      if ((thread = thread_trim_head (&m->ready)))
+        {
+          fetch = thread_run (m, thread, fetch);
+          if (fetch->ref)
+            *fetch->ref = NULL;
+        }
+
+      pthread_mutex_unlock (&m->mtx);
+
+    } while (!thread && m->spin);
+
+  return fetch;
 }
 
 unsigned long
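
thread_cancel_async() blocks on master->cancel_cond until do_thread_cancel() flips master->canceled, giving requesters a synchronous guarantee. A minimal standalone sketch of that handshake in plain pthreads; the function names here are illustrative, not FRR API:

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cancel_cond = PTHREAD_COND_INITIALIZER;
    static bool canceled;

    void requester (void)                 /* any non-owner pthread */
    {
      pthread_mutex_lock (&mtx);
      canceled = false;                   /* arm the handshake */
      /* ... enqueue a cancel_req and wake the owner (AWAKEN in the diff) ... */
      while (!canceled)                   /* loop guards against spurious wakeups */
        pthread_cond_wait (&cancel_cond, &mtx);
      pthread_mutex_unlock (&mtx);
    }

    void owner_services_requests (void)   /* runs inside the owner's event loop */
    {
      pthread_mutex_lock (&mtx);
      /* ... process and free all queued requests (do_thread_cancel) ... */
      canceled = true;
      pthread_cond_broadcast (&cancel_cond);
      pthread_mutex_unlock (&mtx);
    }
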
lib/thread.h (25 lines changed)
@@ -47,16 +47,25 @@ struct pqueue;
 
 struct fd_handler
 {
-  /* number of pfd stored in pfds */
-  nfds_t pfdcount;
-  /* number of pfd stored in pfds + number of snmp pfd */
-  nfds_t pfdcountsnmp;
-  /* number of pfd that fit in the allocated space of pfds */
+  /* number of pfd that fit in the allocated space of pfds. This is a constant
+   * and is the same for both pfds and copy. */
   nfds_t pfdsize;
 
   /* file descriptors to monitor for i/o */
   struct pollfd *pfds;
+  /* number of pollfds stored in pfds */
+  nfds_t pfdcount;
 
   /* chunk used for temp copy of pollfds */
   struct pollfd *copy;
+  /* number of pollfds stored in copy */
+  nfds_t copycount;
 };
 
+struct cancel_req {
+  struct thread *thread;
+  void *eventobj;
+  struct thread **threadref;
+};
+
 /* Master of the theads. */
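
A small sanity check capturing the invariants the new comments describe; this is a hypothetical helper, not part of the header:

    #include <assert.h>

    static void fd_handler_check (const struct fd_handler *h)
    {
      assert (h->pfdcount <= h->pfdsize);    /* live set fits the allocation */
      assert (h->copycount <= h->pfdsize);   /* snapshot shares the same capacity */
      /* copycount is refreshed from pfdcount once per poll() in thread_fetch() */
    }
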
@@ -68,6 +77,9 @@ struct thread_master
   struct thread_list event;
   struct thread_list ready;
   struct thread_list unuse;
+  struct list *cancel_req;
+  bool canceled;
+  pthread_cond_t cancel_cond;
   int io_pipe[2];
   int fd_limit;
   struct fd_handler handler;
@@ -189,7 +201,8 @@ extern void funcname_thread_execute (struct thread_master *,
 #undef debugargdef
 
 extern void thread_cancel (struct thread *);
-extern unsigned int thread_cancel_event (struct thread_master *, void *);
+extern void thread_cancel_async (struct thread_master *, struct thread **, void *);
+extern void thread_cancel_event (struct thread_master *, void *);
 extern struct thread *thread_fetch (struct thread_master *, struct thread *);
 extern void thread_call (struct thread *);
 extern unsigned long thread_timer_remain_second (struct thread *);