diff --git a/lib/thread.c b/lib/thread.c index ccb635a87d..7f58aea789 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -379,12 +379,11 @@ thread_master_create (void) rv->handle_signals = true; rv->owner = pthread_self(); -#if defined(HAVE_POLL_CALL) rv->handler.pfdsize = rv->fd_limit; rv->handler.pfdcount = 0; rv->handler.pfds = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct pollfd) * rv->handler.pfdsize); -#endif + return rv; } @@ -419,18 +418,6 @@ thread_list_delete (struct thread_list *list, struct thread *thread) return thread; } -static void -thread_delete_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = NULL; -} - -static void -thread_add_fd (struct thread **thread_array, struct thread *thread) -{ - thread_array[thread->u.fd] = thread; -} - /* Thread list is empty or not. */ static int thread_empty (struct thread_list *list) @@ -544,9 +531,7 @@ thread_master_free (struct thread_master *m) thread_queue_free (m, m->background); pthread_mutex_destroy (&m->mtx); -#if defined(HAVE_POLL_CALL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); -#endif XFREE (MTYPE_THREAD_MASTER, m); pthread_mutex_lock (&cpu_record_mtx); @@ -646,58 +631,9 @@ thread_get (struct thread_master *m, u_char type, return thread; } -#if defined (HAVE_POLL_CALL) - -#define fd_copy_fd_set(X) (X) - -/* generic add thread function */ -static struct thread * -generic_thread_add(struct thread_master *m, int (*func) (struct thread *), - void *arg, int fd, int dir, debugargdef) -{ - struct thread *thread; - - u_char type; - short int event; - - if (dir == THREAD_READ) - { - event = (POLLIN | POLLHUP); - type = THREAD_READ; - } - else - { - event = (POLLOUT | POLLHUP); - type = THREAD_WRITE; - } - - nfds_t queuepos = m->handler.pfdcount; - nfds_t i=0; - for (i=0; ihandler.pfdcount; i++) - if (m->handler.pfds[i].fd == fd) - { - queuepos = i; - break; - } - - /* is there enough space for a new fd? */ - assert (queuepos < m->handler.pfdsize); - - thread = thread_get (m, type, func, arg, debugargpass); - m->handler.pfds[queuepos].fd = fd; - m->handler.pfds[queuepos].events |= event; - if (queuepos == m->handler.pfdcount) - m->handler.pfdcount++; - - return thread; -} -#else - -#define fd_copy_fd_set(X) (X) -#endif - static int -fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) +fd_poll (struct thread_master *m, struct pollfd *pfds, nfds_t pfdsize, + nfds_t count, struct timeval *timer_wait) { int num; @@ -714,7 +650,6 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set * no event is detected. If the value is zero, the behavior is default. */ -#if defined(HAVE_POLL_CALL) int timeout = -1; if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value @@ -725,57 +660,10 @@ fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set timeout = 0; num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); -#else - struct timeval timeout; - - if (m->selectpoll_timeout > 0) // use the user's timeout - { - timeout.tv_sec = m->selectpoll_timeout / 1000; - timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000; - timer_wait = &timeout; - } - else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) - { - timeout.tv_sec = 0; - timeout.tv_usec = 0; - timer_wait = &timeout; - } - num = select (size, read, write, except, timer_wait); -#endif return num; } -static int -fd_is_set (struct thread *thread, thread_fd_set *fdset, int pos) -{ -#if defined(HAVE_POLL_CALL) - return 1; -#else - return FD_ISSET (THREAD_FD (thread), fdset); -#endif -} - -static int -fd_clear_read_write (struct thread *thread) -{ -#if !defined(HAVE_POLL_CALL) - thread_fd_set *fdset = NULL; - int fd = THREAD_FD (thread); - - if (thread->type == THREAD_READ) - fdset = &thread->master->handler.readfd; - else - fdset = &thread->master->handler.writefd; - - if (!FD_ISSET (fd, fdset)) - return 0; - - FD_CLR (fd, fdset); -#endif - return 1; -} - /* Add new read thread. */ struct thread * funcname_thread_add_read_write (int dir, struct thread_master *m, @@ -792,32 +680,26 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, return NULL; } -#if defined (HAVE_POLL_CALL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); -#else - if (fd >= FD_SETSIZE) - { - zlog_err ("File descriptor %d is >= FD_SETSIZE (%d). Please recompile" - "with --enable-poll=yes", fd, FD_SETSIZE); - assert (fd < FD_SETSIZE && !"fd >= FD_SETSIZE"); - } - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; + /* default to a new pollfd */ + nfds_t queuepos = m->handler.pfdcount; - if (FD_ISSET (fd, fdset)) - { - zlog_warn ("There is already %s fd [%d]", - (dir == THREAD_READ) ? "read" : "write", fd); - } - else - { - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); - } -#endif + /* if we already have a pollfd for our file descriptor, find and use it */ + for (nfds_t i = 0; i < m->handler.pfdcount; i++) + if (m->handler.pfds[i].fd == fd) + { + queuepos = i; + break; + } + + assert (queuepos < m->handler.pfdsize); + + thread = thread_get (m, dir, func, arg, debugargpass); + + m->handler.pfds[queuepos].fd = fd; + m->handler.pfds[queuepos].events |= (dir == THREAD_READ ? POLLIN : POLLOUT); + + if (queuepos == m->handler.pfdcount) + m->handler.pfdcount++; if (thread) { @@ -825,9 +707,9 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { thread->u.fd = fd; if (dir == THREAD_READ) - thread_add_fd (m->read, thread); + m->read[thread->u.fd] = thread; else - thread_add_fd (m->write, thread); + m->write[thread->u.fd] = thread; } pthread_mutex_unlock (&thread->mtx); @@ -995,10 +877,7 @@ funcname_thread_add_event (struct thread_master *m, static void thread_cancel_read_or_write (struct thread *thread, short int state) { -#if defined(HAVE_POLL_CALL) - nfds_t i; - - for (i=0;imaster->handler.pfdcount;++i) + for (nfds_t i = 0; i < thread->master->handler.pfdcount; ++i) if (thread->master->handler.pfds[i].fd == thread->u.fd) { thread->master->handler.pfds[i].events &= ~(state); @@ -1013,9 +892,6 @@ thread_cancel_read_or_write (struct thread *thread, short int state) return; } } -#endif - - fd_clear_read_write (thread); } /** @@ -1039,19 +915,11 @@ thread_cancel (struct thread *thread) switch (thread->type) { case THREAD_READ: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLIN | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->read; break; case THREAD_WRITE: -#if defined (HAVE_POLL_CALL) thread_cancel_read_or_write (thread, POLLOUT | POLLHUP); -#else - thread_cancel_read_or_write (thread, 0); -#endif thread_array = thread->master->write; break; case THREAD_TIMER: @@ -1082,7 +950,7 @@ thread_cancel (struct thread *thread) } else if (thread_array) { - thread_delete_fd (thread_array, thread); + thread_array[thread->u.fd] = NULL; } else { @@ -1168,7 +1036,7 @@ thread_timer_wait (struct pqueue *queue, struct timeval *timer_val) static struct thread * thread_run (struct thread_master *m, struct thread *thread, - struct thread *fetch) + struct thread *fetch) { *fetch = *thread; thread_add_unuse (m, thread); @@ -1176,7 +1044,8 @@ thread_run (struct thread_master *m, struct thread *thread, } static int -thread_process_fds_helper (struct thread_master *m, struct thread *thread, thread_fd_set *fdset, short int state, int pos) +thread_process_io_helper (struct thread_master *m, struct thread *thread, + short state, int pos) { struct thread **thread_array; @@ -1188,76 +1057,60 @@ thread_process_fds_helper (struct thread_master *m, struct thread *thread, threa else thread_array = m->write; - if (fd_is_set (thread, fdset, pos)) - { - fd_clear_read_write (thread); - thread_delete_fd (thread_array, thread); - thread_list_add (&m->ready, thread); - thread->type = THREAD_READY; -#if defined(HAVE_POLL_CALL) - thread->master->handler.pfds[pos].events &= ~(state); -#endif - return 1; - } - return 0; + thread_array[thread->u.fd] = NULL; + thread_list_add (&m->ready, thread); + thread->type = THREAD_READY; + /* if another pthread scheduled this file descriptor for the event we're + * responding to, no problem; we're getting to it now */ + thread->master->handler.pfds[pos].events &= ~(state); + return 1; } -#if defined(HAVE_POLL_CALL) - -/* check poll events */ static void -check_pollfds(struct thread_master *m, fd_set *readfd, int num) +thread_process_io (struct thread_master *m, struct pollfd *pfds, + unsigned int num, unsigned int count) { - nfds_t i = 0; - int ready = 0; - for (i = 0; i < m->handler.pfdcount && ready < num ; ++i) + unsigned int ready = 0; + + for (nfds_t i = 0; i < count && ready < num ; ++i) { - /* no event for current fd? immideatly continue */ - if(m->handler.pfds[i].revents == 0) + /* no event for current fd? immediately continue */ + if (pfds[i].revents == 0) continue; ready++; - /* POLLIN / POLLOUT process event */ - if (m->handler.pfds[i].revents & (POLLIN | POLLHUP)) - thread_process_fds_helper(m, m->read[m->handler.pfds[i].fd], NULL, POLLIN, i); - if (m->handler.pfds[i].revents & POLLOUT) - thread_process_fds_helper(m, m->write[m->handler.pfds[i].fd], NULL, POLLOUT, i); + /* Unless someone has called thread_cancel from another pthread, the only + * thing that could have changed in m->handler.pfds while we were + * asleep is the .events field in a given pollfd. Barring thread_cancel() + * that value should be a superset of the values we have in our copy, so + * there's no need to update it. Similarily, barring deletion, the fd + * should still be a valid index into the master's pfds. */ + if (pfds[i].revents & (POLLIN | POLLHUP)) + thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, i); + if (pfds[i].revents & POLLOUT) + thread_process_io_helper(m, m->write[pfds[i].fd], POLLOUT, i); - /* remove fd from list on POLLNVAL */ - if (m->handler.pfds[i].revents & POLLNVAL) + /* if one of our file descriptors is garbage, remove the same from + * both pfds + update sizes and index */ + if (pfds[i].revents & POLLNVAL) { - memmove(m->handler.pfds+i, - m->handler.pfds+i+1, - (m->handler.pfdsize-i-1) * sizeof(struct pollfd)); - m->handler.pfdcount--; - i--; + memmove (m->handler.pfds + i, + m->handler.pfds + i + 1, + (m->handler.pfdcount - i - 1) * sizeof(struct pollfd)); + m->handler.pfdcount--; + + memmove (pfds + i, pfds + i + 1, + (count - i - 1) * sizeof(struct pollfd)); + count--; + i--; } - else - m->handler.pfds[i].revents = 0; } } -#endif - -static void -thread_process_fds (struct thread_master *m, thread_fd_set *rset, thread_fd_set *wset, int num) -{ -#if defined (HAVE_POLL_CALL) - check_pollfds (m, rset, num); -#else - int ready = 0, index; - - for (index = 0; index < m->fd_limit && ready < num; ++index) - { - ready += thread_process_fds_helper (m, m->read[index], rset, 0, 0); - ready += thread_process_fds_helper (m, m->write[index], wset, 0, 0); - } -#endif -} /* Add all timers that have popped to the ready list. */ static unsigned int -thread_timer_process (struct pqueue *queue, struct timeval *timenow) +thread_process_timers (struct pqueue *queue, struct timeval *timenow) { struct thread *thread; unsigned int ready = 0; @@ -1300,9 +1153,6 @@ struct thread * thread_fetch (struct thread_master *m, struct thread *fetch) { struct thread *thread; - thread_fd_set readfd; - thread_fd_set writefd; - thread_fd_set exceptfd; struct timeval now; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; struct timeval timer_val_bg; @@ -1338,13 +1188,6 @@ thread_fetch (struct thread_master *m, struct thread *fetch) /* Normal event are the next highest priority. */ thread_process (&m->event); - /* Structure copy. */ -#if !defined(HAVE_POLL_CALL) - readfd = fd_copy_fd_set(m->handler.readfd); - writefd = fd_copy_fd_set(m->handler.writefd); - exceptfd = fd_copy_fd_set(m->handler.exceptfd); -#endif - /* Calculate select wait timer if nothing else to do */ if (m->ready.count == 0) { @@ -1362,7 +1205,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) timer_wait = &timer_val; } - num = fd_select (m, FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); + num = fd_poll (m, pfds, m->handler.pfdsize, count, timer_wait); /* Signals should get quick treatment */ if (num < 0) @@ -1372,20 +1215,20 @@ thread_fetch (struct thread_master *m, struct thread *fetch) pthread_mutex_unlock (&m->mtx); continue; /* signal received - process it */ } - zlog_warn ("select() error: %s", safe_strerror (errno)); + zlog_warn ("poll() error: %s", safe_strerror (errno)); pthread_mutex_unlock (&m->mtx); return NULL; } /* Check foreground timers. Historically, they have had higher - priority than I/O threads, so let's push them onto the ready - list in front of the I/O threads. */ + * priority than I/O threads, so let's push them onto the ready + * list in front of the I/O threads. */ monotime(&now); - thread_timer_process (m->timer, &now); + thread_process_timers (m->timer, &now); /* Got IO, process it */ if (num > 0) - thread_process_fds (m, &readfd, &writefd, num); + thread_process_io (m, pfds, num, count); #if 0 /* If any threads were made ready above (I/O or foreground timer), @@ -1403,7 +1246,7 @@ thread_fetch (struct thread_master *m, struct thread *fetch) #endif /* Background timer/events, lowest priority */ - thread_timer_process (m->background, &now); + thread_process_timers (m->background, &now); if ((thread = thread_trim_head (&m->ready)) != NULL) { diff --git a/lib/thread.h b/lib/thread.h index 7e79eb38dc..e40bf64855 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -22,8 +22,9 @@ #define _ZEBRA_THREAD_H #include -#include "monotime.h" #include +#include +#include "monotime.h" struct rusage_t { @@ -44,14 +45,6 @@ struct thread_list struct pqueue; -/* - * Abstract it so we can use different methodologies to - * select on data. - */ -typedef fd_set thread_fd_set; - -#if defined(HAVE_POLL_CALL) -#include struct fd_handler { /* number of pfd stored in pfds */ @@ -62,14 +55,6 @@ struct fd_handler nfds_t pfdsize; struct pollfd *pfds; }; -#else -struct fd_handler -{ - fd_set readfd; - fd_set writefd; - fd_set exceptfd; -}; -#endif /* Master of the theads. */ struct thread_master