diff --git a/exec/gmi.c b/exec/gmi.c index 29bb2ed0..3702a757 100644 --- a/exec/gmi.c +++ b/exec/gmi.c @@ -77,7 +77,7 @@ #define MISSING_MCAST_WINDOW 64 #define TIMEOUT_STATE_GATHER 100 #define TIMEOUT_TOKEN 100 -#define TIMEOUT_TOKEN_RETRANSMIT 100 +#define TIMEOUT_TOKEN_RETRANSMIT 50 #define TIMEOUT_STATE_COMMIT 100 #define MAX_MEMBERS 16 #define HOLE_LIST_MAX MISSING_MCAST_WINDOW @@ -385,7 +385,7 @@ static int orf_token_mcast (struct orf_token *orf_token, static void queues_pend_delv_memb_new (void); static void calculate_group_arut (struct orf_token *orf_token); static int messages_free (int group_arut); -static int orf_token_send (struct orf_token *orf_token); +static int orf_token_send (struct orf_token *orf_token, int reset_timer); struct message_handlers gmi_message_handlers = { 5, @@ -1528,7 +1528,8 @@ static int orf_fcc_allowed (struct orf_token *token) void timer_function_token_retransmit_timeout (void *data) { gmi_log_printf (gmi_log_level_warning, "Token being retransmitted.\n"); - orf_token_send (&orf_token_retransmit); + + orf_token_send (&orf_token_retransmit, 0); } void timer_function_form_token_timeout (void *data) @@ -1559,7 +1560,9 @@ void orf_timer_function_token_timeout (void *data) case MEMB_STATE_GATHER: case MEMB_STATE_COMMIT: gmi_log_printf (gmi_log_level_warning, "Token loss in GATHER or COMMIT.\n"); + memb_conf_id.rep.s_addr = memb_local_sockaddr_in.sin_addr.s_addr; memb_list_entries = 1; + break; case MEMB_STATE_EVS: @@ -1578,16 +1581,19 @@ void orf_timer_function_token_timeout (void *data) * Send orf_token to next member (requires orf_token) */ static int orf_token_send ( - struct orf_token *orf_token) + struct orf_token *orf_token, + int reset_timer) { struct msghdr msg_orf_token; struct iovec iovec_orf_token; int res; - poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); + if (reset_timer) { + poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); - poll_timer_add (*gmi_poll_handle, TIMEOUT_TOKEN, 0, - orf_timer_function_token_timeout, &timer_orf_token_timeout); + poll_timer_add (*gmi_poll_handle, TIMEOUT_TOKEN, 0, + orf_timer_function_token_timeout, &timer_orf_token_timeout); + } iovec_orf_token.iov_base = (char *)orf_token; iovec_orf_token.iov_len = sizeof (struct orf_token); @@ -1634,7 +1640,7 @@ int orf_token_send_initial (void) orf_token.rtr_list_entries = 0; memset (orf_token.rtr_list, 0, sizeof (struct rtr_item) * RTR_TOKEN_SIZE_MAX); - res = orf_token_send (&orf_token); + res = orf_token_send (&orf_token, 1); return (res); } @@ -1821,7 +1827,7 @@ static int memb_state_commit_enter (void) memb_commit_set_entries = 0; res = memb_join_send(); - poll_timer_delete (*gmi_poll_handle, timer_memb_state_gather_timeout); + poll_timer_delete (*gmi_poll_handle, timer_memb_state_gather_timeout); timer_memb_state_gather_timeout = 0; @@ -2087,6 +2093,13 @@ static int memb_form_token_send ( poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); timer_orf_token_timeout = 0; + /* + * Delete retransmit timer since a new + * membership is in progress + */ + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + timer_orf_token_retransmit_timeout = 0; + poll_timer_delete (*gmi_poll_handle, timer_form_token_timeout); poll_timer_add (*gmi_poll_handle, TIMEOUT_TOKEN, 0, @@ -2212,7 +2225,7 @@ static int message_handler_orf_token ( int transmits_allowed; int starting_group_arut; -#ifdef TESTTOKENDROP +#ifdef TESTTOKENRETRANSMIT if ((random() % 500) == 0) { printf ("randomly dropping token to test token retransmit.\n"); return (0); @@ -2225,9 +2238,13 @@ static int message_handler_orf_token ( * to this processor because the retransmit timer on a previous * processor timed out, so ignore the token */ - if (gmi_token_seqid > orf_token->token_seqid) { + if (orf_token->token_seqid > 0 && gmi_token_seqid >= orf_token->token_seqid) { +printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqid); +//exit(1); return (0); } + gmi_token_seqid = orf_token->token_seqid; + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); timer_orf_token_retransmit_timeout = 0; @@ -2241,6 +2258,14 @@ static int message_handler_orf_token ( gmi_log_printf (gmi_log_level_notice, "swallowing ORF token %d.\n", stats_orf_token); poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); timer_orf_token_timeout = 0; + + /* + * Delete retransmit timer since a new + * membership is in progress + */ + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + timer_orf_token_retransmit_timeout = 0; + return (0); } @@ -2303,15 +2328,17 @@ static int message_handler_orf_token ( */ orf_token->token_seqid += 1; memcpy (&orf_token_retransmit, orf_token, sizeof (struct orf_token)); + + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + poll_timer_add (*gmi_poll_handle, TIMEOUT_TOKEN_RETRANSMIT, 0, timer_function_token_retransmit_timeout, &timer_orf_token_retransmit_timeout); - /* * Transmit orf_token to next member */ - orf_token_send (orf_token); + orf_token_send (orf_token, 1); return (0); } @@ -2377,7 +2404,7 @@ struct pend_delv *pend_delv_next_delivery_find (void) int i; /* - * Find first_delivery queue that is not mepty + * Find first_delivery queue that is not empty * this sets the first pend_delv */ for (i = 0; i < memb_list_entries_confchg; i++) { @@ -2453,6 +2480,7 @@ static int user_deliver () * the queue that should be delivered from next */ pend_delv = pend_delv_next_delivery_find (); + assert (pend_delv); // TODO this assertion fails sometimes //printf ("Delivering from queue %s\n", inet_ntoa (pend_delv->ip)); /* @@ -2641,6 +2669,10 @@ static void pending_queues_deliver (void) assert (mcast->source.s_addr != 0); pend_delv = pend_delv_find (mcast->source); + + if (pend_delv == 0) { + printf ("mcast source is %s\n", inet_ntoa (mcast->source)); + } assert (pend_delv != 0); assert (pend_delv->ip.s_addr != 0); @@ -2736,32 +2768,18 @@ static int message_handler_memb_attempt_join ( int iov_len, int bytes_received) { - int token_lost; int found; int i; gmi_log_printf (gmi_log_level_notice, "Got attempt join from %s\n", inet_ntoa (system_from->sin_addr)); - for (token_lost = 0, i = 0; i < memb_list_entries; i++) { - if (memb_list[i].sin_addr.s_addr == system_from->sin_addr.s_addr && - memb_conf_id.rep.s_addr != system_from->sin_addr.s_addr) { - gmi_log_printf (gmi_log_level_notice, "ATTEMPT JOIN, token lost, taking attempt join msg.\n"); - poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); - timer_orf_token_timeout = 0; - memb_conf_id.rep.s_addr = memb_local_sockaddr_in.sin_addr.s_addr; - memb_list_entries = 1; - token_lost = 1; - break; - } - } - /* * Not representative */ - if (token_lost == 0 && - memb_conf_id.rep.s_addr != memb_local_sockaddr_in.sin_addr.s_addr) { + if (memb_conf_id.rep.s_addr != memb_local_sockaddr_in.sin_addr.s_addr) { - gmi_log_printf (gmi_log_level_notice, "not the rep for this ring, not handling attempt join.\n"); + gmi_log_printf (gmi_log_level_notice, "rep is %s, not handling attempt join.\n", + inet_ntoa (memb_conf_id.rep)); return (0); } @@ -2940,6 +2958,14 @@ printf ("Got membership form token\n"); */ poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout); timer_orf_token_timeout = 0; + + /* + * Delete retransmit timer since a new + * membership is in progress + */ + poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout); + timer_orf_token_retransmit_timeout = 0; + /* * Find next member */ @@ -2974,6 +3000,8 @@ printf ("Got membership form token\n"); break; case MEMB_STATE_FORM: + gmi_token_seqid = 0; + memb_state = MEMB_STATE_EVS; memb_form_token_update_highest_seq (&memb_form_token);