totemudpu: Add local loop support

This patch intends to solve long time ifdown corosync problem. Idea is
to use local socket for sending both unicast and multicast messages if
interface is down.

Together with testing what is current bind state it's possible to keep
pretending existence of old IP address instead of rebinding to localhost
what breaks a lot things badly.

Heavilly based on Yu, Zou <zouyu@shiqichuban.com> work and it's
basically port of UDP patch created by
Jan Friesse <jfriesse@redhat.com>.

(ported from needle 96354fba72)

Signed-off-by: Bin Liu <bliu@suse.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
This commit is contained in:
Bin Liu 2018-07-12 13:44:21 +02:00 committed by Jan Friesse
parent a471bab798
commit 96b4bd1660

View File

@ -173,6 +173,8 @@ struct totemudpu_instance {
int token_socket;
int local_loop_sock[2];
qb_loop_timer_handle timer_merge_detect_timeout;
int send_merge_detect_message;
@ -204,8 +206,6 @@ static void totemudpu_start_merge_detect_timeout(
static void totemudpu_stop_merge_detect_timeout(
void *udpu_context);
static struct totem_ip_address localhost;
static void totemudpu_instance_initialize (struct totemudpu_instance *instance)
{
memset (instance, 0, sizeof (struct totemudpu_instance));
@ -262,6 +262,7 @@ static inline void ucast_sendmsg (
struct sockaddr_storage sockaddr;
struct iovec iovec;
int addrlen;
int send_sock;
iovec.iov_base = (void *)msg;
iovec.iov_len = msg_len;
@ -292,12 +293,20 @@ static inline void ucast_sendmsg (
msg_ucast.msg_accrightslen = 0;
#endif
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
send_sock = instance->token_socket;
} else {
send_sock = instance->local_loop_sock[1];
msg_ucast.msg_name = NULL;
msg_ucast.msg_namelen = 0;
}
/*
* Transmit unicast message
* An error here is recovered by totemsrp
*/
res = sendmsg (instance->token_socket, &msg_ucast, MSG_NOSIGNAL);
res = sendmsg (send_sock, &msg_ucast, MSG_NOSIGNAL);
if (res < 0) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
"sendmsg(ucast) failed (non-critical)");
@ -325,22 +334,65 @@ static inline void mcast_sendmsg (
/*
* Build multicast message
*/
qb_list_for_each(list, &(instance->member_list)) {
member = qb_list_entry (list,
struct totemudpu_member,
list);
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
qb_list_for_each(list, &(instance->member_list)) {
member = qb_list_entry (list,
struct totemudpu_member,
list);
/*
* Do not send multicast message if message is not "flush", member
* is inactive and timeout for sending merge message didn't expired.
*/
if (only_active && !member->active && !instance->send_merge_detect_message)
continue ;
totemip_totemip_to_sockaddr_convert(&member->member,
instance->totem_interface->ip_port, &sockaddr, &addrlen);
msg_mcast.msg_name = &sockaddr;
msg_mcast.msg_namelen = addrlen;
msg_mcast.msg_iov = (void *)&iovec;
msg_mcast.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
msg_mcast.msg_control = 0;
#endif
#ifdef HAVE_MSGHDR_CONTROLLEN
msg_mcast.msg_controllen = 0;
#endif
#ifdef HAVE_MSGHDR_FLAGS
msg_mcast.msg_flags = 0;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTS
msg_mcast.msg_accrights = NULL;
#endif
#ifdef HAVE_MSGHDR_ACCRIGHTSLEN
msg_mcast.msg_accrightslen = 0;
#endif
/*
* Transmit multicast message
* An error here is recovered by totemsrp
*/
res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL);
if (res < 0) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
"sendmsg(mcast) failed (non-critical)");
}
}
if (!only_active || instance->send_merge_detect_message) {
/*
* Current message was sent to all nodes
*/
instance->merge_detect_messages_sent_before_timeout++;
instance->send_merge_detect_message = 0;
}
} else {
/*
* Do not send multicast message if message is not "flush", member
* is inactive and timeout for sending merge message didn't expired.
* Transmit multicast message to local unix mcast loop
* An error here is recovered by totemsrp
*/
if (only_active && !member->active && !instance->send_merge_detect_message)
continue ;
totemip_totemip_to_sockaddr_convert(&member->member,
instance->totem_interface->ip_port, &sockaddr, &addrlen);
msg_mcast.msg_name = &sockaddr;
msg_mcast.msg_namelen = addrlen;
msg_mcast.msg_name = NULL;
msg_mcast.msg_namelen = 0;
msg_mcast.msg_iov = (void *)&iovec;
msg_mcast.msg_iovlen = 1;
#ifdef HAVE_MSGHDR_CONTROL
@ -359,24 +411,13 @@ static inline void mcast_sendmsg (
msg_mcast.msg_accrightslen = 0;
#endif
/*
* Transmit multicast message
* An error here is recovered by totemsrp
*/
res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL);
res = sendmsg (instance->local_loop_sock[1], &msg_mcast,
MSG_NOSIGNAL);
if (res < 0) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
"sendmsg(mcast) failed (non-critical)");
"sendmsg(local mcast loop) failed (non-critical)");
}
}
if (!only_active || instance->send_merge_detect_message) {
/*
* Current message was sent to all nodes
*/
instance->merge_detect_messages_sent_before_timeout++;
instance->send_merge_detect_message = 0;
}
}
int totemudpu_finalize (
@ -391,6 +432,13 @@ int totemudpu_finalize (
close (instance->token_socket);
}
if (instance->local_loop_sock[0] > 0) {
qb_loop_poll_del (instance->totemudpu_poll_handle,
instance->local_loop_sock[0]);
close (instance->local_loop_sock[0]);
close (instance->local_loop_sock[1]);
}
totemudpu_stop_merge_detect_timeout(instance);
return (res);
@ -508,7 +556,6 @@ static void timer_function_netif_check_timeout (
struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
int interface_up;
int interface_num;
struct totem_ip_address *bind_address;
/*
* Build sockets for every interface
@ -545,14 +592,21 @@ static void timer_function_netif_check_timeout (
qb_loop_poll_del (instance->totemudpu_poll_handle,
instance->token_socket);
close (instance->token_socket);
instance->token_socket = -1;
}
if (interface_up == 0) {
if (instance->netif_bind_state == BIND_STATE_UNBOUND) {
log_printf (instance->totemudpu_log_level_error,
"One of your ip addresses are now bound to localhost. "
"Corosync would not work correctly.");
exit(COROSYNC_DONE_FATAL_ERR);
}
/*
* Interface is not up
*/
instance->netif_bind_state = BIND_STATE_LOOPBACK;
bind_address = &localhost;
/*
* Add a timer to retry building interfaces and request memb_gather_enter
@ -568,19 +622,20 @@ static void timer_function_netif_check_timeout (
* Interface is up
*/
instance->netif_bind_state = BIND_STATE_REGULAR;
bind_address = &instance->totem_interface->bindnet;
}
/*
* Create and bind the multicast and unicast sockets
*/
totemudpu_build_sockets (instance,
bind_address,
&instance->totem_interface->bindnet,
&instance->totem_interface->boundto);
qb_loop_poll_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->token_socket,
POLLIN, instance, net_deliver_fn);
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
qb_loop_poll_add (instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->token_socket,
POLLIN, instance, net_deliver_fn);
}
totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
@ -720,6 +775,65 @@ int totemudpu_ifaces_get (
}
static int totemudpu_build_local_sockets(
struct totemudpu_instance *instance)
{
int i;
unsigned int sendbuf_size;
unsigned int recvbuf_size;
unsigned int optlen = sizeof (sendbuf_size);
int res;
/*
* Create local multicast loop socket
*/
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->local_loop_sock) == -1) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
"socket() failed");
return (-1);
}
for (i = 0; i < 2; i++) {
totemip_nosigpipe (sockets->local_loop_sock[i]);
res = fcntl (instance->local_loop_sock[i], F_SETFL, O_NONBLOCK);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
"Could not set non-blocking operation on multicast socket");
return (-1);
}
}
recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
res = setsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
"Unable to set SO_RCVBUF size on UDP local mcast loop socket");
return (-1);
}
res = setsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
if (res == -1) {
LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
"Unable to set SO_SNDBUF size on UDP local mcast loop socket");
return (-1);
}
res = getsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudpu_log_level_debug,
"Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
}
res = getsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
if (res == 0) {
log_printf (instance->totemudpu_log_level_debug,
"Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
}
return (0);
}
static int totemudpu_build_sockets (
struct totemudpu_instance *instance,
struct totem_ip_address *bindnet_address,
@ -838,8 +952,19 @@ int totemudpu_initialize (
instance->totemudpu_target_set_completed = target_set_completed;
totemip_localhost (AF_INET, &localhost);
localhost.nodeid = instance->totem_config->node_id;
/*
* Create static local mcast sockets
*/
if (totemudpu_build_local_sockets(instance) == -1) {
free(instance);
return (-1);
}
qb_loop_poll_add (
instance->totemudpu_poll_handle,
QB_LOOP_MED,
instance->local_loop_sock[0],
POLLIN, instance, net_deliver_fn);
/*
* RRP layer isn't ready to receive message because it hasn't
@ -852,7 +977,7 @@ int totemudpu_initialize (
timer_function_netif_check_timeout,
&instance->timer_netif_check_timeout);
totemudpu_start_merge_detect_timeout(instance);
totemudpu_start_merge_detect_timeout((void*)instance);
*udpu_context = instance;
return (0);
@ -992,8 +1117,9 @@ extern int totemudpu_recv_mcast_empty (
struct sockaddr_storage system_from;
struct msghdr msg_recv;
struct pollfd ufd;
int nfds;
int nfds, i;
int msg_processed = 0;
int sock;
/*
* Receive datagram
@ -1018,19 +1144,34 @@ extern int totemudpu_recv_mcast_empty (
msg_recv.msg_accrightslen = 0;
#endif
do {
ufd.fd = instance->token_socket;
ufd.events = POLLIN;
nfds = poll (&ufd, 1, 0);
if (nfds == 1 && ufd.revents & POLLIN) {
res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
if (res != -1) {
msg_processed = 1;
for (i = 0; i < 2; i++) {
sock = -1;
if (i == 0) {
if (instance->netif_bind_state == BIND_STATE_REGULAR) {
sock = instance->token_socket;
} else {
msg_processed = -1;
continue;
}
}
} while (nfds == 1);
if (i == 1) {
sock = instance->local_loop_sock[0];
}
assert(sock != -1);
do {
ufd.fd = sock;
ufd.events = POLLIN;
nfds = poll (&ufd, 1, 0);
if (nfds == 1 && ufd.revents & POLLIN) {
res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
if (res != -1) {
msg_processed = 1;
} else {
msg_processed = -1;
}
}
} while (nfds == 1);
}
return (msg_processed);
}