diff --git a/services/confdb.c b/services/confdb.c index 64def8a0..0083eccd 100644 --- a/services/confdb.c +++ b/services/confdb.c @@ -71,14 +71,22 @@ static int notify_pipe[2]; struct confdb_ipc_message_holder { void *conn; - void *msg; size_t mlen; + struct list_head list; + char msg[]; }; +DECLARE_LIST_INIT(confdb_ipc_message_holder_list_head); + +pthread_mutex_t confdb_ipc_message_holder_list_mutex = + PTHREAD_MUTEX_INITIALIZER; + static int confdb_exec_init_fn ( struct corosync_api_v1 *corosync_api); static int confdb_exec_exit_fn(void); +static int fd_set_nonblocking(int fd); + static int objdb_notify_dispatch(hdb_handle_t handle, int fd, int revents, void *data); @@ -322,6 +330,8 @@ static int confdb_exec_exit_fn(void) static int confdb_exec_init_fn ( struct corosync_api_v1 *corosync_api) { + int i; + #ifdef COROSYNC_SOLARIS logsys_subsys_init(); #endif @@ -331,6 +341,12 @@ static int confdb_exec_init_fn ( return -1; } + for (i = 0; i < 2; i++) { + if (fd_set_nonblocking (notify_pipe[i]) == -1) { + return -1; + } + } + return poll_dispatch_add(api->poll_handle_get(), notify_pipe[0], POLLIN, NULL, objdb_notify_dispatch); } @@ -353,6 +369,23 @@ static int confdb_lib_exit_fn (void *conn) return (0); } +static int fd_set_nonblocking(int fd) +{ + int flags; + int res; + + flags = fcntl (fd, F_GETFL); + if (flags == -1) { + return -1; + } + + flags |= O_NONBLOCK; + + res = fcntl (fd, F_SETFL, flags); + + return res; +} + static void message_handler_req_lib_confdb_object_create (void *conn, const void *message) { @@ -827,52 +860,119 @@ static void message_handler_req_lib_confdb_reload (void *conn, static int objdb_notify_dispatch(hdb_handle_t handle, int fd, int revents, void *data) { - struct confdb_ipc_message_holder holder; + struct confdb_ipc_message_holder *holder; ssize_t rc; + char pipe_cmd; if (revents & POLLHUP) { return -1; } + + pthread_mutex_lock (&confdb_ipc_message_holder_list_mutex); + retry_read: - rc = read(fd, &holder, sizeof(struct confdb_ipc_message_holder)); - if (rc == -1 && errno == EINTR) { - goto retry_read; - } - if (rc != sizeof(struct confdb_ipc_message_holder)) { - return 0; + rc = read(fd, &pipe_cmd, sizeof(pipe_cmd)); + if (rc == sizeof(pipe_cmd)) { + goto retry_read; /* Flush whole buffer */ } - api->ipc_dispatch_send(holder.conn, holder.msg, holder.mlen); + if (rc == -1) { + if (errno == EINTR) { + goto retry_read; + } - api->ipc_refcnt_dec(holder.conn); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + goto unlock_exit; + } + } else { + goto unlock_exit; /* rc != -1 && rc != 1 -> end of file */ + } + + while (!list_empty (&confdb_ipc_message_holder_list_head)) { + holder = list_entry (confdb_ipc_message_holder_list_head.next, + struct confdb_ipc_message_holder, list); + + list_del (&holder->list); + + /* + * All list operations are done now, so unlock list mutex to + * prevent deadlock in IPC. + */ + pthread_mutex_unlock (&confdb_ipc_message_holder_list_mutex); + + api->ipc_dispatch_send(holder->conn, holder->msg, holder->mlen); + + api->ipc_refcnt_dec(holder->conn); + + free(holder); + + /* + * Next operation is again list one, so lock list again. + */ + pthread_mutex_lock (&confdb_ipc_message_holder_list_mutex); + } + +unlock_exit: + pthread_mutex_unlock (&confdb_ipc_message_holder_list_mutex); - free(holder.msg); return 0; } static int32_t ipc_dispatch_send_from_poll_thread(void *conn, const void *msg, size_t mlen) { - struct confdb_ipc_message_holder holder; + struct confdb_ipc_message_holder *holder; ssize_t written; + size_t holder_size; + char pipe_cmd; api->ipc_refcnt_inc(conn); - holder.conn = conn; - holder.msg = malloc(mlen); - memcpy(holder.msg, msg, mlen); - holder.mlen = mlen; - -retry_write: - written = write(notify_pipe[1], &holder, sizeof(struct confdb_ipc_message_holder)); - if (written == -1 && errno == EINTR) { - goto retry_write; - } - if (written == sizeof(struct confdb_ipc_message_holder)) { - return 0; - } else { + holder_size = sizeof (*holder) + mlen; + holder = malloc (holder_size); + if (holder == NULL) { api->ipc_refcnt_dec(conn); return -1; } + + memset(holder, 0, holder_size); + holder->conn = conn; + holder->mlen = mlen; + memcpy(holder->msg, msg, mlen); + list_init(&holder->list); + + pthread_mutex_lock (&confdb_ipc_message_holder_list_mutex); + + list_add_tail (&holder->list, &confdb_ipc_message_holder_list_head); + + pipe_cmd = 'M'; /* Message */ +retry_write: + written = write(notify_pipe[1], &pipe_cmd, sizeof(pipe_cmd)); + + if (written == -1) { + if (errno == EINTR) { + goto retry_write; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + /* + * Different error then EINTR or BLOCK -> exit with error + */ + goto refcnt_del_unlock_exit; + } + } else if (written != sizeof (pipe_cmd)) { + goto refcnt_del_unlock_exit; + } + pthread_mutex_unlock (&confdb_ipc_message_holder_list_mutex); + + return 0; + +refcnt_del_unlock_exit: + list_del (&holder->list); + free(holder); + api->ipc_refcnt_dec(conn); + pthread_mutex_unlock (&confdb_ipc_message_holder_list_mutex); + + return -1; } static void confdb_notify_lib_of_key_change(object_change_type_t change_type,