From 3c7f8b7c05af79a13e37ed2c216dbcfd26dd6814 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Thu, 27 Apr 2006 01:39:10 +0000 Subject: [PATCH] This patch reworks IPC to use threads instead of the main poll loop git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1011 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/Makefile | 4 +- exec/aispoll.c | 8 +- exec/aispoll.h | 2 +- exec/evt.c | 11 + exec/main.c | 936 +++----------------------------------------- exec/objdb.c | 3 +- exec/totemnet.c | 3 +- exec/totempg.c | 12 +- exec/totemrrp.c | 3 +- exec/totemsrp.c | 4 +- include/hdb.h | 19 + include/queue.h | 49 ++- lib/evt.c | 1 - test/subscription.c | 7 + 14 files changed, 166 insertions(+), 896 deletions(-) diff --git a/exec/Makefile b/exec/Makefile index 4134f910..6dcd1f51 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -58,9 +58,9 @@ LCR_OBJS = evs.o clm.o amf.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o amfconfig.o ai # main executive objects MAIN_SRC = main.c print.c mempool.c \ - util.c sync.c ykd.c service.c totemconfig.c mainconfig.c + util.c sync.c ykd.c service.c ipc.c totemconfig.c mainconfig.c MAIN_OBJS = main.o print.o mempool.o \ - util.o sync.o service.o totemconfig.o mainconfig.o ../lcr/lcr_ifact.o + util.o sync.o service.o ipc.o totemconfig.o mainconfig.o ../lcr/lcr_ifact.o OTHER_OBJS = objdb.o ifeq (${BUILD_DYNAMIC}, 1) diff --git a/exec/aispoll.c b/exec/aispoll.c index 242783a5..0080cb9b 100644 --- a/exec/aispoll.c +++ b/exec/aispoll.c @@ -57,6 +57,7 @@ struct poll_instance { struct pollfd *ufds; int poll_entry_count; struct timerlist timerlist; + pthread_mutex_t *serialize; }; /* @@ -68,7 +69,7 @@ static struct hdb_handle_database poll_instance_database = { .iterator = 0 }; -poll_handle poll_create (void) +poll_handle poll_create (pthread_mutex_t *serialize) { poll_handle handle; struct poll_instance *poll_instance; @@ -88,6 +89,7 @@ poll_handle poll_create (void) poll_instance->poll_entries = 0; poll_instance->ufds = 0; poll_instance->poll_entry_count = 0; + poll_instance->serialize = serialize; timerlist_init (&poll_instance->timerlist); return (handle); @@ -386,7 +388,9 @@ int poll_run ( &poll_instance->poll_entries[i].ufd, sizeof (struct pollfd)); } + pthread_mutex_lock (poll_instance->serialize); timeout = timerlist_timeout_msec (&poll_instance->timerlist); + pthread_mutex_unlock (poll_instance->serialize); retry_poll: res = poll (poll_instance->ufds, @@ -402,6 +406,7 @@ retry_poll: for (i = 0; i < poll_entry_count; i++) { if (poll_instance->ufds[i].fd != -1 && poll_instance->ufds[i].revents) { + pthread_mutex_lock (poll_instance->serialize); res = poll_instance->poll_entries[i].dispatch_fn (handle, poll_instance->ufds[i].fd, @@ -415,6 +420,7 @@ retry_poll: if (res == -1) { poll_instance->poll_entries[i].ufd.fd = -1; /* empty entry */ } + pthread_mutex_unlock (poll_instance->serialize); } } timerlist_expire (&poll_instance->timerlist); diff --git a/exec/aispoll.h b/exec/aispoll.h index fc162bcc..a8d48891 100644 --- a/exec/aispoll.h +++ b/exec/aispoll.h @@ -37,7 +37,7 @@ typedef void * poll_timer_handle; typedef unsigned int poll_handle; -poll_handle poll_create (void); +poll_handle poll_create (pthread_mutex_t *mutex); int poll_destroy (poll_handle poll_handle); diff --git a/exec/evt.c b/exec/evt.c index 8ad3614f..555b3a3e 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -1838,6 +1838,7 @@ deliver_event(struct event_data *evt, } } + assert (esip->esi_nevents >= 0); if (!esip->esi_queue_blocked && (esip->esi_nevents >= evt_delivery_queue_size)) { log_printf(LOG_LEVEL_DEBUG, "block\n"); @@ -2106,6 +2107,11 @@ static int evt_lib_init(void *conn) */ memset(libevt_pd, 0, sizeof(*libevt_pd)); + /* + * Initialize the open channel handle database. + */ + hdb_create(&libevt_pd->esi_hdb); + /* * list of channels open on this instance */ @@ -3013,6 +3019,11 @@ static int evt_lib_exit(void *conn) } } + /* + * Destroy the open channel handle database + */ + hdb_destroy(&esip->esi_hdb); + return 0; } diff --git a/exec/main.c b/exec/main.c index a07a0a01..2974aec4 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1,7 +1,6 @@ /* - * vi: set autoindent tabstop=4 shiftwidth=4 : - * * Copyright (c) 2002-2006 MontaVista Software, Inc. + * Copyright (c) 2006 Red Hat, Inc.. * * All rights reserved. * @@ -72,6 +71,7 @@ #include "swab.h" #include "objdb.h" #include "config.h" +#include "ipc.h" #define LOG_SERVICE LOG_SERVICE_MAIN #include "print.h" @@ -87,78 +87,12 @@ static int gid_valid = 0; static unsigned int service_count = 32; -struct outq_item { - void *msg; - size_t mlen; -}; +static struct totem_logging_configuration totem_logging_configuration; -enum conn_state { - CONN_STATE_ACTIVE, - CONN_STATE_DISCONNECTING, - CONN_STATE_DISCONNECTING_DELAYED -}; +static char delivery_data[MESSAGE_SIZE_MAX]; -struct conn_info { - int fd; /* File descriptor */ - enum conn_state state; /* State of this connection */ - char *inb; /* Input buffer for non-blocking reads */ - int inb_nextheader; /* Next message header starts here */ - int inb_start; /* Start location of input buffer */ - int inb_inuse; /* Bytes currently stored in input buffer */ - struct queue outq; /* Circular queue for outgoing requests */ - int byte_start; /* Byte to start sending from in head of queue */ - enum service_types service;/* Type of service so dispatch knows how to route message */ - int authenticated; /* Is this connection authenticated? */ - void *private_data; /* library connection private data */ - struct conn_info *conn_info_partner; /* partner connection dispatch<->response */ - int should_exit_fn; /* Should call the exit function when closing this ipc */ -}; SaClmClusterNodeT *(*main_clm_get_by_nodeid) (unsigned int node_id); - /* - * IPC Initializers - */ -static int dispatch_init_send_response (struct conn_info *conn_info, void *message); - -static int response_init_send_response (struct conn_info *conn_info, void *message); - -static int (*ais_init_service[]) (struct conn_info *conn_info, void *message) = { - response_init_send_response, - dispatch_init_send_response -}; - -static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio); - - -static inline struct conn_info *conn_info_create (int fd) { - struct conn_info *conn_info; - int res; - - conn_info = malloc (sizeof (struct conn_info)); - if (conn_info == 0) { - return (0); - } - - memset (conn_info, 0, sizeof (struct conn_info)); - res = queue_init (&conn_info->outq, SIZEQUEUE, - sizeof (struct outq_item)); - if (res != 0) { - free (conn_info); - return (0); - } - conn_info->inb = malloc (sizeof (char) * SIZEINB); - if (conn_info->inb == 0) { - queue_free (&conn_info->outq); - free (conn_info); - return (0); - } - - conn_info->state = CONN_STATE_ACTIVE; - conn_info->fd = fd; - conn_info->service = SOCKET_SERVICE_INIT; - return (conn_info); -} - #ifdef COMPILE_OUT static void sigusr2_handler (int num) { @@ -179,18 +113,6 @@ struct totem_ip_address *this_ip; struct totem_ip_address this_non_loopback_ip; #define LOCALHOST_IP inet_addr("127.0.0.1") -#if defined(OPENAIS_LINUX) -/* SUN_LEN is broken for abstract namespace - */ -#define AIS_SUN_LEN(a) sizeof(*(a)) - -char *socketname = "libais.socket"; -#else -#define AIS_SUN_LEN(a) SUN_LEN(a) - -char *socketname = "/var/run/libais.socket"; -#endif - totempg_groups_handle openais_group_handle; struct totempg_group openais_group = { @@ -198,673 +120,6 @@ struct totempg_group openais_group = { .group_len = 1 }; - -static int libais_connection_active (struct conn_info *conn_info) -{ - return (conn_info->state == CONN_STATE_ACTIVE); -} - -static void libais_disconnect_delayed (struct conn_info *conn_info) -{ - conn_info->state = CONN_STATE_DISCONNECTING_DELAYED; - conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING_DELAYED; -} - -static int libais_disconnect (struct conn_info *conn_info) -{ - int res = 0; - struct outq_item *outq_item; - - if (conn_info->should_exit_fn && - ais_service[conn_info->service]->lib_exit_fn) { - - res = ais_service[conn_info->service]->lib_exit_fn (conn_info); - } - - /* - * Call library exit handler and free private data - */ - if (conn_info->conn_info_partner && - conn_info->conn_info_partner->should_exit_fn && - ais_service[conn_info->conn_info_partner->service]->lib_exit_fn) { - - res = ais_service[conn_info->conn_info_partner->service]->lib_exit_fn (conn_info->conn_info_partner); - if (conn_info->private_data) { - free (conn_info->private_data); - } - } - - /* - * Close the library connection and free its - * data if it hasn't already been freed - */ - if (conn_info->state != CONN_STATE_DISCONNECTING) { - conn_info->state = CONN_STATE_DISCONNECTING; - - close (conn_info->fd); - - /* - * Free the outq queued items - */ - while (!queue_is_empty (&conn_info->outq)) { - outq_item = queue_item_get (&conn_info->outq); - free (outq_item->msg); - queue_item_remove (&conn_info->outq); - } - - queue_free (&conn_info->outq); - free (conn_info->inb); - } - - /* - * Close the library connection and free its - * data if it hasn't already been freed - */ - if (conn_info->conn_info_partner && - conn_info->conn_info_partner->state != CONN_STATE_DISCONNECTING) { - - conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING; - - close (conn_info->conn_info_partner->fd); - - /* - * Free the outq queued items - */ - while (!queue_is_empty (&conn_info->conn_info_partner->outq)) { - outq_item = queue_item_get (&conn_info->conn_info_partner->outq); - free (outq_item->msg); - queue_item_remove (&conn_info->conn_info_partner->outq); - } - - queue_free (&conn_info->conn_info_partner->outq); - if (conn_info->conn_info_partner->inb) { - free (conn_info->conn_info_partner->inb); - } - } - - /* - * If exit_fn didn't request a retry, - * free the conn_info structure - */ - if (res != -1) { - if (conn_info->conn_info_partner) { - poll_dispatch_delete (aisexec_poll_handle, - conn_info->conn_info_partner->fd); - } - poll_dispatch_delete (aisexec_poll_handle, conn_info->fd); - - free (conn_info->conn_info_partner); - free (conn_info); - } - - /* - * Inverse res from libais exit fn handler - */ - return (res != -1 ? -1 : 0); -} - -static int cleanup_send_response (struct conn_info *conn_info) { - struct queue *outq; - int res = 0; - struct outq_item *queue_item; - struct msghdr msg_send; - struct iovec iov_send; - char *msg_addr; - - if (!libais_connection_active (conn_info)) { - return (-1); - } - outq = &conn_info->outq; - - msg_send.msg_iov = &iov_send; - msg_send.msg_name = 0; - msg_send.msg_namelen = 0; - msg_send.msg_iovlen = 1; - msg_send.msg_control = 0; - msg_send.msg_controllen = 0; - msg_send.msg_flags = 0; - - while (!queue_is_empty (outq)) { - queue_item = queue_item_get (outq); - msg_addr = (char *)queue_item->msg; - msg_addr = &msg_addr[conn_info->byte_start]; - - iov_send.iov_base = msg_addr; - iov_send.iov_len = queue_item->mlen - conn_info->byte_start; - -retry_sendmsg: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); - if (res == -1 && errno == EINTR) { - goto retry_sendmsg; - } - if (res == -1 && errno == EAGAIN) { - break; /* outgoing kernel queue full */ - } - if (res == -1) { - return (-1); /* message couldn't be sent */ - } - if (res + conn_info->byte_start != queue_item->mlen) { - conn_info->byte_start += res; - break; - } - - /* - * Message sent, try sending another message - */ - queue_item_remove (outq); - conn_info->byte_start = 0; - free (queue_item->msg); - } /* while queue not empty */ - - if (queue_is_empty (outq)) { - poll_dispatch_modify (aisexec_poll_handle, conn_info->fd, - POLLIN|POLLNVAL, poll_handler_libais_deliver, 0); - } - return (0); -} - -int openais_conn_send_response ( - void *conn, - void *msg, - int mlen) -{ - struct queue *outq; - char *cmsg; - int res = 0; - int queue_empty; - struct outq_item *queue_item; - struct outq_item queue_item_out; - struct msghdr msg_send; - struct iovec iov_send; - char *msg_addr; - struct conn_info *conn_info = (struct conn_info *)conn; - - if (conn_info == NULL) { - return -1; - } - - if (!libais_connection_active (conn_info)) { - return (-1); - } - outq = &conn_info->outq; - - msg_send.msg_iov = &iov_send; - msg_send.msg_name = 0; - msg_send.msg_namelen = 0; - msg_send.msg_iovlen = 1; - msg_send.msg_control = 0; - msg_send.msg_controllen = 0; - msg_send.msg_flags = 0; - - if (queue_is_full (outq)) { - /* - * Start a disconnect if we have not already started one - * and report that the outgoing queue is full - */ - log_printf (LOG_LEVEL_ERROR, "Library queue is full, disconnecting library connection.\n"); - libais_disconnect_delayed (conn_info); - return (-1); - } - while (!queue_is_empty (outq)) { - queue_item = queue_item_get (outq); - msg_addr = (char *)queue_item->msg; - msg_addr = &msg_addr[conn_info->byte_start]; - - iov_send.iov_base = msg_addr; - iov_send.iov_len = queue_item->mlen - conn_info->byte_start; - -retry_sendmsg: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); - if (res == -1 && errno == EINTR) { - goto retry_sendmsg; - } - if (res == -1 && errno == EAGAIN) { - break; /* outgoing kernel queue full */ - } - if (res == -1) { - break; /* some other error, stop trying to send message */ - } - if (res + conn_info->byte_start != queue_item->mlen) { - conn_info->byte_start += res; - break; - } - - /* - * Message sent, try sending another message - */ - queue_item_remove (outq); - conn_info->byte_start = 0; - free (queue_item->msg); - } /* while queue not empty */ - - res = -1; - - queue_empty = queue_is_empty (outq); - /* - * Send requested message - */ - if (queue_empty) { - - iov_send.iov_base = msg; - iov_send.iov_len = mlen; -retry_sendmsg_two: - res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL); - if (res == -1 && errno == EINTR) { - goto retry_sendmsg_two; - } - if (res == -1 && errno == EAGAIN) { - conn_info->byte_start = 0; - poll_dispatch_modify (aisexec_poll_handle, conn_info->fd, - POLLIN|POLLNVAL, poll_handler_libais_deliver, 0); - } - if (res != -1) { - if (res + conn_info->byte_start != mlen) { - conn_info->byte_start += res; - res = -1; - } else { - conn_info->byte_start = 0; - poll_dispatch_modify (aisexec_poll_handle, conn_info->fd, - POLLIN|POLLNVAL, poll_handler_libais_deliver, 0); - } - } - } - - /* - * If res == -1 , errrno == EAGAIN which means kernel queue full - */ - if (res == -1) { - cmsg = malloc (mlen); - if (cmsg == 0) { - log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, disconnecting library connection.\n"); - libais_disconnect_delayed (conn_info); - return (-1); - } - queue_item_out.msg = cmsg; - queue_item_out.mlen = mlen; - memcpy (cmsg, msg, mlen); - queue_item_add (outq, &queue_item_out); - - poll_dispatch_modify (aisexec_poll_handle, conn_info->fd, - POLLOUT|POLLIN|POLLNVAL, poll_handler_libais_deliver, 0); - } - return (0); -} - -static int poll_handler_libais_accept ( - poll_handle handle, - int fd, - int revent, - void *data, - unsigned int *prio) -{ - socklen_t addrlen; - struct conn_info *conn_info; - struct sockaddr_un un_addr; - int new_fd; -#ifdef OPENAIS_LINUX - int on = 1; -#endif - int res; - - addrlen = sizeof (struct sockaddr_un); - -retry_accept: - new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen); - if (new_fd == -1 && errno == EINTR) { - goto retry_accept; - } - - if (new_fd == -1) { - log_printf (LOG_LEVEL_ERROR, "ERROR: Could not accept Library connection: %s\n", strerror (errno)); - return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ - } - - totemip_nosigpipe(new_fd); - res = fcntl (new_fd, F_SETFL, O_NONBLOCK); - if (res == -1) { - log_printf (LOG_LEVEL_ERROR, "Could not set non-blocking operation on library connection: %s\n", strerror (errno)); - close (new_fd); - return (0); /* This is an error, but -1 would indicate disconnect from poll loop */ - } - - /* - * Valid accept - */ - - /* - * Request credentials of sender provided by kernel - */ -#ifdef OPENAIS_LINUX - setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); -#endif - - log_printf (LOG_LEVEL_DEBUG, "connection received from libais client %d.\n", new_fd); - - conn_info = conn_info_create (new_fd); - if (conn_info == 0) { - close (new_fd); - return (0); /* This is an error, but -1 would indicate disconnect from poll */ - } - - poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN|POLLNVAL, conn_info, - poll_handler_libais_deliver, 0); - return (0); -} - -static int dispatch_init_send_response (struct conn_info *conn_info, void *message) -{ - SaAisErrorT error = SA_AIS_ERR_ACCESS; - struct req_lib_dispatch_init *req_lib_dispatch_init = (struct req_lib_dispatch_init *)message; - struct res_lib_dispatch_init res_lib_dispatch_init; - struct conn_info *msg_conn_info; - - if (conn_info->authenticated) { - conn_info->service = req_lib_dispatch_init->resdis_header.service; - if (!ais_service[req_lib_dispatch_init->resdis_header.service]) - error = SA_AIS_ERR_NOT_SUPPORTED; - else - error = SA_AIS_OK; - - conn_info->conn_info_partner = (struct conn_info *)req_lib_dispatch_init->conn_info; - - msg_conn_info = (struct conn_info *)req_lib_dispatch_init->conn_info; - msg_conn_info->conn_info_partner = conn_info; - - if (error == SA_AIS_OK) { - int private_data_size; - - private_data_size = ais_service[req_lib_dispatch_init->resdis_header.service]->private_data_size; - if (private_data_size) { - conn_info->private_data = malloc (private_data_size); - - conn_info->conn_info_partner->private_data = conn_info->private_data; - if (conn_info->private_data == NULL) { - error = SA_AIS_ERR_NO_MEMORY; - } else { - memset (conn_info->private_data, 0, private_data_size); - } - } else { - conn_info->private_data = NULL; - conn_info->conn_info_partner->private_data = NULL; - } - } - - res_lib_dispatch_init.header.size = sizeof (struct res_lib_dispatch_init); - res_lib_dispatch_init.header.id = MESSAGE_RES_INIT; - res_lib_dispatch_init.header.error = error; - - openais_conn_send_response ( - conn_info, - &res_lib_dispatch_init, - sizeof (res_lib_dispatch_init)); - - if (error != SA_AIS_OK) { - return (-1); - } - - } - - conn_info->should_exit_fn = 1; - ais_service[req_lib_dispatch_init->resdis_header.service]->lib_init_fn (conn_info); - return (0); -} - -void *openais_conn_partner_get (void *conn) -{ - struct conn_info *conn_info = (struct conn_info *)conn; - - if (conn != NULL) { - return ((void *)conn_info->conn_info_partner); - } else { - return NULL; - } -} - -void *openais_conn_private_data_get (void *conn) -{ - struct conn_info *conn_info = (struct conn_info *)conn; - - if (conn != NULL) { - return ((void *)conn_info->private_data); - } else { - return NULL; - } -} - -static int response_init_send_response (struct conn_info *conn_info, void *message) -{ - SaAisErrorT error = SA_AIS_ERR_ACCESS; - struct req_lib_response_init *req_lib_response_init = (struct req_lib_response_init *)message; - struct res_lib_response_init res_lib_response_init; - - if (conn_info->authenticated) { - conn_info->service = req_lib_response_init->resdis_header.service; - error = SA_AIS_OK; - } - res_lib_response_init.header.size = sizeof (struct res_lib_response_init); - res_lib_response_init.header.id = MESSAGE_RES_INIT; - res_lib_response_init.header.error = error; - res_lib_response_init.conn_info = (unsigned long)conn_info; - - openais_conn_send_response ( - conn_info, - &res_lib_response_init, - sizeof (res_lib_response_init)); - - if (error == SA_AIS_ERR_ACCESS) { - return (-1); - } - conn_info->should_exit_fn = 0; - return (0); -} - -struct res_overlay { - struct res_header header; - char buf[4096]; -}; - -static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio) -{ - int res; - struct conn_info *conn_info = (struct conn_info *)data; - struct req_header *header; - int service; - struct msghdr msg_recv; - struct iovec iov_recv; -#ifdef OPENAIS_LINUX - struct cmsghdr *cmsg; - char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))]; - struct ucred *cred; - int on = 0; -#else - uid_t euid; - gid_t egid; -#endif - int send_ok = 0; - int send_ok_joined = 0; - struct iovec send_ok_joined_iovec; - struct res_overlay res_overlay; - - if (revent & (POLLERR|POLLHUP)) { - res = libais_disconnect (conn_info); - return (res); - } - - /* - * Handle delayed disconnections - */ - if (conn_info->state == CONN_STATE_DISCONNECTING_DELAYED) { - res = libais_disconnect (conn_info); - return (res); - } - - if (conn_info->state == CONN_STATE_DISCONNECTING) { - return (0); - } - - if (revent & POLLOUT) { - cleanup_send_response (conn_info); - } - - if ((revent & POLLIN) == 0) { - return (0); - } - - msg_recv.msg_iov = &iov_recv; - msg_recv.msg_iovlen = 1; - msg_recv.msg_name = 0; - msg_recv.msg_namelen = 0; - msg_recv.msg_flags = 0; - - if (conn_info->authenticated) { - msg_recv.msg_control = 0; - msg_recv.msg_controllen = 0; - } else { -#ifdef OPENAIS_LINUX - msg_recv.msg_control = (void *)cmsg_cred; - msg_recv.msg_controllen = sizeof (cmsg_cred); -#else - euid = -1; egid = -1; - if (getpeereid(fd, &euid, &egid) != -1 && - (euid == 0 || egid == gid_valid)) { - conn_info->authenticated = 1; - } - if (conn_info->authenticated == 0) { - log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", egid, gid_valid); - } -#endif - } - - iov_recv.iov_base = &conn_info->inb[conn_info->inb_start]; - iov_recv.iov_len = (SIZEINB) - conn_info->inb_start; - assert (iov_recv.iov_len != 0); - -retry_recv: - res = recvmsg (fd, &msg_recv, MSG_NOSIGNAL); - if (res == -1 && errno == EINTR) { - goto retry_recv; - } else - if (res == -1 && errno != EAGAIN) { - goto error_disconnect; - } else - if (res == 0) { - goto error_disconnect; - return (-1); - } - - /* - * Authenticate if this connection has not been authenticated - */ -#ifdef OPENAIS_LINUX - if (conn_info->authenticated == 0) { - cmsg = CMSG_FIRSTHDR (&msg_recv); - cred = (struct ucred *)CMSG_DATA (cmsg); - if (cred) { - if (cred->uid == 0 || cred->gid == gid_valid) { - setsockopt(fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)); - conn_info->authenticated = 1; - } - } - if (conn_info->authenticated == 0) { - log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, gid_valid); - } - } -#endif - /* - * Dispatch all messages received in recvmsg that can be dispatched - * sizeof (struct req_header) needed at minimum to do any processing - */ - conn_info->inb_inuse += res; - conn_info->inb_start += res; - - while (conn_info->inb_inuse >= sizeof (struct req_header) && res != -1) { - header = (struct req_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse]; - - if (header->size > conn_info->inb_inuse) { - break; - } - service = conn_info->service; - - /* - * If this service is in init phase, initialize service - * else handle message using service service - */ - if (service == SOCKET_SERVICE_INIT) { - res = ais_init_service[header->id] (conn_info, header); -// TODO error in init_two_fn needs to be handled - } else { - /* - * Not an init service, but a standard service - */ - if (header->id < 0 || header->id > ais_service[service]->lib_service_count) { - log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n", - header->id, ais_service[service]->lib_service_count); - res = -1; - goto error_disconnect; - } - - /* - * If flow control is required of the library handle, determine that - * openais is not in synchronization and that totempg has room available - * to queue a message, otherwise tell the library we are busy and to - * try again later - */ - send_ok_joined_iovec.iov_base = header; - send_ok_joined_iovec.iov_len = header->size; - send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle, - &send_ok_joined_iovec, 1); - - send_ok = - (sync_primary_designated() == 1) && ( - (ais_service[service]->lib_service[header->id].flow_control == OPENAIS_FLOW_CONTROL_NOT_REQUIRED) || - ((ais_service[service]->lib_service[header->id].flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) && - (send_ok_joined) && - (sync_in_process() == 0))); - - if (send_ok) { - // *prio = 0; - ais_service[service]->lib_service[header->id].lib_handler_fn(conn_info, header); - } else { - // *prio = (*prio) + 1; - - /* - * Overload, tell library to retry - */ - res_overlay.header.size = - ais_service[service]->lib_service[header->id].response_size; - res_overlay.header.id = - ais_service[service]->lib_service[header->id].response_id; - res_overlay.header.error = SA_AIS_ERR_TRY_AGAIN; - openais_conn_send_response ( - conn_info, - &res_overlay, - res_overlay.header.size); - } - } - conn_info->inb_inuse -= header->size; - } /* while */ - - if (conn_info->inb_inuse == 0) { - conn_info->inb_start = 0; - } else -// BUG if (connections[fd].inb_start + connections[fd].inb_inuse >= SIZEINB) { - if (conn_info->inb_start >= SIZEINB) { - /* - * If in buffer is full, move it back to start - */ - memmove (conn_info->inb, - &conn_info->inb[conn_info->inb_start - conn_info->inb_inuse], - sizeof (char) * conn_info->inb_inuse); - conn_info->inb_start = conn_info->inb_inuse; - } - - return (0); - -error_disconnect: - res = libais_disconnect (conn_info); - return (res); -} - void sigintr_handler (int signum) { @@ -910,56 +165,6 @@ static int openais_sync_callbacks_retrieve (int sync_id, return (0); } -char delivery_data[MESSAGE_SIZE_MAX]; - -static void deliver_fn ( - struct totem_ip_address *source_addr, - struct iovec *iovec, - int iov_len, - int endian_conversion_required) -{ - struct req_header *header; - int pos = 0; - int i; - int service; - int fn_id; - - /* - * Build buffer without iovecs to make processing easier - * This is only used for messages which are multicast with iovecs - * and self-delivered. All other mechanisms avoid the copy. - */ - if (iov_len > 1) { - for (i = 0; i < iov_len; i++) { - memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len); - pos += iovec[i].iov_len; - assert (pos < MESSAGE_SIZE_MAX); - } - header = (struct req_header *)delivery_data; - } else { - header = (struct req_header *)iovec[0].iov_base; - } - if (endian_conversion_required) { - header->id = swab32 (header->id); - header->size = swab32 (header->size); - } - -// assert(iovec->iov_len == header->size); - - /* - * Call the proper executive handler - */ - service = header->id >> 16; - fn_id = header->id & 0xffff; - if (endian_conversion_required) { - ais_service[service]->exec_service[fn_id].exec_endian_convert_fn - (header); - } - - ais_service[service]->exec_service[fn_id].exec_handler_fn - (header, source_addr); -} - static struct memb_ring_id aisexec_ring_id; static void confchg_fn ( @@ -1054,52 +259,6 @@ static void aisexec_tty_detach (void) #endif } -static void aisexec_libais_bind (int *server_fd) -{ - int libais_server_fd; - struct sockaddr_un un_addr; - int res; - - /* - * Create socket for libais clients, name socket, listen for connections - */ - libais_server_fd = socket (PF_UNIX, SOCK_STREAM, 0); - if (libais_server_fd == -1) { - log_printf (LOG_LEVEL_ERROR ,"Cannot create libais client connections socket.\n"); - openais_exit_error (AIS_DONE_LIBAIS_SOCKET); - }; - - totemip_nosigpipe(libais_server_fd); - res = fcntl (libais_server_fd, F_SETFL, O_NONBLOCK); - if (res == -1) { - log_printf (LOG_LEVEL_ERROR, "Could not set non-blocking operation on server socket: %s\n", strerror (errno)); - openais_exit_error (AIS_DONE_LIBAIS_SOCKET); - } - -#if !defined(OPENAIS_LINUX) - unlink(socketname); -#endif - memset (&un_addr, 0, sizeof (struct sockaddr_un)); - un_addr.sun_family = AF_UNIX; -#if defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN) - un_addr.sun_len = sizeof(struct sockaddr_un); -#endif -#if defined(OPENAIS_LINUX) - strcpy (un_addr.sun_path + 1, socketname); -#else - strcpy (un_addr.sun_path, socketname); -#endif - - res = bind (libais_server_fd, (struct sockaddr *)&un_addr, AIS_SUN_LEN(&un_addr)); - if (res) { - log_printf (LOG_LEVEL_ERROR, "ERROR: Could not bind AF_UNIX: %s.\n", strerror (errno)); - openais_exit_error (AIS_DONE_LIBAIS_BIND); - } - listen (libais_server_fd, SERVER_BACKLOG); - - *server_fd = libais_server_fd; -} - static void aisexec_setscheduler (void) { #if defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) @@ -1145,33 +304,57 @@ static void aisexec_mlockall (void) #endif } -int message_source_is_local(struct message_source *source) -{ - int ret = 0; - assert (source != NULL); - if ((totemip_localhost_check(&source->addr) - ||(totemip_equal(&source->addr, &this_non_loopback_ip)))) { - ret = 1; +static void deliver_fn ( + struct totem_ip_address *source_addr, + struct iovec *iovec, + int iov_len, + int endian_conversion_required) +{ + struct req_header *header; + int pos = 0; + int i; + int service; + int fn_id; + + /* + * Build buffer without iovecs to make processing easier + * This is only used for messages which are multicast with iovecs + * and self-delivered. All other mechanisms avoid the copy. + */ + if (iov_len > 1) { + for (i = 0; i < iov_len; i++) { + memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len); + pos += iovec[i].iov_len; + assert (pos < MESSAGE_SIZE_MAX); + } + header = (struct req_header *)delivery_data; + } else { + header = (struct req_header *)iovec[0].iov_base; } - return ret; + if (endian_conversion_required) { + header->id = swab32 (header->id); + header->size = swab32 (header->size); + } + +// assert(iovec->iov_len == header->size); + + /* + * Call the proper executive handler + */ + service = header->id >> 16; + fn_id = header->id & 0xffff; + if (endian_conversion_required) { + ais_service[service]->exec_service[fn_id].exec_endian_convert_fn + (header); + } + + ais_service[service]->exec_service[fn_id].exec_handler_fn + (header, source_addr); } -void message_source_set ( - struct message_source *source, - void *conn) -{ - assert ((source != NULL) && (conn != NULL)); - totemip_copy(&source->addr, this_ip); - source->conn = conn; -} - - -struct totem_logging_configuration totem_logging_configuration; - int main (int argc, char **argv) { - int libais_server_fd; char *error_string; struct main_config main_config; struct totem_config totem_config; @@ -1195,9 +378,7 @@ int main (int argc, char **argv) totemip_localhost(AF_INET, &this_non_loopback_ip); - aisexec_poll_handle = poll_create (); - -//TODO signal (SIGUSR2, sigusr2_handler); + aisexec_poll_handle = poll_create (openais_ipc_mutex_get()); /* * Load the object database interface @@ -1308,6 +489,10 @@ int main (int argc, char **argv) * there is more then one interface in a system, so * in this case, only a warning is printed */ + /* + * Join multicast group and setup delivery + * and configuration change functions + */ totempg_initialize ( aisexec_poll_handle, &totem_config); @@ -1349,23 +534,12 @@ int main (int argc, char **argv) signal (SIGINT, sigintr_handler); - aisexec_libais_bind (&libais_server_fd); + openais_ipc_init (aisexec_poll_handle, gid_valid, &this_non_loopback_ip); aisexec_tty_detach (); log_printf (LOG_LEVEL_NOTICE, "AIS Executive Service: started and ready to provide service.\n"); - /* - * Setup libais connection dispatch routine - */ - poll_dispatch_add (aisexec_poll_handle, libais_server_fd, - POLLIN, 0, poll_handler_libais_accept, 0); - - /* - * Join multicast group and setup delivery - * and configuration change functions - */ - /* * Start main processing loop */ diff --git a/exec/objdb.c b/exec/objdb.c index 885ef5e9..d68ff7a3 100644 --- a/exec/objdb.c +++ b/exec/objdb.c @@ -65,7 +65,8 @@ struct object_instance { static struct hdb_handle_database object_instance_database = { .handle_count = 0, .handles = 0, - .iterator = 0 + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER }; static int objdb_init (void) diff --git a/exec/totemnet.c b/exec/totemnet.c index 280ddf40..8d6db8f6 100644 --- a/exec/totemnet.c +++ b/exec/totemnet.c @@ -202,7 +202,8 @@ static struct totem_ip_address localhost; static struct hdb_handle_database totemnet_instance_database = { .handle_count = 0, .handles = 0, - .iterator = 0 + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER }; static void totemnet_instance_initialize (struct totemnet_instance *instance) diff --git a/exec/totempg.c b/exec/totempg.c index f8a9d7d7..bd2c1585 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -203,7 +203,8 @@ struct totempg_group_instance { static struct hdb_handle_database totempg_groups_instance_database = { .handle_count = 0, .handles = 0, - .iterator = 0 + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER }; static int send_ok (int msg_size); @@ -570,6 +571,8 @@ static void totempg_deliver_fn ( void *callback_token_received_handle; +pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER; + int callback_token_received_fn (enum totem_callback_token_type type, void *data) { @@ -577,10 +580,13 @@ int callback_token_received_fn (enum totem_callback_token_type type, struct iovec iovecs[3]; int res; + pthread_mutex_lock (&mcast_msg_mutex); if (mcast_packed_msg_count == 0) { + pthread_mutex_unlock (&mcast_msg_mutex); return (0); } if (totemmrp_avail() == 0) { + pthread_mutex_unlock (&mcast_msg_mutex); return (0); } mcast.fragmented = 0; @@ -605,6 +611,7 @@ int callback_token_received_fn (enum totem_callback_token_type type, mcast_packed_msg_count = 0; fragment_size = 0; + pthread_mutex_unlock (&mcast_msg_mutex); return (0); } @@ -672,6 +679,7 @@ static int mcast_msg ( int copy_base = 0; int total_size = 0; + pthread_mutex_lock (&mcast_msg_mutex); totemmrp_new_msg_signal (); max_packet_size = TOTEMPG_PACKET_SIZE - @@ -689,6 +697,7 @@ static int mcast_msg ( if (send_ok (total_size + sizeof(unsigned short) * (mcast_packed_msg_count+1)) == 0) { + pthread_mutex_unlock (&mcast_msg_mutex); return(-1); } @@ -800,6 +809,7 @@ static int mcast_msg ( mcast_packed_msg_count++; } + pthread_mutex_unlock (&mcast_msg_mutex); return (res); } diff --git a/exec/totemrrp.c b/exec/totemrrp.c index 8d369186..86b30bb4 100644 --- a/exec/totemrrp.c +++ b/exec/totemrrp.c @@ -288,7 +288,8 @@ struct rrp_algo active_algo = { static struct hdb_handle_database totemrrp_instance_database = { .handle_count = 0, .handles = 0, - .iterator = 0 + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER }; #define log_printf(level, format, args...) \ diff --git a/exec/totemsrp.c b/exec/totemsrp.c index d5d7db1c..c9f2e7df 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -48,7 +48,6 @@ */ #include -#include #include #include #include @@ -571,7 +570,8 @@ void main_iface_change_fn ( static struct hdb_handle_database totemsrp_instance_database = { .handle_count = 0, .handles = 0, - .iterator = 0 + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER }; struct message_handlers totemsrp_message_handlers = { 6, diff --git a/include/hdb.h b/include/hdb.h index 9bcb163e..7cde3d07 100644 --- a/include/hdb.h +++ b/include/hdb.h @@ -38,6 +38,7 @@ #include #include #include +#include enum HDB_HANDLE_STATE { HDB_HANDLE_STATE_EMPTY, @@ -55,12 +56,14 @@ struct hdb_handle_database { unsigned int handle_count; struct hdb_handle *handles; unsigned int iterator; + pthread_mutex_t mutex; }; static inline void hdb_create ( struct hdb_handle_database *handle_database) { memset (handle_database, 0, sizeof (struct hdb_handle_database)); + pthread_mutex_init (&handle_database->mutex, NULL); } static inline void hdb_destroy ( @@ -83,6 +86,8 @@ static inline int hdb_handle_create ( int found = 0; void *instance; + pthread_mutex_lock (&handle_database->mutex); + for (handle = 0; handle < handle_database->handle_count; handle++) { if (handle_database->handles[handle].state == HDB_HANDLE_STATE_EMPTY) { found = 1; @@ -95,6 +100,7 @@ static inline int hdb_handle_create ( new_handles = (struct hdb_handle *)realloc (handle_database->handles, sizeof (struct hdb_handle) * handle_database->handle_count); if (new_handles == 0) { + pthread_mutex_unlock (&handle_database->mutex); return (-1); } handle_database->handles = new_handles; @@ -114,6 +120,8 @@ static inline int hdb_handle_create ( *handle_id_out = handle; + pthread_mutex_unlock (&handle_database->mutex); + return (0); } @@ -122,18 +130,24 @@ static inline int hdb_handle_get ( unsigned int handle, void **instance) { + pthread_mutex_lock (&handle_database->mutex); + *instance = NULL; if (handle >= handle_database->handle_count) { + pthread_mutex_unlock (&handle_database->mutex); return (-1); } if (handle_database->handles[handle].state != HDB_HANDLE_STATE_ACTIVE) { + pthread_mutex_unlock (&handle_database->mutex); return (-1); } *instance = handle_database->handles[handle].instance; handle_database->handles[handle].ref_count += 1; + + pthread_mutex_unlock (&handle_database->mutex); return (0); } @@ -141,6 +155,7 @@ static inline void hdb_handle_put ( struct hdb_handle_database *handle_database, unsigned int handle) { + pthread_mutex_lock (&handle_database->mutex); handle_database->handles[handle].ref_count -= 1; assert (handle_database->handles[handle].ref_count >= 0); @@ -148,13 +163,17 @@ static inline void hdb_handle_put ( free (handle_database->handles[handle].instance); memset (&handle_database->handles[handle], 0, sizeof (struct hdb_handle)); } + pthread_mutex_unlock (&handle_database->mutex); } static inline void hdb_handle_destroy ( struct hdb_handle_database *handle_database, unsigned int handle) { + pthread_mutex_lock (&handle_database->mutex); + handle_database->handles[handle].state = HDB_HANDLE_STATE_PENDINGREMOVAL; + pthread_mutex_unlock (&handle_database->mutex); hdb_handle_put (handle_database, handle); } diff --git a/include/queue.h b/include/queue.h index 7ec7b6a1..ffdfee04 100644 --- a/include/queue.h +++ b/include/queue.h @@ -35,6 +35,7 @@ #define QUEUE_H_DEFINED #include +#include #include "assert.h" struct queue { @@ -46,6 +47,7 @@ struct queue { void *items; int size_per_item; int iterator; + pthread_mutex_t mutex; }; static inline int queue_init (struct queue *queue, int queue_items, int size_per_item) { @@ -61,17 +63,20 @@ static inline int queue_init (struct queue *queue, int queue_items, int size_per return (-ENOMEM); } memset (queue->items, 0, queue_items * size_per_item); + pthread_mutex_init (&queue->mutex, NULL); return (0); } static inline int queue_reinit (struct queue *queue) { + pthread_mutex_lock (&queue->mutex); queue->head = 0; queue->tail = queue->size - 1; queue->used = 0; queue->usedhw = 0; memset (queue->items, 0, queue->size * queue->size_per_item); + pthread_mutex_unlock (&queue->mutex); return (0); } @@ -80,11 +85,21 @@ static inline void queue_free (struct queue *queue) { } static inline int queue_is_full (struct queue *queue) { - return (queue->size - 1 == queue->used); + int full; + + pthread_mutex_lock (&queue->mutex); + full = queue->size - 1 == queue->used; + pthread_mutex_unlock (&queue->mutex); + return (full); } static inline int queue_is_empty (struct queue *queue) { - return (queue->used == 0); + int empty; + + pthread_mutex_lock (&queue->mutex); + empty = queue->used == 0; + pthread_mutex_unlock (&queue->mutex); + return (empty); } static inline void queue_item_add (struct queue *queue, void *item) @@ -92,6 +107,7 @@ static inline void queue_item_add (struct queue *queue, void *item) char *queue_item; int queue_position; + pthread_mutex_lock (&queue->mutex); queue_position = queue->head; queue_item = queue->items; queue_item += queue_position * queue->size_per_item; @@ -104,6 +120,7 @@ static inline void queue_item_add (struct queue *queue, void *item) if (queue->used > queue->usedhw) { queue->usedhw = queue->used; } + pthread_mutex_unlock (&queue->mutex); } static inline void *queue_item_get (struct queue *queue) @@ -111,34 +128,42 @@ static inline void *queue_item_get (struct queue *queue) char *queue_item; int queue_position; + pthread_mutex_lock (&queue->mutex); queue_position = (queue->tail + 1) % queue->size; queue_item = queue->items; queue_item += queue_position * queue->size_per_item; + pthread_mutex_unlock (&queue->mutex); return ((void *)queue_item); } static inline void queue_item_remove (struct queue *queue) { + pthread_mutex_lock (&queue->mutex); queue->tail = (queue->tail + 1) % queue->size; assert (queue->tail != queue->head); queue->used--; assert (queue->used >= 0); + pthread_mutex_unlock (&queue->mutex); } static inline void queue_items_remove (struct queue *queue, int rel_count) { + pthread_mutex_lock (&queue->mutex); queue->tail = (queue->tail + rel_count) % queue->size; assert (queue->tail != queue->head); queue->used -= rel_count; + pthread_mutex_unlock (&queue->mutex); } static inline void queue_item_iterator_init (struct queue *queue) { + pthread_mutex_lock (&queue->mutex); queue->iterator = (queue->tail + 1) % queue->size; + pthread_mutex_unlock (&queue->mutex); } static inline void *queue_item_iterator_get (struct queue *queue) @@ -146,30 +171,46 @@ static inline void *queue_item_iterator_get (struct queue *queue) char *queue_item; int queue_position; + pthread_mutex_lock (&queue->mutex); queue_position = (queue->iterator) % queue->size; if (queue->iterator == queue->head) { + pthread_mutex_unlock (&queue->mutex); return (0); } queue_item = queue->items; queue_item += queue_position * queue->size_per_item; + pthread_mutex_unlock (&queue->mutex); return ((void *)queue_item); } static inline int queue_item_iterator_next (struct queue *queue) { + int next_res; + + pthread_mutex_lock (&queue->mutex); queue->iterator = (queue->iterator + 1) % queue->size; - return (queue->iterator == queue->head); + next_res = queue->iterator == queue->head; + pthread_mutex_unlock (&queue->mutex); + return (next_res); } static inline void queue_avail (struct queue *queue, int *avail) { + pthread_mutex_lock (&queue->mutex); *avail = queue->size - queue->used - 2; assert (*avail >= 0); + pthread_mutex_unlock (&queue->mutex); } static inline int queue_used (struct queue *queue) { - return (queue->used); + int used; + + pthread_mutex_lock (&queue->mutex); + used = queue->used; + pthread_mutex_unlock (&queue->mutex); + + return (used); } #endif /* QUEUE_H_DEFINED */ diff --git a/lib/evt.c b/lib/evt.c index 573e740e..a2063eda 100644 --- a/lib/evt.c +++ b/lib/evt.c @@ -742,7 +742,6 @@ saEvtDispatch( * grabbed it. */ if (evt->led_head.error == SA_AIS_ERR_NOT_EXIST) { - DPRINT (("MESSAGE_RES_EVT_AVAILABLE: No event data\n")); error = SA_AIS_OK; break; } diff --git a/test/subscription.c b/test/subscription.c index a112aa7a..6eaefe7a 100644 --- a/test/subscription.c +++ b/test/subscription.c @@ -11,6 +11,7 @@ #include #include #include +#include #include "saAis.h" #include "saEvt.h" @@ -372,6 +373,10 @@ evt_free: static int err_wait_time = -1; +static struct sched_param sched_param = { + sched_priority: 1 +}; + int main (int argc, char **argv) { static const char opts[] = "c:s:n:qu:f:"; @@ -379,6 +384,8 @@ int main (int argc, char **argv) int option; char *p; + sched_setscheduler (0, SCHED_RR, &sched_param); + while (1) { option = getopt(argc, argv, opts); if (option == -1)