diff --git a/lib/lib_errors.c b/lib/lib_errors.c index b6c764d873..e0559f332d 100644 --- a/lib/lib_errors.c +++ b/lib/lib_errors.c @@ -50,6 +50,12 @@ static struct log_ref ferr_lib_warn[] = { .description = "The Event subsystem has detected a slow process, this typically indicates that FRR is having trouble completing work in a timely manner. This can be either a misconfiguration, bug, or some combination therof.", .suggestion = "Gather log data and open an Issue", }, + { + .code = EC_LIB_NO_THREAD, + .title = "The Event subsystem has detected an internal FD problem", + .description = "The Event subsystem has detected a file descriptor read/write event without an associated handling function. This is a bug, please collect log data and open an issue.", + .suggestion = "Gather log data and open an Issue", + }, { .code = EC_LIB_RMAP_RECURSION_LIMIT, .title = "Reached the Route-Map Recursion Limit", diff --git a/lib/lib_errors.h b/lib/lib_errors.h index 39b39fb065..996a16ba95 100644 --- a/lib/lib_errors.h +++ b/lib/lib_errors.h @@ -45,6 +45,7 @@ enum lib_log_refs { EC_LIB_STREAM, EC_LIB_LINUX_NS, EC_LIB_SLOW_THREAD, + EC_LIB_NO_THREAD, EC_LIB_RMAP_RECURSION_LIMIT, EC_LIB_BACKUP_CONFIG, EC_LIB_VRF_LENGTH, diff --git a/lib/thread.c b/lib/thread.c index 7489be5c2d..fc2de09df0 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -307,6 +307,7 @@ static void show_thread_poll_helper(struct vty *vty, struct thread_master *m) { const char *name = m->name ? m->name : "main"; char underline[strlen(name) + 1]; + struct thread *thread; uint32_t i; memset(underline, '-', sizeof(underline)); @@ -316,11 +317,31 @@ static void show_thread_poll_helper(struct vty *vty, struct thread_master *m) vty_out(vty, "----------------------%s\n", underline); vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount, m->fd_limit); - for (i = 0; i < m->handler.pfdcount; i++) - vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\n", i, - m->handler.pfds[i].fd, - m->handler.pfds[i].events, + for (i = 0; i < m->handler.pfdcount; i++) { + vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i, + m->handler.pfds[i].fd, m->handler.pfds[i].events, m->handler.pfds[i].revents); + + if (m->handler.pfds[i].events & POLLIN) { + thread = m->read[m->handler.pfds[i].fd]; + + if (!thread) + vty_out(vty, "ERROR "); + else + vty_out(vty, "%s ", thread->funcname); + } else + vty_out(vty, " "); + + if (m->handler.pfds[i].events & POLLOUT) { + thread = m->write[m->handler.pfds[i].fd]; + + if (!thread) + vty_out(vty, "ERROR\n"); + else + vty_out(vty, "%s\n", thread->funcname); + } else + vty_out(vty, "\n"); + } } DEFUN (show_thread_poll, @@ -756,6 +777,7 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m, debugargdef) { struct thread *thread = NULL; + struct thread **thread_array; assert(fd >= 0 && fd < m->fd_limit); pthread_mutex_lock(&m->mtx); @@ -770,11 +792,25 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m, /* default to a new pollfd */ nfds_t queuepos = m->handler.pfdcount; + if (dir == THREAD_READ) + thread_array = m->read; + else + thread_array = m->write; + /* if we already have a pollfd for our file descriptor, find and * use it */ for (nfds_t i = 0; i < m->handler.pfdcount; i++) if (m->handler.pfds[i].fd == fd) { queuepos = i; + +#ifdef DEV_BUILD + /* + * What happens if we have a thread already + * created for this event? + */ + if (thread_array[fd]) + assert(!"Thread already scheduled for file descriptor"); +#endif break; } @@ -794,10 +830,7 @@ struct thread *funcname_thread_add_read_write(int dir, struct thread_master *m, pthread_mutex_lock(&thread->mtx); { thread->u.fd = fd; - if (dir == THREAD_READ) - m->read[thread->u.fd] = thread; - else - m->write[thread->u.fd] = thread; + thread_array[thread->u.fd] = thread; } pthread_mutex_unlock(&thread->mtx); @@ -1238,12 +1271,31 @@ static struct thread *thread_run(struct thread_master *m, struct thread *thread, } static int thread_process_io_helper(struct thread_master *m, - struct thread *thread, short state, int pos) + struct thread *thread, short state, + short actual_state, int pos) { struct thread **thread_array; - if (!thread) + /* + * poll() clears the .events field, but the pollfd array we + * pass to poll() is a copy of the one used to schedule threads. + * We need to synchronize state between the two here by applying + * the same changes poll() made on the copy of the "real" pollfd + * array. + * + * This cleans up a possible infinite loop where we refuse + * to respond to a poll event but poll is insistent that + * we should. + */ + m->handler.pfds[pos].events &= ~(state); + + if (!thread) { + if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP) + flog_err(EC_LIB_NO_THREAD, + "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!\n", + m->handler.pfds[pos].fd, actual_state); return 0; + } if (thread->type == THREAD_READ) thread_array = m->read; @@ -1253,9 +1305,7 @@ static int thread_process_io_helper(struct thread_master *m, thread_array[thread->u.fd] = NULL; thread_list_add_tail(&m->ready, thread); thread->type = THREAD_READY; - /* if another pthread scheduled this file descriptor for the event we're - * responding to, no problem; we're getting to it now */ - thread->master->handler.pfds[pos].events &= ~(state); + return 1; } @@ -1291,12 +1341,13 @@ static void thread_process_io(struct thread_master *m, unsigned int num) * there's no need to update it. Similarily, barring deletion, * the fd * should still be a valid index into the master's pfds. */ - if (pfds[i].revents & (POLLIN | POLLHUP)) + if (pfds[i].revents & (POLLIN | POLLHUP)) { thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN, - i); + pfds[i].revents, i); + } if (pfds[i].revents & POLLOUT) thread_process_io_helper(m, m->write[pfds[i].fd], - POLLOUT, i); + POLLOUT, pfds[i].revents, i); /* if one of our file descriptors is garbage, remove the same * from