diff --git a/exec/gmi.c b/exec/gmi.c index 84f738bf..19f48c13 100644 --- a/exec/gmi.c +++ b/exec/gmi.c @@ -77,6 +77,7 @@ #define MISSING_MCAST_WINDOW 64 #define TIMEOUT_STATE_GATHER 100 #define TIMEOUT_TOKEN 300 +#define TIMEOUT_TOKEN_RETRANSMIT 100 #define TIMEOUT_STATE_COMMIT 100 #define MAX_MEMBERS 16 #define HOLE_LIST_MAX MISSING_MCAST_WINDOW @@ -169,11 +170,17 @@ int gmi_fragment = 0; int gmi_pend_queue_priority = 0; +struct orf_token orf_token_retransmit; + +int gmi_token_seqid = 0; + /* * Timers */ poll_timer_handle timer_orf_token_timeout = 0; +poll_timer_handle timer_orf_token_retransmit_timeout = 0; + poll_timer_handle timer_form_token_timeout = 0; poll_timer_handle timer_memb_state_gather_timeout = 0; @@ -224,11 +231,10 @@ struct rtr_item { struct orf_token { struct message_header header; + int token_seqid; int group_arut; struct in_addr addr_arut; short int fcc; - short int brake; - struct in_addr brake_addr; struct rtr_item rtr_list[RTR_TOKEN_SIZE_MAX]; int rtr_list_entries; }; @@ -378,6 +384,7 @@ static int orf_token_mcast (struct orf_token *orf_token, static void queues_pend_delv_memb_new (void); static void calculate_group_arut (struct orf_token *orf_token); static int messages_free (int group_arut); +static int orf_token_send (struct orf_token *orf_token); struct message_handlers gmi_message_handlers = { 5, @@ -1361,6 +1368,8 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou gmi_adut_old = gmi_adut; gmi_adut = 0; + gmi_token_seqid = 0; + gmi_highest_seq_old = gmi_highest_seq; gmi_highest_seq = 0; last_group_arut = 0; @@ -1508,6 +1517,18 @@ static int orf_fcc_allowed (struct orf_token *token) return (allowed); } +/* + * Retransmit the regular token if no mcast or token has + * been received in retransmit token period retransmit + * the token to the next processor + */ + +void timer_function_token_retransmit_timeout (void *data) +{ + gmi_log_printf (gmi_log_level_warning, "Token being retransmitted.\n"); + orf_token_send (&orf_token_retransmit); +} + void timer_function_form_token_timeout (void *data) { gmi_log_printf (gmi_log_level_warning, "Token loss in FORM state\n"); @@ -1589,6 +1610,7 @@ static int orf_token_send ( /* * res not used here errors are handled by algorithm */ +// TODO do we need a test here of some sort gmi_last_seqid = orf_token->header.seqid; stats_sent += res; @@ -1602,6 +1624,7 @@ int orf_token_send_initial (void) orf_token.header.seqid = 0; orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN; + orf_token.token_seqid = 0; orf_token.group_arut = gmi_highest_seq; orf_token.addr_arut.s_addr = gmi_bound_to.sin_addr.s_addr; orf_token.fcc = 0; @@ -2187,8 +2210,25 @@ static int message_handler_orf_token ( int transmits_allowed; int starting_group_arut; +#ifdef TESTTOKENDROP + if ((random() % 500) == 0) { + printf ("randomly dropping token to test token retransmit.\n"); + return (0); + } +#endif orf_token = iovec[0].iov_base; + /* + * Already received this token, but it was retransmitted + * to this processor because the retransmit timer on a previous + * processor timed out, so ignore the token + */ + if (gmi_token_seqid > orf_token->token_seqid) { + return (0); + } + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + timer_orf_token_retransmit_timeout = 0; + #ifdef PRINT_STATS if (orf_token->header.seqid > 10000) { print_stats (); @@ -2256,6 +2296,16 @@ static int message_handler_orf_token ( calculate_group_arut (orf_token); } + /* + * Increment the token seqid and store for later retransmit + */ + orf_token->token_seqid += 1; + memcpy (&orf_token_retransmit, orf_token, sizeof (struct orf_token)); + poll_timer_add (*gmi_poll_handle, TIMEOUT_TOKEN_RETRANSMIT, 0, + timer_function_token_retransmit_timeout, + &timer_orf_token_retransmit_timeout); + + /* * Transmit orf_token to next member */ @@ -2641,6 +2691,8 @@ static int message_handler_mcast ( return (0); } + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + timer_orf_token_retransmit_timeout = 0; /* * Add mcast message to rtr queue if not already in rtr queue