lib: remove select()

poll() is present on every supported platform and does not have an upper
limit on file descriptors.
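
As a rough illustration (not part of this commit): select() cannot watch a
descriptor whose value is at or above FD_SETSIZE, while poll() accepts any
valid descriptor. A hypothetical helper showing the poll() side:

#include <poll.h>

/* Hypothetical sketch: wait for fd to become readable.  poll() takes the
 * descriptor value itself, so fd values >= FD_SETSIZE (1024 on most
 * platforms) work fine; FD_SET() on such a value is undefined behavior. */
static int
wait_readable (int fd, int timeout_ms)
{
  struct pollfd pfd = { .fd = fd, .events = POLLIN };
  return poll (&pfd, 1, timeout_ms);
}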

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
[DL: split off from AWAKEN() change]
Quentin Young authored on 2017-05-10 18:09:49 +00:00; committed by Quentin Young
parent e22ab7727d
commit 75bcb3558d
2 changed files with 75 additions and 247 deletions

lib/thread.c

@@ -379,12 +379,11 @@ thread_master_create (void)
rv->handle_signals = true;
rv->owner = pthread_self();
#if defined(HAVE_POLL_CALL)
rv->handler.pfdsize = rv->fd_limit;
rv->handler.pfdcount = 0;
rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER,
sizeof (struct pollfd) * rv->handler.pfdsize);
#endif
return rv;
}
@@ -419,18 +418,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread)
return thread;
}
static void
thread_delete_fd (struct thread **thread_array, struct thread *thread)
{
thread_array[thread->u.fd] = NULL;
}
static void
thread_add_fd (struct thread **thread_array, struct thread *thread)
{
thread_array[thread->u.fd] = thread;
}
/* Thread list is empty or not. */
static int
thread_empty (struct thread_list *list)
@@ -544,9 +531,7 @@ thread_master_free (struct thread_master *m)
thread_queue_free (m, m->background);
pthread_mutex_destroy (&m->mtx);
#if defined(HAVE_POLL_CALL)
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif
XFREE (MTYPE_THREAD_MASTER, m);
pthread_mutex_lock (&cpu_record_mtx);
@@ -646,58 +631,9 @@ thread_get (struct thread_master *m, u_char type,
return thread;
}
#if defined (HAVE_POLL_CALL)
#define fd_copy_fd_set(X) (X)
/* generic add thread function */
static struct thread *
generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
void *arg, int fd, int dir, debugargdef)
{
struct thread *thread;
u_char type;
short int event;
if (dir == THREAD_READ)
{
event = (POLLIN | POLLHUP);
type = THREAD_READ;
}
else
{
event = (POLLOUT | POLLHUP);
type = THREAD_WRITE;
}
nfds_t queuepos = m->handler.pfdcount;
nfds_t i=0;
for (i=0; i<m->handler.pfdcount; i++)
if (m->handler.pfds[i].fd == fd)
{
queuepos = i;
break;
}
/* is there enough space for a new fd? */
assert (queuepos < m->handler.pfdsize);
thread = thread_get (m, type, func, arg, debugargpass);
m->handler.pfds[queuepos].fd = fd;
m->handler.pfds[queuepos].events |= event;
if (queuepos == m->handler.pfdcount)
m->handler.pfdcount++;
return thread;
}
#else
#define fd_copy_fd_set(X) (X)
#endif
static int
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize,
nfds_t count, struct timeval *timer_wait)
{
int num;
@@ -714,7 +650,6 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set
* no event is detected. If the value is zero, the behavior is default.
*/
#if defined(HAVE_POLL_CALL)
int timeout = -1;
if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
@@ -725,57 +660,10 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set
timeout = 0;
num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
#else
struct timeval timeout;
if (m->selectpoll_timeout > 0) // use the user's timeout
{
timeout.tv_sec = m->selectpoll_timeout / 1000;
timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000;
timer_wait = &timeout;
}
else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
{
timeout.tv_sec = 0;
timeout.tv_usec = 0;
timer_wait = &timeout;
}
num = select (size, read, write, except, timer_wait);
#endif
return num;
}
static int
fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
{
#if defined(HAVE_POLL_CALL)
return 1;
#else
return FD_ISSET (THREAD_FD (thread), fdset);
#endif
}
static int
fd_clear_read_write (struct thread *thread)
{
#if !defined(HAVE_POLL_CALL)
thread_fd_set *fdset = NULL;
int fd = THREAD_FD (thread);
if (thread->type == THREAD_READ)
fdset = &thread->master->handler.readfd;
else
fdset = &thread->master->handler.writefd;
if (!FD_ISSET (fd, fdset))
return 0;
FD_CLR (fd, fdset);
#endif
return 1;
}
/* Add new read thread. */
struct thread *
funcname_thread_add_read_write (int dir, struct thread_master *m,
@@ -792,32 +680,26 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
return NULL;
}
#if defined (HAVE_POLL_CALL)
thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
#else
if (fd >= FD_SETSIZE)
{
zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile"
"with --enable-poll=yes", fd, FD_SETSIZE);
assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE");
}
thread_fd_set *fdset = NULL;
if (dir == THREAD_READ)
fdset = &m->handler.readfd;
else
fdset = &m->handler.writefd;
/* default to a new pollfd */
nfds_t queuepos = m->handler.pfdcount;
if (FD_ISSET (fd, fdset))
{
zlog_warn ("There is already %s fd [%d]",
(dir == THREAD_READ) ? "read" : "write", fd);
}
else
{
FD_SET (fd, fdset);
thread = thread_get (m, dir, func, arg, debugargpass);
}
#endif
/* if we already have a pollfd for our file descriptor, find and use it */
for (nfds_t i = 0; i < m->handler.pfdcount; i++)
if (m->handler.pfds[i].fd == fd)
{
queuepos = i;
break;
}
assert (queuepos < m->handler.pfdsize);
thread = thread_get (m, dir, func, arg, debugargpass);
m->handler.pfds[queuepos].fd = fd;
m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT);
if (queuepos == m->handler.pfdcount)
m->handler.pfdcount++;
if (thread)
{
@@ -825,9 +707,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
{
thread->u.fd = fd;
if (dir == THREAD_READ)
thread_add_fd (m->read, thread);
m->read[thread->u.fd] = thread;
else
thread_add_fd (m->write, thread);
m->write[thread->u.fd] = thread;
}
pthread_mutex_unlock (&thread->mtx);
@@ -995,10 +877,7 @@ funcname_thread_add_event (struct thread_master *m,
static void
thread_cancel_read_or_write (struct thread *thread, short int state)
{
#if defined(HAVE_POLL_CALL)
nfds_t i;
for (i=0;i<thread->master->handler.pfdcount;++i)
for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i)
if (thread->master->handler.pfds[i].fd == thread->u.fd)
{
thread->master->handler.pfds[i].events &= ~(state);
@@ -1013,9 +892,6 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
return;
}
}
#endif
fd_clear_read_write (thread);
}
/**
@@ -1039,19 +915,11 @@ thread_cancel (struct thread *thread)
switch (thread->type)
{
case THREAD_READ:
#if defined (HAVE_POLL_CALL)
thread_cancel_read_or_write (thread, POLLIN | POLLHUP);
#else
thread_cancel_read_or_write (thread, 0);
#endif
thread_array = thread->master->read;
break;
case THREAD_WRITE:
#if defined (HAVE_POLL_CALL)
thread_cancel_read_or_write (thread, POLLOUT | POLLHUP);
#else
thread_cancel_read_or_write (thread, 0);
#endif
thread_array = thread->master->write;
break;
case THREAD_TIMER:
@@ -1082,7 +950,7 @@ thread_cancel (struct thread *thread)
}
else if (thread_array)
{
thread_delete_fd (thread_array, thread);
thread_array[thread->u.fd] = NULL;
}
else
{
@@ -1168,7 +1036,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
static struct thread *
thread_run (struct thread_master *m, struct thread *thread,
struct thread *fetch)
struct thread *fetch)
{
*fetch = *thread;
thread_add_unuse (m, thread);
@@ -1176,7 +1044,8 @@ thread_run (struct thread_master *m, struct thread *thread,
}
static int
thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
thread_process_io_helper (struct thread_master *m, struct thread *thread,
short state, int pos)
{
struct thread **thread_array;
@@ -1188,76 +1057,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa
else
thread_array = m->write;
if (fd_is_set (thread, fdset, pos))
{
fd_clear_read_write (thread);
thread_delete_fd (thread_array, thread);
thread_list_add (&m->ready, thread);
thread->type = THREAD_READY;
#if defined(HAVE_POLL_CALL)
thread->master->handler.pfds[pos].events &= ~(state);
#endif
return 1;
}
return 0;
thread_array[thread->u.fd] = NULL;
thread_list_add (&m->ready, thread);
thread->type = THREAD_READY;
/* if another pthread scheduled this file descriptor for the event we're
* responding to, no problem; we're getting to it now */
thread->master->handler.pfds[pos].events &= ~(state);
return 1;
}
#if defined(HAVE_POLL_CALL)
/* check poll events */
static void
check_pollfds(struct thread_master *m, fd_set *readfd, int num)
thread_process_io (struct thread_master *m, struct pollfd *pfds,
unsigned int num, unsigned int count)
{
nfds_t i = 0;
int ready = 0;
for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
unsigned int ready = 0;
for (nfds_t i = 0; i < count && ready < num ; ++i)
{
/* no event for current fd? immideatly continue */
if(m->handler.pfds[i].revents == 0)
/* no event for current fd? immediately continue */
if (pfds[i].revents == 0)
continue;
ready++;
/* POLLIN / POLLOUT process event */
if (m->handler.pfds[i].revents & (POLLIN | POLLHUP))
thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
if (m->handler.pfds[i].revents & POLLOUT)
thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
/* Unless someone has called thread_cancel from another pthread, the only
* thing that could have changed in m->handler.pfds while we were
* asleep is the .events field in a given pollfd. Barring thread_cancel()
* that value should be a superset of the values we have in our copy, so
* there's no need to update it. Similarily, barring deletion, the fd
* should still be a valid index into the master's pfds. */
if (pfds[i].revents & (POLLIN | POLLHUP))
thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i);
if (pfds[i].revents & POLLOUT)
thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i);
/* remove fd from list on POLLNVAL */
if (m->handler.pfds[i].revents & POLLNVAL)
/* if one of our file descriptors is garbage, remove the same from
* both pfds + update sizes and index */
if (pfds[i].revents & POLLNVAL)
{
memmove(m->handler.pfds+i,
m->handler.pfds+i+1,
(m->handler.pfdsize-i-1) * sizeof(struct pollfd));
m->handler.pfdcount--;
i--;
memmove (m->handler.pfds + i,
m->handler.pfds + i + 1,
(m->handler.pfdcount - i - 1) * sizeof(struct pollfd));
m->handler.pfdcount--;
memmove (pfds + i, pfds + i + 1,
(count - i - 1) * sizeof(struct pollfd));
count--;
i--;
}
else
m->handler.pfds[i].revents = 0;
}
}
#endif
static void
thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
{
#if defined (HAVE_POLL_CALL)
check_pollfds (m, rset, num);
#else
int ready = 0, index;
for (index = 0; index < m->fd_limit && ready < num; ++index)
{
ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
}
#endif
}
/* Add all timers that have popped to the ready list. */
static unsigned int
thread_timer_process (struct pqueue *queue, struct timeval *timenow)
thread_process_timers (struct pqueue *queue, struct timeval *timenow)
{
struct thread *thread;
unsigned int ready = 0;
@@ -1300,9 +1153,6 @@ struct thread *
thread_fetch (struct thread_master *m, struct thread *fetch)
{
struct thread *thread;
thread_fd_set readfd;
thread_fd_set writefd;
thread_fd_set exceptfd;
struct timeval now;
struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 };
struct timeval timer_val_bg;
@@ -1338,13 +1188,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
/* Normal event are the next highest priority. */
thread_process (&m->event);
/* Structure copy. */
#if !defined(HAVE_POLL_CALL)
readfd = fd_copy_fd_set(m->handler.readfd);
writefd = fd_copy_fd_set(m->handler.writefd);
exceptfd = fd_copy_fd_set(m->handler.exceptfd);
#endif
/* Calculate select wait timer if nothing else to do */
if (m->ready.count == 0)
{
@@ -1362,7 +1205,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
timer_wait = &timer_val;
}
num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait);
/* Signals should get quick treatment */
if (num < 0)
@@ -1372,20 +1215,20 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
pthread_mutex_unlock (&m->mtx);
continue; /* signal received - process it */
}
zlog_warn ("select() error: %s", safe_strerror (errno));
zlog_warn ("poll() error: %s", safe_strerror (errno));
pthread_mutex_unlock (&m->mtx);
return NULL;
}
/* Check foreground timers. Historically, they have had higher
priority than I/O threads, so let's push them onto the ready
list in front of the I/O threads. */
* priority than I/O threads, so let's push them onto the ready
* list in front of the I/O threads. */
monotime(&now);
thread_timer_process (m->timer, &now);
thread_process_timers (m->timer, &now);
/* Got IO, process it */
if (num > 0)
thread_process_fds (m, &readfd, &writefd, num);
thread_process_io (m, pfds, num, count);
#if 0
/* If any threads were made ready above (I/O or foreground timer),
@@ -1403,7 +1246,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
#endif
/* Background timer/events, lowest priority */
thread_timer_process (m->background, &now);
thread_process_timers (m->background, &now);
if ((thread = thread_trim_head (&m->ready)) != NULL)
{

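Taken together, the thread.c changes reduce the I/O path to one pattern:
snapshot the pollfd array, call poll(), dispatch ready descriptors, and
compact the array when a descriptor has gone stale. A condensed,
hypothetical sketch of that dispatch loop (dispatch_ready is illustrative,
not a function from this commit):

#include <poll.h>
#include <string.h>

static void
dispatch_ready (struct pollfd *pfds, nfds_t *count, int num)
{
  int ready = 0;
  for (nfds_t i = 0; i < *count && ready < num; ++i)
    {
      if (pfds[i].revents == 0)
        continue;
      ready++;
      if (pfds[i].revents & POLLNVAL)
        {
          /* stale fd: close the gap and revisit this index */
          memmove (pfds + i, pfds + i + 1,
                   (*count - i - 1) * sizeof (struct pollfd));
          (*count)--;
          i--;
        }
      else
        pfds[i].revents = 0;
    }
}

The real thread_process_io additionally moves the matching read/write
thread onto the ready list and clears the serviced bits from .events, as
shown in the hunks above.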
lib/thread.h

@@ -22,8 +22,9 @@
#define _ZEBRA_THREAD_H
#include <zebra.h>
#include "monotime.h"
#include <pthread.h>
#include <poll.h>
#include "monotime.h"
struct rusage_t
{
@@ -44,14 +45,6 @@ struct thread_list
struct pqueue;
/*
* Abstract it so we can use different methodologies to
* select on data.
*/
typedef fd_set thread_fd_set;
#if defined(HAVE_POLL_CALL)
#include <poll.h>
struct fd_handler
{
/* number of pfd stored in pfds */
@@ -62,14 +55,6 @@ struct fd_handler
nfds_t pfdsize;
struct pollfd *pfds;
};
#else
struct fd_handler
{
fd_set readfd;
fd_set writefd;
fd_set exceptfd;
};
#endif
/* Master of the theads. */
struct thread_master
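
With the #else branch gone, every platform uses the same three fields:
pfds holds up to pfdsize entries, of which the first pfdcount are live.
A hypothetical helper (handler_add_fd is illustrative, not in the tree)
showing the reuse-or-append discipline the thread.c hunks implement:

#include <assert.h>
#include <poll.h>

static void
handler_add_fd (struct fd_handler *h, int fd, short events)
{
  nfds_t pos = h->pfdcount;

  /* reuse an existing slot for this fd, OR-ing in the new events */
  for (nfds_t i = 0; i < h->pfdcount; i++)
    if (h->pfds[i].fd == fd)
      {
        pos = i;
        break;
      }

  assert (pos < h->pfdsize);
  h->pfds[pos].fd = fd;
  h->pfds[pos].events |= events;
  if (pos == h->pfdcount)
    h->pfdcount++;
}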