diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 07c08e57..2eb3f772 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -968,9 +968,11 @@ void deliver_messages_from_recovery_to_regular (void) void *ptr; struct mcast *mcast; +printf ("recovery to regular %d-%d\n", 1, my_aru); /* * Move messages from recovery to regular sort queue */ +// todo should i be initialized to 0 or 1 ? for (i = 1; i <= my_aru; i++) { res = sq_item_get (&recovery_sort_queue, i, &ptr); if (res != 0) { @@ -1016,6 +1018,8 @@ static void memb_state_operational_enter (void) deliver_messages_from_recovery_to_regular (); + printf ("Delivering to app %d to %d\n", + my_old_high_seq_delivered, my_high_ring_delivered); messages_deliver_to_app (0, &my_old_high_seq_delivered, my_high_ring_delivered); /* @@ -1071,6 +1075,8 @@ static void memb_state_operational_enter (void) my_failed_list_entries = 0; // TODO the recovery messages are leaked + my_old_high_seq_delivered = 0; + totemsrp_log_printf (totemsrp_log_level_notice, "entering OPERATIONAL state.\n"); memb_state = MEMB_STATE_OPERATIONAL; return; @@ -1273,7 +1279,9 @@ j++; my_seq_unchanged = 0; my_high_seq_received = 0; my_install_seq = 0; - my_old_high_seq_delivered = my_high_seq_delivered; + if (my_old_high_seq_delivered == 0) { + my_old_high_seq_delivered = my_high_seq_delivered; + } totemsrp_log_printf (totemsrp_log_level_notice, "entering RECOVERY state.\n"); reset_token_timeout (); // REVIEWED @@ -2424,7 +2432,14 @@ static void memb_ring_id_store ( fd = open (filename, O_WRONLY, 0777); if (fd == -1) { - printf ("Couldn't store the ring id %s\n", strerror (errno)); + fd = open (filename, O_CREAT|O_RDWR, 0777); + } + if (fd == -1) { + totemsrp_log_printf (totemsrp_log_level_notice, + "Couldn't store new ring id %llx to stable storage (%s)\n", + commit_token->ring_id.seq, strerror (errno)); + assert (0); + return; } totemsrp_log_printf (totemsrp_log_level_notice, "Storing new sequence id for ring %d\n", commit_token->ring_id.seq); @@ -2557,6 +2572,8 @@ static int message_handler_orf_token ( int forward_token; int mcasted; int last_aru; + int low_water; + #ifdef GIVEINFO struct timeval tv_current; struct timeval tv_diff; @@ -2713,8 +2730,12 @@ if (random () % 100 < 10) { * has recovered all messages it can recover * (ie: its retrans queue is empty) */ + low_water = my_aru; + if (low_water > my_last_aru) { + low_water = my_last_aru; + } if (queue_is_empty (&retrans_message_queue) == 0 || - my_aru != my_high_seq_received) { + low_water != my_high_seq_received) { if (token->retrans_flg == 0) { token->retrans_flg = 1; @@ -2724,8 +2745,10 @@ if (random () % 100 < 10) { if (token->retrans_flg == 1 && my_set_retrans_flg) { token->retrans_flg = 0; } -printf ("token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d\n", - token->retrans_flg, my_set_retrans_flg, queue_is_empty (&retrans_message_queue), my_retrans_flg_count); +printf ("token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %d aru %d\n", + token->retrans_flg, my_set_retrans_flg, + queue_is_empty (&retrans_message_queue), my_retrans_flg_count, + low_water, token->aru); if (token->retrans_flg == 0) { my_retrans_flg_count += 1; } else { @@ -2744,10 +2767,14 @@ my_high_seq_received); } if (my_retrans_flg_count >= 3 && token->aru >= my_install_seq) { my_rotation_counter += 1; + } else { + my_rotation_counter = 0; } if (my_rotation_counter == 2) { -printf ("retrans flag count %d token aru %d install seq %d aru %d %d\n", my_retrans_flg_count, - token->aru, my_install_seq, my_aru, token->seq); + printf ("retrans flag count %d token aru %d install seq %d aru %d %d\n", + my_retrans_flg_count, token->aru, my_install_seq, + my_aru, token->seq); + memb_state_operational_enter (); my_rotation_counter = 0; my_retrans_flg_count = 0; @@ -3236,7 +3263,9 @@ if (random()%100 < 10) { break; case MEMB_STATE_COMMIT: - if (memb_commit_token->ring_id.seq == my_ring_id.seq) { + if (memcmp (&memb_commit_token->ring_id, &my_ring_id, + sizeof (struct memb_ring_id)) == 0) { +// if (memb_commit_token->ring_id.seq == my_ring_id.seq) { memb_state_recovery_enter (memb_commit_token); } break;