diff --git a/exec/coroipcs.c b/exec/coroipcs.c index aca90533..afef48fa 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -670,7 +670,7 @@ static void *pthread_ipc_consumer (void *conn) #endif for (;;) { - ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT); + ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, IPC_SEMWAIT_NOFILE); if (ipc_thread_active (conn_info) == 0) { coroipcs_refcount_dec (conn_info); pthread_exit (0); @@ -681,7 +681,7 @@ static void *pthread_ipc_consumer (void *conn) ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value); if (sem_value > 0) { - res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST); + res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST, IPC_SEMWAIT_NOFILE); } else { continue; } @@ -691,7 +691,6 @@ static void *pthread_ipc_consumer (void *conn) * There is no new message to process, continue for loop */ if (new_message == 0) { -printf ("continuing\n"); continue; } diff --git a/include/corosync/coroipc_ipc.h b/include/corosync/coroipc_ipc.h index acf4f02d..c3c2d23a 100644 --- a/include/corosync/coroipc_ipc.h +++ b/include/corosync/coroipc_ipc.h @@ -64,6 +64,8 @@ */ #define IPC_SEMWAIT_TIMEOUT 2 +#define IPC_SEMWAIT_NOFILE 0 + enum req_init_types { MESSAGE_REQ_RESPONSE_INIT = 0, MESSAGE_REQ_DISPATCH_INIT = 1 @@ -163,12 +165,14 @@ struct coroipcs_zc_header { static inline cs_error_t ipc_sem_wait ( struct control_buffer *control_buffer, - enum ipc_semaphore_identifiers sem_id) + enum ipc_semaphore_identifiers sem_id, + int fd) { #if _POSIX_THREAD_PROCESS_SHARED < 1 struct sembuf sop; #else struct timespec timeout; + struct pollfd pfd; sem_t *sem = NULL; #endif int res; @@ -189,25 +193,58 @@ ipc_sem_wait ( break; } - timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT; - timeout.tv_nsec = 0; + if (fd == IPC_SEMWAIT_NOFILE) { +retry_sem_wait: + res = sem_wait (sem); + if (res == -1 && errno == EINTR) { + goto retry_sem_wait; + } else + if (res == -1) { + return (CS_ERR_LIBRARY); + } + } else { + timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT; + timeout.tv_nsec = 0; retry_sem_timedwait: - res = sem_timedwait (sem, &timeout); - if (res == -1 && errno == ETIMEDOUT) { - return (CS_ERR_LIBRARY); - } else - if (res == -1 && errno == EINTR) { - goto retry_sem_timedwait; - } else - if (res == -1) { - return (CS_ERR_LIBRARY); + res = sem_timedwait (sem, &timeout); + if (res == -1 && errno == ETIMEDOUT) { + pfd.fd = fd; + pfd.events = 0; + + /* + * Determine if server has failed (ERR_LIBRARY) or + * is just performing slowly or in configuration change + * (retry sem op) + */ + +retry_poll: + res = poll (&pfd, 1, 0); + if (res == -1 && errno == EINTR) { + goto retry_poll; + } else + if (res == -1) { + return (CS_ERR_LIBRARY); + } + + if (res == 1) { + if (pfd.revents == POLLERR || + pfd.revents == POLLHUP || + pfd.revents == POLLNVAL) { + + return (CS_ERR_LIBRARY); + } + } + goto retry_sem_timedwait; + } else + if (res == -1 && errno == EINTR) { + goto retry_sem_timedwait; + } else + if (res == -1) { + return (CS_ERR_LIBRARY); + } } #else - /* - * Wait for semaphore indicating a new message from server - * to client in queue - */ sop.sem_num = sem_id; sop.sem_op = -1; sop.sem_flg = 0; diff --git a/lib/coroipcc.c b/lib/coroipcc.c index 4ece2dc6..bbeb95f6 100644 --- a/lib/coroipcc.c +++ b/lib/coroipcc.c @@ -512,7 +512,7 @@ reply_receive ( cs_error_t res; retry_ipc_sem_wait: - res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE); + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE, ipc_instance->fd); if (res != CS_OK) { if (res == CS_ERR_TRY_AGAIN) { priv_change_send (ipc_instance); @@ -539,7 +539,7 @@ reply_receive_in_buf ( cs_error_t res; retry_ipc_sem_wait: - res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE); + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE, ipc_instance->fd); if (res != CS_OK) { if (res == CS_ERR_TRY_AGAIN) { priv_change_send (ipc_instance); @@ -917,7 +917,7 @@ coroipcc_dispatch_put (hdb_handle_t handle) } retry_ipc_sem_wait: - res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH); + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH, ipc_instance->fd); if (res != CS_OK) { if (res == CS_ERR_TRY_AGAIN) { priv_change_send (ipc_instance);