From 69fd2d8dcd28d5abb4aaf959242b094f55dd48ac Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Fri, 20 Aug 2004 22:18:34 +0000 Subject: [PATCH] Add support to disconnect and dispatch to utilize flow control. (Logical change 1.55) git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@179 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/main.c | 95 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 29 deletions(-) diff --git a/exec/main.c b/exec/main.c index 54211d5d..4f360b05 100644 --- a/exec/main.c +++ b/exec/main.c @@ -133,23 +133,37 @@ struct sockaddr_in this_ip; char *socketname = "libais.socket"; -static void libais_disconnect (struct conn_info *conn_info) +static int libais_disconnect (struct conn_info *conn_info) { - int fd; + int res = 0; if (ais_service_handlers[conn_info->service - 1]->libais_exit_fn) { - ais_service_handlers[conn_info->service - 1]->libais_exit_fn (conn_info); - } else { - printf ("exit function not defined\n"); + res = ais_service_handlers[conn_info->service - 1]->libais_exit_fn (conn_info); } - fd = conn_info->fd; + /* + * Close the library connection and free its + * data if it hasn't already been freed + */ + if (conn_info->inb) { + close (conn_info->fd); + queue_free (&conn_info->outq); + free (conn_info->inb); + conn_info->inb = 0; + } - close (fd); - queue_free (&conn_info->outq); - free (conn_info->inb); + /* + * If exit_fn didn't request a retry, + * free the conn_info structure + */ + if (res != -1) { + free (conn_info); + } - poll_dispatch_delete (aisexec_poll_handle, fd); + /* + * Inverse res from libais exit fn handler + */ + return (res != -1 ? -1 : 0); } extern int libais_send_response (struct conn_info *conn_info, @@ -280,7 +294,7 @@ retry_accept: return (0); /* This is an error, but -1 would indicate disconnect from poll */ } - poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN, conn_info, + poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN|POLLNVAL, conn_info, poll_handler_libais_deliver, 0); // TODO is this needed, or shouldn't it be in conn_info_create ? @@ -288,11 +302,16 @@ retry_accept: return (0); } +struct message_overlay { + struct res_header header; + char buf[4096]; +}; + static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio) { int res; struct conn_info *conn_info = (struct conn_info *)data; - struct message_header *header; + struct req_header *header; int service; struct msghdr msg_recv; struct iovec iov_recv; @@ -300,6 +319,8 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))]; struct ucred *cred; int on = 0; + int send_ok = 0; + struct message_overlay msg_overlay; msg_recv.msg_iov = &iov_recv; msg_recv.msg_iovlen = 1; @@ -352,19 +373,13 @@ retry_recv: } /* * Dispatch all messages received in recvmsg that can be dispatched - * sizeof (struct message_header) needed at minimum to do any processing + * sizeof (struct req_header) needed at minimum to do any processing */ conn_info->inb_inuse += res; conn_info->inb_start += res; - while (conn_info->inb_inuse >= sizeof (struct message_header) && res != -1) { - header = (struct message_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse]; - - if (header->magic != MESSAGE_MAGIC) { - log_printf (LOG_LEVEL_SECURITY, "Invalid magic is %x should be %x\n", header->magic, MESSAGE_MAGIC); - res = -1; - goto error_exit; - } + while (conn_info->inb_inuse >= sizeof (struct req_header) && res != -1) { + header = (struct req_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse]; if (header->size > conn_info->inb_inuse) { break; @@ -384,14 +399,36 @@ retry_recv: /* * Not an init service, but a standard service */ - if (header->id < 0 || header->id > ais_service_handlers[service - 1]->libais_handler_fns_count) { + if (header->id < 0 || header->id > ais_service_handlers[service - 1]->libais_handlers_count) { log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n", - header->id, ais_service_handlers[service - 1]->libais_handler_fns_count); + header->id, ais_service_handlers[service - 1]->libais_handlers_count); res = -1; goto error_exit; } + + /* + * Determine if a message can be queued with gmi and if so + * deliver it, otherwise tell the library we are too busy + */ - res = ais_service_handlers[service - 1]->libais_handler_fns[header->id](conn_info, header); + send_ok = gmi_send_ok (ais_service_handlers[service - 1]->libais_handlers[header->id].gmi_prio, 1000 + header->size); + if (send_ok) { + *prio = 0; + res = ais_service_handlers[service - 1]->libais_handlers[header->id].libais_handler_fn(conn_info, header); + } else { + *prio = (*prio) + 1; + + /* + * Overload, tell library to retry + */ + msg_overlay.header.size = + ais_service_handlers[service - 1]->libais_handlers[header->id].response_size; + msg_overlay.header.id = + ais_service_handlers[service - 1]->libais_handlers[header->id].response_id; + msg_overlay.header.error = SA_ERR_TRY_AGAIN; + libais_send_response (conn_info, &msg_overlay, + msg_overlay.header.size); + } } conn_info->inb_inuse -= header->size; } /* while */ @@ -413,8 +450,8 @@ retry_recv: return (res); error_exit: - libais_disconnect (conn_info); - return (-1); /* remove entry from poll list */ + res = libais_disconnect (conn_info); + return (res); } extern void print_stats (void); @@ -475,7 +512,7 @@ static void deliver_fn ( struct iovec *iovec, int iov_len) { - struct message_header *header; + struct req_header *header; int res; int pos = 0; int i; @@ -491,9 +528,9 @@ static void deliver_fn ( pos += iovec[i].iov_len; assert (pos < MESSAGE_SIZE_MAX); } - header = (struct message_header *)delivery_data; + header = (struct req_header *)delivery_data; } else { - header = (struct message_header *)iovec[0].iov_base; + header = (struct req_header *)iovec[0].iov_base; } res = aisexec_handler_fns[header->id](header, source_addr); }