diff --git a/exec/totemudpu.c b/exec/totemudpu.c index fff1d8d9..60de2e5c 100644 --- a/exec/totemudpu.c +++ b/exec/totemudpu.c @@ -173,6 +173,8 @@ struct totemudpu_instance { int token_socket; + int local_loop_sock[2]; + qb_loop_timer_handle timer_merge_detect_timeout; int send_merge_detect_message; @@ -204,8 +206,6 @@ static void totemudpu_start_merge_detect_timeout( static void totemudpu_stop_merge_detect_timeout( void *udpu_context); -static struct totem_ip_address localhost; - static void totemudpu_instance_initialize (struct totemudpu_instance *instance) { memset (instance, 0, sizeof (struct totemudpu_instance)); @@ -262,6 +262,7 @@ static inline void ucast_sendmsg ( struct sockaddr_storage sockaddr; struct iovec iovec; int addrlen; + int send_sock; iovec.iov_base = (void *)msg; iovec.iov_len = msg_len; @@ -292,12 +293,20 @@ static inline void ucast_sendmsg ( msg_ucast.msg_accrightslen = 0; #endif + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + send_sock = instance->token_socket; + } else { + send_sock = instance->local_loop_sock[1]; + msg_ucast.msg_name = NULL; + msg_ucast.msg_namelen = 0; + } + /* * Transmit unicast message * An error here is recovered by totemsrp */ - res = sendmsg (instance->token_socket, &msg_ucast, MSG_NOSIGNAL); + res = sendmsg (send_sock, &msg_ucast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, "sendmsg(ucast) failed (non-critical)"); @@ -325,22 +334,65 @@ static inline void mcast_sendmsg ( /* * Build multicast message */ - qb_list_for_each(list, &(instance->member_list)) { - member = qb_list_entry (list, - struct totemudpu_member, - list); + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + /* + * Do not send multicast message if message is not "flush", member + * is inactive and timeout for sending merge message didn't expired. + */ + if (only_active && !member->active && !instance->send_merge_detect_message) + continue ; + totemip_totemip_to_sockaddr_convert(&member->member, + instance->totem_interface->ip_port, &sockaddr, &addrlen); + msg_mcast.msg_name = &sockaddr; + msg_mcast.msg_namelen = addrlen; + msg_mcast.msg_iov = (void *)&iovec; + msg_mcast.msg_iovlen = 1; + #ifdef HAVE_MSGHDR_CONTROL + msg_mcast.msg_control = 0; + #endif + #ifdef HAVE_MSGHDR_CONTROLLEN + msg_mcast.msg_controllen = 0; + #endif + #ifdef HAVE_MSGHDR_FLAGS + msg_mcast.msg_flags = 0; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTS + msg_mcast.msg_accrights = NULL; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_mcast.msg_accrightslen = 0; + #endif + + /* + * Transmit multicast message + * An error here is recovered by totemsrp + */ + res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL); + if (res < 0) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "sendmsg(mcast) failed (non-critical)"); + } + } + + if (!only_active || instance->send_merge_detect_message) { + /* + * Current message was sent to all nodes + */ + instance->merge_detect_messages_sent_before_timeout++; + instance->send_merge_detect_message = 0; + } + } else { /* - * Do not send multicast message if message is not "flush", member - * is inactive and timeout for sending merge message didn't expired. + * Transmit multicast message to local unix mcast loop + * An error here is recovered by totemsrp */ - if (only_active && !member->active && !instance->send_merge_detect_message) - continue ; - - totemip_totemip_to_sockaddr_convert(&member->member, - instance->totem_interface->ip_port, &sockaddr, &addrlen); - msg_mcast.msg_name = &sockaddr; - msg_mcast.msg_namelen = addrlen; + msg_mcast.msg_name = NULL; + msg_mcast.msg_namelen = 0; msg_mcast.msg_iov = (void *)&iovec; msg_mcast.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL @@ -359,24 +411,13 @@ static inline void mcast_sendmsg ( msg_mcast.msg_accrightslen = 0; #endif - /* - * Transmit multicast message - * An error here is recovered by totemsrp - */ - res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL); + res = sendmsg (instance->local_loop_sock[1], &msg_mcast, + MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, - "sendmsg(mcast) failed (non-critical)"); + "sendmsg(local mcast loop) failed (non-critical)"); } } - - if (!only_active || instance->send_merge_detect_message) { - /* - * Current message was sent to all nodes - */ - instance->merge_detect_messages_sent_before_timeout++; - instance->send_merge_detect_message = 0; - } } int totemudpu_finalize ( @@ -391,6 +432,13 @@ int totemudpu_finalize ( close (instance->token_socket); } + if (instance->local_loop_sock[0] > 0) { + qb_loop_poll_del (instance->totemudpu_poll_handle, + instance->local_loop_sock[0]); + close (instance->local_loop_sock[0]); + close (instance->local_loop_sock[1]); + } + totemudpu_stop_merge_detect_timeout(instance); return (res); @@ -508,7 +556,6 @@ static void timer_function_netif_check_timeout ( struct totemudpu_instance *instance = (struct totemudpu_instance *)data; int interface_up; int interface_num; - struct totem_ip_address *bind_address; /* * Build sockets for every interface @@ -545,14 +592,21 @@ static void timer_function_netif_check_timeout ( qb_loop_poll_del (instance->totemudpu_poll_handle, instance->token_socket); close (instance->token_socket); + instance->token_socket = -1; } if (interface_up == 0) { + if (instance->netif_bind_state == BIND_STATE_UNBOUND) { + log_printf (instance->totemudpu_log_level_error, + "One of your ip addresses are now bound to localhost. " + "Corosync would not work correctly."); + exit(COROSYNC_DONE_FATAL_ERR); + } + /* * Interface is not up */ instance->netif_bind_state = BIND_STATE_LOOPBACK; - bind_address = &localhost; /* * Add a timer to retry building interfaces and request memb_gather_enter @@ -568,19 +622,20 @@ static void timer_function_netif_check_timeout ( * Interface is up */ instance->netif_bind_state = BIND_STATE_REGULAR; - bind_address = &instance->totem_interface->bindnet; } /* * Create and bind the multicast and unicast sockets */ totemudpu_build_sockets (instance, - bind_address, + &instance->totem_interface->bindnet, &instance->totem_interface->boundto); - qb_loop_poll_add (instance->totemudpu_poll_handle, - QB_LOOP_MED, - instance->token_socket, - POLLIN, instance, net_deliver_fn); + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + qb_loop_poll_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->token_socket, + POLLIN, instance, net_deliver_fn); + } totemip_copy (&instance->my_id, &instance->totem_interface->boundto); @@ -720,6 +775,65 @@ int totemudpu_ifaces_get ( } +static int totemudpu_build_local_sockets( + struct totemudpu_instance *instance) +{ + int i; + unsigned int sendbuf_size; + unsigned int recvbuf_size; + unsigned int optlen = sizeof (sendbuf_size); + int res; + + /* + * Create local multicast loop socket + */ + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->local_loop_sock) == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "socket() failed"); + return (-1); + } + + for (i = 0; i < 2; i++) { + totemip_nosigpipe (sockets->local_loop_sock[i]); + res = fcntl (instance->local_loop_sock[i], F_SETFL, O_NONBLOCK); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not set non-blocking operation on multicast socket"); + return (-1); + } + } + + recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; + sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; + + res = setsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "Unable to set SO_RCVBUF size on UDP local mcast loop socket"); + return (-1); + } + res = setsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "Unable to set SO_SNDBUF size on UDP local mcast loop socket"); + return (-1); + } + + res = getsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen); + if (res == 0) { + log_printf (instance->totemudpu_log_level_debug, + "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size); + } + + res = getsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen); + if (res == 0) { + log_printf (instance->totemudpu_log_level_debug, + "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size); + } + + return (0); +} + static int totemudpu_build_sockets ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, @@ -838,8 +952,19 @@ int totemudpu_initialize ( instance->totemudpu_target_set_completed = target_set_completed; - totemip_localhost (AF_INET, &localhost); - localhost.nodeid = instance->totem_config->node_id; + /* + * Create static local mcast sockets + */ + if (totemudpu_build_local_sockets(instance) == -1) { + free(instance); + return (-1); + } + + qb_loop_poll_add ( + instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->local_loop_sock[0], + POLLIN, instance, net_deliver_fn); /* * RRP layer isn't ready to receive message because it hasn't @@ -852,7 +977,7 @@ int totemudpu_initialize ( timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); - totemudpu_start_merge_detect_timeout(instance); + totemudpu_start_merge_detect_timeout((void*)instance); *udpu_context = instance; return (0); @@ -992,8 +1117,9 @@ extern int totemudpu_recv_mcast_empty ( struct sockaddr_storage system_from; struct msghdr msg_recv; struct pollfd ufd; - int nfds; + int nfds, i; int msg_processed = 0; + int sock; /* * Receive datagram @@ -1018,19 +1144,34 @@ extern int totemudpu_recv_mcast_empty ( msg_recv.msg_accrightslen = 0; #endif - do { - ufd.fd = instance->token_socket; - ufd.events = POLLIN; - nfds = poll (&ufd, 1, 0); - if (nfds == 1 && ufd.revents & POLLIN) { - res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); - if (res != -1) { - msg_processed = 1; + for (i = 0; i < 2; i++) { + sock = -1; + if (i == 0) { + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + sock = instance->token_socket; } else { - msg_processed = -1; + continue; } } - } while (nfds == 1); + if (i == 1) { + sock = instance->local_loop_sock[0]; + } + assert(sock != -1); + + do { + ufd.fd = sock; + ufd.events = POLLIN; + nfds = poll (&ufd, 1, 0); + if (nfds == 1 && ufd.revents & POLLIN) { + res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res != -1) { + msg_processed = 1; + } else { + msg_processed = -1; + } + } + } while (nfds == 1); + } return (msg_processed); }