diff --git a/exec/totemsrp.c b/exec/totemsrp.c index fc7b5ac4..75cad6cb 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -143,8 +143,9 @@ int fcc_remcast_current = 0; enum message_type { MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */ MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */ - MESSAGE_TYPE_MEMB_JOIN = 2, /* membership join message */ - MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 3, /* membership commit token */ + MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */ + MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */ + MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */ }; /* @@ -345,6 +346,10 @@ struct memb_join { unsigned long long ring_seq; } __attribute__((packed)); +struct memb_merge_detect { + struct message_header header; +} __attribute__((packed)); + struct memb_commit_token_memb_entry { struct memb_ring_id ring_id; int aru; @@ -405,7 +410,7 @@ static struct iovec iov_encrypted = { struct message_handlers { int count; - int (*handler_functions[4]) (struct sockaddr_in *, struct iovec *, int, int, int); + int (*handler_functions[5]) (struct sockaddr_in *, struct iovec *, int, int, int); }; poll_handle *totemsrp_poll_handle; @@ -430,8 +435,11 @@ void (*totemsrp_confchg_fn) ( * forward decls */ static int message_handler_orf_token (struct sockaddr_in *, struct iovec *, int, int, int); + static int message_handler_mcast (struct sockaddr_in *, struct iovec *, int, int, int); +static int message_handler_memb_merge_detect (struct sockaddr_in *, struct iovec *, int, int, int); + static int message_handler_memb_join (struct sockaddr_in *, struct iovec *, int, int, int); static int message_handler_memb_commit_token (struct sockaddr_in *, struct iovec *, int, int, int); @@ -466,6 +474,7 @@ struct message_handlers totemsrp_message_handlers = { { message_handler_orf_token, message_handler_mcast, + message_handler_memb_merge_detect, message_handler_memb_join, message_handler_memb_commit_token } @@ -899,6 +908,8 @@ static void memb_state_consensus_timeout_expired (void) static int memb_join_message_send (void); +static int memb_merge_detect_send (void); + /* * Timers used for various states of the membership algorithm */ @@ -2388,6 +2399,35 @@ int memb_join_message_send (void) return (res); } +static int memb_merge_detect_send (void) +{ + struct msghdr msghdr; + struct iovec iovec; + struct memb_merge_detect memb_merge_detect; + int res; + + memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT; + memb_merge_detect.header.endian_detector = ENDIAN_LOCAL; + memb_merge_detect.header.encapsulated = 0; + + iovec.iov_base = &memb_merge_detect; + iovec.iov_len = sizeof (struct memb_merge_detect); + + encrypt_and_sign (&iovec, 1); + + msghdr.msg_name = &sockaddr_in_mcast; + msghdr.msg_namelen = sizeof (struct sockaddr_in); + msghdr.msg_iov = &iov_encrypted; + msghdr.msg_iovlen = 1; + msghdr.msg_control = 0; + msghdr.msg_controllen = 0; + msghdr.msg_flags = 0; + + res = sendmsg (totemsrp_sockets[0].mcast, &msghdr, MSG_NOSIGNAL | MSG_DONTWAIT); + + return (res); +} + static void memb_ring_id_create_or_load ( struct memb_ring_id *memb_ring_id) { @@ -2818,6 +2858,9 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); reset_token_timeout (); // REVIEWED if (forward_token == 0) { reset_token_retransmit_timeout (); // REVIEWED + if (memb_state == MEMB_STATE_OPERATIONAL) { + memb_merge_detect_send (); + } } token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT); @@ -3013,6 +3056,61 @@ printf ("got foreign message\n"); return (0); } +static int message_handler_memb_merge_detect ( + struct sockaddr_in *system_from, + struct iovec *iovec, + int iov_len, + int bytes_received, + int endian_conversion_needed) +{ + +printf ("merge detect\n"); + /* + * Return if we are already aware of this configuration + */ + if (memb_set_subset (&system_from->sin_addr, + 1, + my_new_memb_list, + my_new_memb_entries)) { + + return (0); + } + + printf ("Merging configuration with rep %s\n", inet_ntoa (system_from->sin_addr)); + /* + * Execute merge operation + */ + switch (memb_state) { + case MEMB_STATE_OPERATIONAL: + memb_set_merge (&system_from->sin_addr, 1, + my_proc_list, &my_proc_list_entries); + memb_state_gather_enter (); + break; + + case MEMB_STATE_GATHER: + if (!memb_set_subset (&system_from->sin_addr, + 1, + my_proc_list, + my_proc_list_entries)) { + + memb_set_merge (&system_from->sin_addr, 1, + my_proc_list, &my_proc_list_entries); + memb_state_gather_enter (); + return (0); + } + break; + + case MEMB_STATE_COMMIT: + /* discard message */ + break; + + case MEMB_STATE_RECOVERY: + /* discard message */ + break; + } + return (0); +} + int memb_join_process (struct memb_join *memb_join, struct sockaddr_in *system_from) { struct memb_commit_token my_commit_token;