From 4336be7c4cd4ef067a1652d442874295f699ca4c Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Sun, 9 Sep 2007 06:24:00 +0000 Subject: [PATCH] totem srp merge from whitetank git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1427 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/totemsrp.c | 283 +++++++++++++++++++++++++++--------------------- 1 file changed, 158 insertions(+), 125 deletions(-) diff --git a/exec/totemsrp.c b/exec/totemsrp.c index d7258d93..2a2fc1a7 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -48,9 +48,10 @@ * usage on 1.6ghz xeon from 35% to less then .1 % as measured by top */ -#ifndef OPENAIS_BSD +#ifdef OPENAIS_BSD #include #endif + #include #include #include @@ -82,6 +83,7 @@ #include "../include/sq.h" #include "../include/list.h" #include "../include/hdb.h" +#include "swab.h" #include "crypto.h" @@ -91,6 +93,7 @@ #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */ #define MAXIOVS 5 #define RETRANSMIT_ENTRIES_MAX 30 +#define TOKEN_SIZE_MAX 64000 /* bytes */ /* * Rollover handling: @@ -323,6 +326,8 @@ struct totemsrp_instance { struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]; + struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]; + int my_proc_list_entries; int my_failed_list_entries; @@ -335,7 +340,7 @@ struct totemsrp_instance { int my_deliver_memb_entries; - int my_nodeid_lookup_entries; + int my_left_memb_entries; struct memb_ring_id my_ring_id; @@ -387,14 +392,12 @@ struct totemsrp_instance { struct list_head token_callback_sent_listhead; - char *orf_token_retransmit; // sizeof (struct orf_token) + sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX]; + char *orf_token_retransmit[TOKEN_SIZE_MAX]; int orf_token_retransmit_size; unsigned int my_token_seq; - unsigned int my_commit_token_seq; - /* * Timers */ @@ -551,8 +554,10 @@ static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token int fcc_mcasts_allowed); static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru); -static void memb_ring_id_store (struct totemsrp_instance *instance, struct memb_commit_token *commit_token); +static void memb_ring_id_set_and_store (struct totemsrp_instance *instance, + struct memb_ring_id *ring_id); static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token); +static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token); static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token); static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token); static int token_hold_cancel_send (struct totemsrp_instance *instance); @@ -618,10 +623,6 @@ void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->my_token_seq = SEQNO_START_TOKEN - 1; - instance->my_commit_token_seq = SEQNO_START_TOKEN - 1; - - instance->orf_token_retransmit = malloc (15000); - instance->memb_state = MEMB_STATE_OPERATIONAL; instance->set_aru = -1; @@ -945,7 +946,6 @@ error_exit: return (res); } - /* * Set operations for use by the membership algorithm */ @@ -1186,7 +1186,7 @@ static void memb_set_merge ( } } if (found == 0) { - srp_addr_copy (&fullset[j], &subset[i]); + srp_addr_copy (&fullset[*fullset_entries], &subset[i]); *fullset_entries = *fullset_entries + 1; } found = 0; @@ -1388,7 +1388,7 @@ static void memb_state_consensus_timeout_expired ( memb_set_merge (no_consensus_list, no_consensus_list_entries, instance->my_failed_list, &instance->my_failed_list_entries); - memb_state_gather_enter (instance, 1); + memb_state_gather_enter (instance, 0); } } @@ -1569,13 +1569,13 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) { struct srp_addr joined_list[PROCESSOR_COUNT_MAX]; int joined_list_entries = 0; - struct srp_addr left_list[PROCESSOR_COUNT_MAX]; - int left_list_entries = 0; unsigned int aru_save; - unsigned int left_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX]; unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX]; + unsigned int left_list[PROCESSOR_COUNT_MAX]; + + memb_consensus_reset (instance); old_ring_state_reset (instance); ring_reset (instance); @@ -1593,7 +1593,8 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) /* * Calculate joined and left list */ - memb_set_subtract (left_list, &left_list_entries, + memb_set_subtract (instance->my_left_memb_list, + &instance->my_left_memb_entries, instance->my_memb_list, instance->my_memb_entries, instance->my_trans_memb_list, instance->my_trans_memb_entries); @@ -1613,12 +1614,13 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) /* * Deliver transitional configuration to application */ - srp_addr_to_nodeid (left_list_totemip, left_list, left_list_entries); + srp_addr_to_nodeid (left_list, instance->my_left_memb_list, + instance->my_left_memb_entries); srp_addr_to_nodeid (trans_memb_list_totemip, instance->my_trans_memb_list, instance->my_trans_memb_entries); instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL, trans_memb_list_totemip, instance->my_trans_memb_entries, - left_list_totemip, left_list_entries, + left_list, instance->my_left_memb_entries, 0, 0, &instance->my_ring_id); // TODO we need to filter to ensure we only deliver those @@ -1669,8 +1671,6 @@ static void memb_state_gather_enter ( struct totemsrp_instance *instance, int gather_from) { - instance->my_commit_token_seq = SEQNO_START_TOKEN - 1; - memb_set_merge ( &instance->my_id, 1, instance->my_proc_list, &instance->my_proc_list_entries); @@ -1730,12 +1730,15 @@ static void memb_state_commit_enter ( old_ring_state_save (instance); -// ABC memb_state_commit_token_update (instance, commit_token); + memb_state_commit_token_target_set (instance, commit_token); + + memb_ring_id_set_and_store (instance, &commit_token->ring_id); + memb_state_commit_token_send (instance, commit_token); - memb_ring_id_store (instance, commit_token); + instance->token_ring_id_seq = instance->my_ring_id.seq; poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout); @@ -1753,8 +1756,6 @@ static void memb_state_commit_enter ( instance->memb_state = MEMB_STATE_COMMIT; - instance->my_commit_token_seq = SEQNO_START_TOKEN - 1; - /* * reset all flow control variables since we are starting a new ring */ @@ -1829,7 +1830,7 @@ static void memb_state_recovery_enter ( memb_list[i].high_delivered, memb_list[i].received_flg); -// TODO assert (!totemip_zero_check(&memb_list[i].ring_id.rep)); + assert (totemip_print (&memb_list[i].ring_id.rep) != 0); } /* * Determine if any received flag is false @@ -2045,6 +2046,8 @@ error_iovec: for (j = 0; j < i; j++) { free (message_item.iovec[j].iov_base); } + + free(message_item.mcast); error_mcast: hdb_handle_put (&totemsrp_instance_database, handle); @@ -2295,13 +2298,13 @@ static int orf_token_mcast ( * Delete item from pending queue */ queue_item_remove (mcast_queue); + + /* + * If messages mcasted, deliver any new messages to totempg + */ + instance->my_high_seq_received = token->seq; } - /* - * If messages mcasted, deliver any new messages to totempg - */ - instance->my_high_seq_received = token->seq; - update_aru (instance); /* @@ -2423,8 +2426,7 @@ static int orf_token_rtr ( /* * Missing message not found in current retransmit list so add it */ - memb_ring_id_copy ( - &rtr_list[orf_token->rtr_list_entries].ring_id, + memb_ring_id_copy (&rtr_list[orf_token->rtr_list_entries].ring_id, &instance->my_ring_id); rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i; orf_token->rtr_list_entries++; @@ -2459,7 +2461,6 @@ static void timer_function_token_retransmit_timeout (void *data) case MEMB_STATE_GATHER: break; case MEMB_STATE_COMMIT: - break; case MEMB_STATE_OPERATIONAL: case MEMB_STATE_RECOVERY: token_retransmit (instance); @@ -2525,7 +2526,7 @@ static int token_send ( return (0); } - iovec.iov_base = (char *)orf_token; + iovec.iov_base = orf_token; iovec.iov_len = iov_len; totemrrp_token_send (instance->totemrrp_handle, @@ -2556,10 +2557,10 @@ static int token_hold_cancel_send (struct totemsrp_instance *instance) token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid; assert (token_hold_cancel.header.nodeid); - iovec[0].iov_base = (char *)&token_hold_cancel; + iovec[0].iov_base = &token_hold_cancel; iovec[0].iov_len = sizeof (struct token_hold_cancel) - sizeof (struct memb_ring_id); - iovec[1].iov_base = (char *)&instance->my_ring_id; + iovec[1].iov_base = &instance->my_ring_id; iovec[1].iov_len = sizeof (struct memb_ring_id); totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2); @@ -2611,65 +2612,78 @@ static void memb_state_commit_token_update ( struct totemsrp_instance *instance, struct memb_commit_token *commit_token) { - int memb_index_this; struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; addr = (struct srp_addr *)commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries); - memb_index_this = (commit_token->memb_index + 1) % commit_token->addr_entries; - memb_ring_id_copy (&memb_list[memb_index_this].ring_id, + memb_ring_id_copy (&memb_list[commit_token->memb_index].ring_id, &instance->my_old_ring_id); assert (!totemip_zero_check(&instance->my_old_ring_id.rep)); - memb_list[memb_index_this].aru = instance->old_ring_state_aru; + memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru; /* * TODO high delivered is really instance->my_aru, but with safe this * could change? */ - memb_list[memb_index_this].high_delivered = instance->my_high_delivered; - memb_list[memb_index_this].received_flg = instance->my_received_flg; + memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered; + memb_list[commit_token->memb_index].received_flg = instance->my_received_flg; commit_token->header.nodeid = instance->my_id.addr[0].nodeid; + commit_token->memb_index += 1; + assert (commit_token->memb_index <= commit_token->addr_entries); assert (commit_token->header.nodeid); } -static int memb_state_commit_token_send (struct totemsrp_instance *instance, +static void memb_state_commit_token_target_set ( + struct totemsrp_instance *instance, + struct memb_commit_token *commit_token) +{ + struct srp_addr *addr; + unsigned int i; + + addr = (struct srp_addr *)commit_token->end_of_commit_token; + + for (i = 0; i < instance->totem_config->interface_count; i++) { + totemrrp_token_target_set ( + instance->totemrrp_handle, + &addr[commit_token->memb_index % + commit_token->addr_entries].addr[i], + i); + } +} + +static int memb_state_commit_token_send ( + struct totemsrp_instance *instance, struct memb_commit_token *commit_token) { struct iovec iovec; - int memb_index_this; - int memb_index_next; struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; - unsigned int i; addr = (struct srp_addr *)commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries); commit_token->token_seq++; - memb_index_this = (commit_token->memb_index + 1) % commit_token->addr_entries; - memb_index_next = (memb_index_this + 1) % commit_token->addr_entries; - commit_token->memb_index = memb_index_this; - - - iovec.iov_base = (char *)commit_token; + iovec.iov_base = commit_token; iovec.iov_len = sizeof (struct memb_commit_token) + ((sizeof (struct srp_addr) + sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries); - - for (i = 0; i < instance->totem_config->interface_count; i++) { - totemrrp_token_target_set ( - instance->totemrrp_handle, - &addr[memb_index_next].addr[i], - i); - } + /* + * Make a copy for retransmission if necessary + */ + memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len); + instance->orf_token_retransmit_size = iovec.iov_len; totemrrp_token_send (instance->totemrrp_handle, &iovec, 1); + /* + * Request retransmission of the commit token in case it is lost + */ + reset_token_retransmit_timeout (instance); return (0); } @@ -2740,7 +2754,7 @@ static void memb_state_commit_token_create ( qsort (token_memb, token_memb_entries, sizeof (struct srp_addr), srp_addr_compare); - commit_token->memb_index = token_memb_entries - 1; + commit_token->memb_index = 0; commit_token->addr_entries = token_memb_entries; addr = (struct srp_addr *)commit_token->end_of_commit_token; @@ -2756,6 +2770,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance) { struct memb_join memb_join; struct iovec iovec[3]; + unsigned int iovs; memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN; memb_join.header.endian_detector = ENDIAN_LOCAL; @@ -2763,21 +2778,25 @@ static void memb_join_message_send (struct totemsrp_instance *instance) memb_join.header.nodeid = instance->my_id.addr[0].nodeid; assert (memb_join.header.nodeid); - assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0); memb_join.ring_seq = instance->my_ring_id.seq; memb_join.proc_list_entries = instance->my_proc_list_entries; memb_join.failed_list_entries = instance->my_failed_list_entries; srp_addr_copy (&memb_join.system_from, &instance->my_id); - iovec[0].iov_base = (char *)&memb_join; + iovec[0].iov_base = &memb_join; iovec[0].iov_len = sizeof (struct memb_join); - iovec[1].iov_base = (char *)&instance->my_proc_list; + iovec[1].iov_base = &instance->my_proc_list; iovec[1].iov_len = instance->my_proc_list_entries * sizeof (struct srp_addr); - iovec[2].iov_base = (char *)&instance->my_failed_list; - iovec[2].iov_len = instance->my_failed_list_entries * - sizeof (struct srp_addr); + if (instance->my_failed_list_entries == 0) { + iovs = 2; + } else { + iovs = 3; + iovec[2].iov_base = &instance->my_failed_list; + iovec[2].iov_len = instance->my_failed_list_entries * + sizeof (struct srp_addr); + } if (instance->totem_config->send_join_timeout) { usleep (random() % (instance->totem_config->send_join_timeout * 1000)); @@ -2786,7 +2805,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance) totemrrp_mcast_flush_send ( instance->totemrrp_handle, iovec, - 3); + iovs); } static void memb_merge_detect_transmit (struct totemsrp_instance *instance) @@ -2801,10 +2820,10 @@ static void memb_merge_detect_transmit (struct totemsrp_instance *instance) srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id); assert (memb_merge_detect.header.nodeid); - iovec[0].iov_base = (char *)&memb_merge_detect; + iovec[0].iov_base = &memb_merge_detect; iovec[0].iov_len = sizeof (struct memb_merge_detect) - sizeof (struct memb_ring_id); - iovec[1].iov_base = (char *)&instance->my_ring_id; + iovec[1].iov_base = &instance->my_ring_id; iovec[1].iov_len = sizeof (struct memb_ring_id); totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2); @@ -2836,6 +2855,7 @@ static void memb_ring_id_create_or_load ( } res = write (fd, &memb_ring_id->seq, sizeof (unsigned long long)); assert (res == sizeof (unsigned long long)); + fsync (fd); close (fd); } else { log_printf (instance->totemsrp_log_level_warning, @@ -2847,16 +2867,18 @@ static void memb_ring_id_create_or_load ( instance->token_ring_id_seq = memb_ring_id->seq; } -static void memb_ring_id_store ( +static void memb_ring_id_set_and_store ( struct totemsrp_instance *instance, - struct memb_commit_token *commit_token) + struct memb_ring_id *ring_id) { char filename[256]; int fd; int res; - sprintf (filename, "/tmp/ringid_%s", - totemip_print (&instance->my_id.addr[0])); + memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id)); + + sprintf (filename, "%s/ringid_%s", + rundir, totemip_print (&instance->my_id.addr[0])); fd = open (filename, O_WRONLY, 0777); if (fd == -1) { @@ -2865,18 +2887,17 @@ static void memb_ring_id_store ( if (fd == -1) { log_printf (instance->totemsrp_log_level_warning, "Couldn't store new ring id %llx to stable storage (%s)\n", - commit_token->ring_id.seq, strerror (errno)); + instance->my_ring_id.seq, strerror (errno)); assert (0); return; } log_printf (instance->totemsrp_log_level_notice, - "Storing new sequence id for ring %llx\n", commit_token->ring_id.seq); + "Storing new sequence id for ring %llx\n", instance->my_ring_id.seq); //assert (fd > 0); - res = write (fd, &commit_token->ring_id.seq, sizeof (unsigned long long)); + res = write (fd, &instance->my_ring_id.seq, sizeof (unsigned long long)); assert (res == sizeof (unsigned long long)); + fsync (fd); close (fd); - memcpy (&instance->my_ring_id, &commit_token->ring_id, sizeof (struct memb_ring_id)); - instance->token_ring_id_seq = instance->my_ring_id.seq; } int totemsrp_callback_token_create ( @@ -3071,6 +3092,7 @@ static void fcc_token_update ( * Message Handlers */ +struct timeval tv_old; /* * message handler called when TOKEN message type received */ @@ -3098,9 +3120,10 @@ static int message_handler_orf_token ( timersub (&tv_current, &tv_old, &tv_diff); memcpy (&tv_old, &tv_current, sizeof (struct timeval)); - if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) { - printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); - } + log_printf (instance->totemsrp_log_level_notice, + "Time since last token %0.4f ms\n", + (((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec) + / 1000.0); #endif #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE @@ -3343,9 +3366,9 @@ static int message_handler_orf_token ( gettimeofday (&tv_current, NULL); timersub (&tv_current, &tv_old, &tv_diff); memcpy (&tv_old, &tv_current, sizeof (struct timeval)); - if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) { - printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); - } + log_printf (instance->totemsrp_log_level_notice, + "I held %0.4f ms\n", + ((float)tv_diff.tv_usec) / 1000.0); #endif if (instance->memb_state == MEMB_STATE_OPERATIONAL) { messages_deliver_to_app (instance, 0, @@ -3388,9 +3411,10 @@ static void messages_deliver_to_app ( struct sort_queue_item *sort_queue_item_p; unsigned int i; int res; - struct mcast *mcast; + struct mcast *mcast_in; + struct mcast mcast_header; unsigned int range = 0; - int endian_conversion_required = 0 ; + int endian_conversion_required; unsigned int my_high_delivered_stored = 0; @@ -3438,18 +3462,27 @@ static void messages_deliver_to_app ( sort_queue_item_p = ptr; - mcast = (struct mcast *)sort_queue_item_p->iovec[0].iov_base; - assert (mcast != (struct mcast *)0xdeadbeef); + mcast_in = sort_queue_item_p->iovec[0].iov_base; + assert (mcast_in != (struct mcast *)0xdeadbeef); + + endian_conversion_required = 0; + if (mcast_in->header.endian_detector != ENDIAN_LOCAL) { + endian_conversion_required = 1; + mcast_endian_convert (mcast_in, &mcast_header); + } else { + memcpy (&mcast_header, mcast_in, sizeof (struct mcast)); + } /* * Skip messages not originated in instance->my_deliver_memb */ if (skip && - memb_set_subset (&mcast->system_from, + memb_set_subset (&mcast_header.system_from, 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries) == 0) { - instance->my_high_delivered = my_high_delivered_stored + i; + + instance->my_high_delivered = my_high_delivered_stored + i; continue; } @@ -3459,12 +3492,7 @@ static void messages_deliver_to_app ( */ log_printf (instance->totemsrp_log_level_debug, "Delivering MCAST message with seq %x to pending delivery queue\n", - mcast->seq); - - if (mcast->header.endian_detector != ENDIAN_LOCAL) { - endian_conversion_required = 1; - mcast_endian_convert (mcast, mcast); - } + mcast_header.seq); /* * Message is locally originated multicast @@ -3472,7 +3500,7 @@ static void messages_deliver_to_app ( if (sort_queue_item_p->iov_len > 1 && sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) { instance->totemsrp_deliver_fn ( - mcast->header.nodeid, + mcast_header.header.nodeid, &sort_queue_item_p->iovec[1], sort_queue_item_p->iov_len - 1, endian_conversion_required); @@ -3481,7 +3509,7 @@ static void messages_deliver_to_app ( sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast); instance->totemsrp_deliver_fn ( - mcast->header.nodeid, + mcast_header.header.nodeid, sort_queue_item_p->iovec, sort_queue_item_p->iov_len, endian_conversion_required); @@ -3684,7 +3712,7 @@ static int memb_join_process ( struct totemsrp_instance *instance, struct memb_join *memb_join) { - unsigned char *commit_token_storage[32000]; + unsigned char *commit_token_storage[TOKEN_SIZE_MAX]; struct memb_commit_token *my_commit_token = (struct memb_commit_token *)commit_token_storage; struct srp_addr *proc_list; @@ -3795,7 +3823,8 @@ static void memb_commit_token_endian_convert (struct memb_commit_token *in, stru out->header.endian_detector = ENDIAN_LOCAL; out->header.nodeid = swab32 (in->header.nodeid); out->token_seq = swab32 (in->token_seq); - memb_ring_id_copy_endian_convert (&out->ring_id, &in->ring_id); + totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep); + out->ring_id.seq = swab64 (in->ring_id.seq); out->retrans_flg = swab32 (in->retrans_flg); out->memb_index = swab32 (in->memb_index); out->addr_entries = swab32 (in->addr_entries); @@ -3810,8 +3839,11 @@ static void memb_commit_token_endian_convert (struct memb_commit_token *in, stru */ if (in_memb_list[i].ring_id.rep.family != 0) { memb_ring_id_copy_endian_convert ( - &out_memb_list[i].ring_id, - &in_memb_list[i].ring_id); + &out_memb_list[i].ring_id, + &in_memb_list[i].ring_id); + + out_memb_list[i].ring_id.seq = + swab64 (in_memb_list[i].ring_id.seq); out_memb_list[i].aru = swab32 (in_memb_list[i].aru); out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered); out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg); @@ -3836,8 +3868,8 @@ static void orf_token_endian_convert (struct orf_token *in, struct orf_token *ou out->retrans_flg = swab32 (in->retrans_flg); out->rtr_list_entries = swab32 (in->rtr_list_entries); for (i = 0; i < out->rtr_list_entries; i++) { - memb_ring_id_copy_endian_convert(&out->rtr_list[i].ring_id, - &in->rtr_list[i].ring_id); + memb_ring_id_copy_endian_convert (&out->rtr_list[i].ring_id, + &in->rtr_list[i].ring_id); out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq); } } @@ -3849,7 +3881,7 @@ static void mcast_endian_convert (struct mcast *in, struct mcast *out) out->header.nodeid = swab32 (in->header.nodeid); out->seq = swab32 (in->seq); out->this_seqno = swab32 (in->this_seqno); - memb_ring_id_copy_endian_convert(&out->ring_id, &in->ring_id); + memb_ring_id_copy_endian_convert (&out->ring_id, &in->ring_id); out->node_id = swab32 (in->node_id); out->guarantee = swab32 (in->guarantee); srp_addr_copy_endian_convert (&out->system_from, &in->system_from); @@ -3945,6 +3977,9 @@ static int message_handler_memb_commit_token ( struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; + log_printf (instance->totemsrp_log_level_debug, + "got commit token\n"); + if (endian_conversion_needed) { memb_commit_token = memb_commit_token_convert; memb_commit_token_endian_convert (msg, memb_commit_token); @@ -3954,16 +3989,6 @@ static int message_handler_memb_commit_token ( addr = (struct srp_addr *)memb_commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + memb_commit_token->addr_entries); - if (sq_lte_compare (memb_commit_token->token_seq, - instance->my_commit_token_seq)) { - /* - * discard token - */ - return (0); - } - instance->my_commit_token_seq = memb_commit_token->token_seq; - - #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) { return (0); @@ -3991,21 +4016,29 @@ static int message_handler_memb_commit_token ( break; case MEMB_STATE_COMMIT: -// if (memcmp (&memb_commit_token->ring_id, &instance->my_ring_id, -// sizeof (struct memb_ring_id)) == 0) { - if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq) { + /* + * If retransmitted commit tokens are sent on this ring + * filter them out and only enter recovery once the + * commit token has traversed the array. This is + * determined by : + * memb_commit_token->memb_index == memb_commit_token->addr_entries) { + */ + if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq && + memb_commit_token->memb_index == memb_commit_token->addr_entries) { memb_state_recovery_enter (instance, memb_commit_token); } break; case MEMB_STATE_RECOVERY: - log_printf (instance->totemsrp_log_level_notice, - "Sending initial ORF token\n"); + if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) { + log_printf (instance->totemsrp_log_level_notice, + "Sending initial ORF token\n"); - // TODO convert instead of initiate - orf_token_send_initial (instance); - reset_token_timeout (instance); // REVIEWED - reset_token_retransmit_timeout (instance); // REVIEWED + // TODO convert instead of initiate + orf_token_send_initial (instance); + reset_token_timeout (instance); // REVIEWED + reset_token_retransmit_timeout (instance); // REVIEWED + } break; } return (0);