lib: Add ability to use poll() instead of select

This patch originated w/ Hannes Hofer <hhofer@barracuda.com>.
I've taken the patch fixed some bugs and reworked the code
to allow both poll and select to be choosen at compile time.

Signed-off-by: Donald Sharp <sharpd@cumulusnetworks.com>
This commit is contained in:
Donald Sharp 2016-03-04 06:28:29 +00:00
parent 5d4ccd4e85
commit 0a95a0d0b6
3 changed files with 287 additions and 38 deletions

View File

@ -300,6 +300,8 @@ AC_ARG_ENABLE(fpm,
[ --enable-fpm enable Forwarding Plane Manager support])
AC_ARG_ENABLE(systemd,
[ --enable-systemd enable Systemd support])
AC_ARG_ENABLE(poll,
[ --enable-poll enable usage of Poll instead of select])
AC_ARG_ENABLE(werror,
AS_HELP_STRING([--enable-werror], [enable -Werror (recommended for developers only)]))
AC_ARG_ENABLE(cumulus,
@ -331,6 +333,10 @@ if test "${enable_systemd}" = "yes" ; then
LIBS="$LIBS -lsystemd "
fi
if test "${enable_poll}" = "yes" ; then
AC_DEFINE(HAVE_POLL,,Compile systemd support in)
fi
if test "${enable_cumulus}" = "yes" ; then
AC_DEFINE(HAVE_CUMULUS,,Compile Special Cumulus Code in)
fi

View File

@ -57,7 +57,6 @@ static unsigned short timers_inited;
static struct hash *cpu_record = NULL;
/* Adjust so that tv_usec is in the range [0,TIMER_SECOND_MICRO).
And change negative values to 0. */
static struct timeval
@ -520,7 +519,7 @@ thread_timer_update(void *node, int actual_position)
/* Allocate new thread master. */
struct thread_master *
thread_master_create ()
thread_master_create (void)
{
struct thread_master *rv;
struct rlimit limit;
@ -560,6 +559,12 @@ thread_master_create ()
rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
rv->timer->update = rv->background->update = thread_timer_update;
#if defined(HAVE_POLL)
rv->handler.pfdsize = 64;
rv->handler.pfdcount = 0;
rv->handler.pfds = (struct pollfd *) malloc (sizeof (struct pollfd) * rv->handler.pfdsize);
memset (rv->handler.pfds, 0, sizeof (struct pollfd) * rv->handler.pfdsize);
#endif
return rv;
}
@ -710,7 +715,10 @@ thread_master_free (struct thread_master *m)
thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse);
thread_queue_free (m, m->background);
#if defined(HAVE_POLL)
XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
#endif
XFREE (MTYPE_THREAD_MASTER, m);
if (cpu_record)
@ -783,27 +791,123 @@ thread_get (struct thread_master *m, u_char type,
return thread;
}
#if defined (HAVE_POLL)
#define fd_copy_fd_set(X) (X)
static int
fd_select (int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *t)
static short
realloc_pfds (struct thread_master *m, int fd)
{
return(select(size, read, write, except, t));
size_t oldpfdlen = m->handler.pfdsize * sizeof(struct pollfd);
void *newpfd = NULL;
m->handler.pfdsize *= 2;
newpfd = XREALLOC (MTYPE_THREAD, m->handler.pfds, m->handler.pfdsize * sizeof(struct pollfd));
if (newpfd == NULL)
{
close(fd);
zlog (NULL, LOG_ERR, "failed to allocate space for pollfds");
return 0;
}
memset((struct pollfd*)newpfd + (m->handler.pfdsize / 2), 0, oldpfdlen);
m->handler.pfds = (struct pollfd*)newpfd;
return 1;
}
/* generic add thread function */
static struct thread *
generic_thread_add(struct thread_master *m, int (*func) (struct thread *),
void *arg, int fd, const char* funcname, int dir)
{
struct thread *thread;
u_char type;
short int event;
if (dir == THREAD_READ)
{
event = (POLLIN | POLLHUP);
type = THREAD_READ;
}
else
{
event = (POLLOUT | POLLHUP);
type = THREAD_WRITE;
}
nfds_t queuepos = m->handler.pfdcount;
nfds_t i=0;
for (i=0; i<m->handler.pfdcount; i++)
if (m->handler.pfds[i].fd == fd)
{
queuepos = i;
break;
}
/* is there enough space for a new fd? */
if (queuepos >= m->handler.pfdsize)
if (realloc_pfds(m, fd) == 0)
return NULL;
thread = thread_get (m, type, func, arg, funcname);
m->handler.pfds[queuepos].fd = fd;
m->handler.pfds[queuepos].events |= event;
if (queuepos == m->handler.pfdcount)
m->handler.pfdcount++;
return thread;
}
#else
#define fd_copy_fd_set(X) (X)
#endif
static int
fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
{
int num;
#if defined(HAVE_POLL)
/* recalc timeout for poll. Attention NULL pointer is no timeout with
select, where with poll no timeount is -1 */
int timeout = -1;
if (timer_wait != NULL)
timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
#else
num = select (size, read, write, except, timer_wait);
#endif
return num;
}
static int
fd_is_set (int fd, thread_fd_set *fdset)
fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos)
{
return FD_ISSET (fd, fdset);
#if defined(HAVE_POLL)
return 1;
#else
return FD_ISSET (THREAD_FD (thread), fdset);
#endif
}
static int
fd_clear_read_write (int fd, thread_fd_set *fdset)
fd_clear_read_write (struct thread *thread)
{
#if !defined(HAVE_POLL)
thread_fd_set *fdset = NULL;
int fd = THREAD_FD (thread);
if (thread->type == THREAD_READ)
fdset = &thread->master->handler.readfd;
else
fdset = &thread->master->handler.writefd;
if (!FD_ISSET (fd, fdset))
return 0;
FD_CLR (fd, fdset);
#endif
return 1;
}
@ -813,13 +917,21 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
int (*func) (struct thread *), void *arg, int fd, const char* funcname)
{
struct thread *thread = NULL;
#if !defined(HAVE_POLL)
thread_fd_set *fdset = NULL;
if (dir == THREAD_READ)
fdset = &m->readfd;
fdset = &m->handler.readfd;
else
fdset = &m->writefd;
fdset = &m->handler.writefd;
#endif
#if defined (HAVE_POLL)
thread = generic_thread_add(m, func, arg, fd, funcname, dir);
if (thread == NULL)
return NULL;
#else
if (FD_ISSET (fd, fdset))
{
zlog (NULL, LOG_WARNING, "There is already %s fd [%d]", (dir = THREAD_READ) ? "read" : "write", fd);
@ -827,8 +939,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
}
FD_SET (fd, fdset);
thread = thread_get (m, dir, func, arg, funcname);
#endif
thread->u.fd = fd;
if (dir == THREAD_READ)
thread_add_fd (m->read, thread);
@ -945,6 +1058,27 @@ funcname_thread_add_event (struct thread_master *m,
return thread;
}
static void
thread_cancel_read_write (struct thread *thread)
{
#if defined(HAVE_POLL)
nfds_t i;
for (i=0;i<thread->master->handler.pfdcount;++i)
if (thread->master->handler.pfds[i].fd == thread->u.fd)
{
/* remove thread fds from pfd list */
memmove(thread->master->handler.pfds+i,
thread->master->handler.pfds+i+1,
(thread->master->handler.pfdsize-i-1) * sizeof(struct pollfd));
i--;
thread->master->handler.pfdcount--;
}
#endif
fd_clear_read_write (thread);
}
/* Cancel thread from scheduler. */
void
thread_cancel (struct thread *thread)
@ -956,11 +1090,11 @@ thread_cancel (struct thread *thread)
switch (thread->type)
{
case THREAD_READ:
assert (fd_clear_read_write (thread->u.fd, &thread->master->readfd));
thread_cancel_read_write (thread);
thread_array = thread->master->read;
break;
case THREAD_WRITE:
assert (fd_clear_read_write (thread->u.fd, &thread->master->writefd));
thread_cancel_read_write (thread);
thread_array = thread->master->write;
break;
case THREAD_TIMER:
@ -1070,47 +1204,117 @@ thread_run (struct thread_master *m, struct thread *thread,
}
static int
thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset)
thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos)
{
thread_fd_set *mfdset = NULL;
struct thread **thread_array;
if (!thread)
return 0;
if (thread->type == THREAD_READ)
{
mfdset = &m->readfd;
thread_array = m->read;
}
thread_array = m->read;
else
{
mfdset = &m->writefd;
thread_array = m->write;
}
thread_array = m->write;
if (fd_is_set (THREAD_FD (thread), fdset))
if (fd_is_set (thread, fdset, pos))
{
fd_clear_read_write (THREAD_FD (thread), mfdset);
fd_clear_read_write (thread);
thread_delete_fd (thread_array, thread);
thread_list_add (&m->ready, thread);
thread->type = THREAD_READY;
#if defined(HAVE_POLL)
thread->master->handler.pfds[pos].events &= ~(state);
#endif
return 1;
}
return 0;
}
static int
#if defined(HAVE_POLL)
#if defined(HAVE_SNMP)
/* add snmp fds to poll set */
static void
add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int fdsetsize)
{
int i;
m->handler.pfdcountsnmp = m->handler.pfdcount;
/* cycle trough fds and add neccessary fds to poll set */
for (i=0;i<fdsetsize;++i)
{
if (FD_ISSET(i, snmpfds))
{
if (m->handler.pfdcountsnmp > m->handler.pfdsize)
if (realloc_pfds(m, i) < 0)
return;
m->handler.pfds[m->handler.pfdcountsnmp].fd = i;
m->handler.pfds[m->handler.pfdcountsnmp].events = POLLIN;
m->handler.pfdcountsnmp++;
}
}
}
#endif
/* check poll events */
static void
check_pollfds(struct thread_master *m, fd_set *readfd, int num)
{
nfds_t i = 0;
int ready = 0;
for (i = 0; i < m->handler.pfdcount && ready < num ; ++i)
{
/* no event for current fd? immideatly continue */
if(m->handler.pfds[i].revents == 0)
continue;
/* remove fd from list on POLLNVAL */
if (m->handler.pfds[i].revents & POLLNVAL)
{
memmove(m->handler.pfds+i,
m->handler.pfds+i+1,
(m->handler.pfdsize-i-1) * sizeof(struct pollfd));
m->handler.pfdcount--;
i--;
continue;
}
/* POLLIN / POLLOUT process event */
if (m->handler.pfds[i].revents & POLLIN)
ready += thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i);
if (m->handler.pfds[i].revents & POLLOUT)
ready += thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i);
/* remove fd from list on POLLHUP after other event is processed */
if (m->handler.pfds[i].revents & POLLHUP)
{
memmove(m->handler.pfds+i,
m->handler.pfds+i+1,
(m->handler.pfdsize-i-1) * sizeof(struct pollfd));
m->handler.pfdcount--;
i--;
ready++;
}
else
m->handler.pfds[i].revents = 0;
}
}
#endif
static void
thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num)
{
#if defined (HAVE_POLL)
check_pollfds (m, rset, num);
#else
int ready = 0, index;
for (index = 0; index < m->fd_limit && ready < num; ++index)
{
ready += thread_process_fds_helper (m, m->read[index], rset);
ready += thread_process_fds_helper (m, m->write[index], wset);
ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0);
ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0);
}
return num - ready;
#endif
}
/* Add all timers that have popped to the ready list. */
@ -1193,9 +1397,11 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
thread_process (&m->event);
/* Structure copy. */
readfd = fd_copy_fd_set(m->readfd);
writefd = fd_copy_fd_set(m->writefd);
exceptfd = fd_copy_fd_set(m->exceptfd);
#if !defined(HAVE_POLL)
readfd = fd_copy_fd_set(m->handler.readfd);
writefd = fd_copy_fd_set(m->handler.writefd);
exceptfd = fd_copy_fd_set(m->handler.exceptfd);
#endif
/* Calculate select wait timer if nothing else to do */
if (m->ready.count == 0)
@ -1224,12 +1430,20 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
snmpblock = 0;
memcpy(&snmp_timer_wait, timer_wait, sizeof(struct timeval));
}
#if defined(HAVE_POLL)
/* clear fdset since there are no other fds in fd_set,
then add injected fds from snmp_select_info into pollset */
FD_ZERO(&readfd);
#endif
snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
#if defined(HAVE_POLL)
add_snmp_pollfds(m, &readfd, fdsetsize);
#endif
if (snmpblock == 0)
timer_wait = &snmp_timer_wait;
}
#endif
num = fd_select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait);
/* Signals should get quick treatment */
if (num < 0)
@ -1241,6 +1455,16 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
}
#if defined HAVE_SNMP && defined SNMP_AGENTX
#if defined(HAVE_POLL)
/* re-enter pollfds in fd_set for handling in snmp_read */
FD_ZERO(&readfd);
nfds_t i;
for (i = m->handler.pfdcount; i < m->handler.pfdcountsnmp; ++i)
{
if (m->handler.pfds[i].revents == POLLIN)
FD_SET(m->handler.pfds[i].fd, &readfd);
}
#endif
if (agentx_enabled)
{
if (num > 0)

View File

@ -52,6 +52,27 @@ struct pqueue;
*/
typedef fd_set thread_fd_set;
#if defined(HAVE_POLL)
#include <poll.h>
struct fd_handler
{
/* number of pfd stored in pfds */
nfds_t pfdcount;
/* number of pfd stored in pfds + number of snmp pfd */
nfds_t pfdcountsnmp;
/* number of pfd that fit in the allocated space of pfds */
nfds_t pfdsize;
struct pollfd *pfds;
};
#else
struct fd_handler
{
fd_set readfd;
fd_set writefd;
fd_set exceptfd;
};
#endif
/* Master of the theads. */
struct thread_master
{
@ -63,9 +84,7 @@ struct thread_master
struct thread_list unuse;
struct pqueue *background;
int fd_limit;
thread_fd_set readfd;
thread_fd_set writefd;
thread_fd_set exceptfd;
struct fd_handler handler;
unsigned long alloc;
};