diff --git a/exec/Makefile b/exec/Makefile index acd3492b..33ac0166 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -29,13 +29,13 @@ # THE POSSIBILITY OF SUCH DAMAGE. # Production mode flags -#CFLAGS = -O3 -Wall -fomit-frame-pointer -#LDFLAGS = -lpthread +CFLAGS = -O3 -Wall -fomit-frame-pointer +LDFLAGS = -lpthread # Debug mode flags -CFLAGS = -g -Wall +#CFLAGS = -g -Wall ##-DDEBUG -LDFLAGS = -g -lpthread +#LDFLAGS = -g -lpthread # Profile mode flags #CFLAGS = -O3 -pg diff --git a/exec/print.c b/exec/print.c index 2472a5be..7e9b9bf7 100644 --- a/exec/print.c +++ b/exec/print.c @@ -142,8 +142,8 @@ void log_syslog (char *log_string) { void internal_log_printf (int logclass, char *string, ...) { va_list ap; - char newstring[1024]; - char log_string[1024]; + char newstring[4096]; + char log_string[4096]; char char_time[512]; time_t curr_time; int level; diff --git a/exec/totemnet.c b/exec/totemnet.c index a0086f67..710723db 100644 --- a/exec/totemnet.c +++ b/exec/totemnet.c @@ -1026,8 +1026,6 @@ static int totemnet_build_sockets ( sockaddr_in_test.sin_family = AF_INET; sockaddr_in_test.sin_addr.s_addr = sockaddr_mcast->sin_addr.s_addr; sockaddr_in_test.sin_port = sockaddr_mcast->sin_port; - printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr)); - printf ("%d\n", sockaddr_in_test.sin_port); res = bind (sockets->mcast, (struct sockaddr *)&sockaddr_in_test, sizeof (struct sockaddr_in)); if (res == -1) { @@ -1051,8 +1049,6 @@ static int totemnet_build_sockets ( sockaddr_in_test.sin_family = AF_INET; sockaddr_in_test.sin_addr.s_addr = bound_to->sin_addr.s_addr; sockaddr_in_test.sin_port = sockaddr_mcast->sin_port; - printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr)); - printf ("%d\n", sockaddr_in_test.sin_port); res = bind (sockets->token, (struct sockaddr *)&sockaddr_in_test, sizeof (struct sockaddr_in)); if (res == -1) { @@ -1397,5 +1393,4 @@ extern void totemnet_net_mtu_adjust (struct totem_config *totem_config) } else { totem_config->net_mtu 
-= UDPIP_HEADER_SIZE; } -printf ("adjusted frame size is %d\n", totem_config->net_mtu); } diff --git a/exec/totemrrp.c b/exec/totemrrp.c index fdc16c28..3f3304cd 100644 --- a/exec/totemrrp.c +++ b/exec/totemrrp.c @@ -366,8 +366,6 @@ error_exit: static void timer_function_active_token (void *context) { struct active_instance *instance = (struct active_instance *)context; - - printf ("active instance %p\n", instance); } diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 7352dc47..2b38224d 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -93,6 +93,25 @@ #define RETRANSMIT_ENTRIES_MAX 30 #define MISSING_MCAST_WINDOW 128 +/* + * Rollover handling: + * SEQNO_START_MSG is the starting sequence number after a new configuration + * This should remain zero, unless testing overflow in which case + * 0x7ffff000 and 0xfffff000 are good starting values. + * + * SEQNO_START_TOKEN is the starting sequence number after a new configuration + * for a token. This should remain zero, unless testing overflow in which + * case 0x7fffff00 or 0xffffff00 are good starting values. 
+ * + * SEQNO_START_MSG is the starting sequence number after a new configuration + * This should remain zero, unless testing overflow in which case + * 0x7ffff000 and 0xfffff000 are good values to start with + */ +#define SEQNO_START_MSG 0x0 +#define SEQNO_START_TOKEN 0x0 +//#define SEQNO_START_MSG 0xfffffe00 +//#define SEQNO_START_TOKEN 0xfffffe00 + /* * we compare incoming messages to determine if their endian is * different - if so convert them @@ -136,13 +155,12 @@ struct totemsrp_socket { struct message_header { char type; char encapsulated; -// unsigned short filler; unsigned short endian_detector; } __attribute__((packed)); struct mcast { struct message_header header; - int seq; + unsigned int seq; int this_seqno; struct memb_ring_id ring_id; struct in_addr source; @@ -161,14 +179,14 @@ struct mcast { struct rtr_item { struct memb_ring_id ring_id; - int seq; + unsigned int seq; }__attribute__((packed)); struct orf_token { struct message_header header; - int seq; - int token_seq; - int aru; + unsigned int seq; + unsigned int token_seq; + unsigned int aru; struct in_addr aru_addr; struct memb_ring_id ring_id; short int fcc; @@ -198,14 +216,14 @@ struct token_hold_cancel { struct memb_commit_token_memb_entry { struct memb_ring_id ring_id; - int aru; - int high_delivered; + unsigned int aru; + unsigned int high_delivered; int received_flg; }__attribute__((packed)); struct memb_commit_token { struct message_header header; - int token_seq; + unsigned int token_seq; struct memb_ring_id ring_id; unsigned int retrans_flg; int memb_index; @@ -287,15 +305,15 @@ struct totemsrp_instance { int my_merge_detect_timeout_outstanding; - int my_last_aru; + unsigned int my_last_aru; int my_seq_unchanged; int my_received_flg; - int my_high_seq_received; + unsigned int my_high_seq_received; - int my_install_seq; + unsigned int my_install_seq; int my_rotation_counter; @@ -321,9 +339,9 @@ struct totemsrp_instance { /* * Received up to and including */ - int my_aru; + unsigned int 
my_aru; - int my_high_delivered; + unsigned int my_high_delivered; struct list_head token_callback_received_listhead; @@ -333,7 +351,7 @@ struct totemsrp_instance { int orf_token_retransmit_size; - int my_token_seq; + unsigned int my_token_seq; /* * Timers @@ -407,21 +425,19 @@ struct totemsrp_instance { unsigned long long token_ring_id_seq; - int last_released; + unsigned int last_released; - int set_aru; - - int totemsrp_brake; + unsigned int set_aru; int old_ring_state_saved; int old_ring_state_aru; - int old_ring_state_high_seq_received; + unsigned int old_ring_state_high_seq_received; int ring_saved; - int my_last_seq; + unsigned int my_last_seq; struct timeval tv_old; @@ -491,10 +507,10 @@ static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type); static void memb_state_gather_enter (struct totemsrp_instance *instance); -static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, int end_point); +static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point); static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken, int fcc_mcasts_allowed, struct in_addr *system_from); -static int messages_free (struct totemsrp_instance *instance, int token_aru); +static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru); static void memb_ring_id_store (struct totemsrp_instance *instance, struct memb_commit_token *commit_token); static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token); @@ -552,13 +568,19 @@ void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->my_received_flg = 1; - instance->my_token_seq = -1; + instance->my_token_seq = SEQNO_START_TOKEN - 1; instance->orf_token_retransmit = malloc (15000); instance->memb_state 
= MEMB_STATE_OPERATIONAL; instance->set_aru = -1; + + instance->my_aru = SEQNO_START_MSG; + + instance->my_high_seq_received = SEQNO_START_MSG; + + instance->my_high_delivered = SEQNO_START_MSG; } void main_token_seqid_get ( @@ -748,7 +770,6 @@ void totemsrp_finalize ( saHandleInstancePut (&totemsrp_instance_database, handle); } - /* * Set operations for use by the membership algorithm */ @@ -1037,7 +1058,7 @@ static void old_ring_state_save (struct totemsrp_instance *instance) instance->old_ring_state_aru = instance->my_aru; instance->old_ring_state_high_seq_received = instance->my_high_seq_received; instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "Saving state aru %d high seq recieved %d\n", + "Saving state aru %x high seq received %x\n", instance->my_aru, instance->my_high_seq_received); } } @@ -1063,7 +1084,7 @@ static void ring_state_restore (struct totemsrp_instance *instance) instance->my_aru = instance->old_ring_state_aru; instance->my_high_seq_received = instance->old_ring_state_high_seq_received; instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "Restoring instance->my_aru %d my high seq received %d\n", + "Restoring instance->my_aru %x my high seq received %x\n", instance->my_aru, instance->my_high_seq_received); } } @@ -1222,26 +1243,28 @@ static void memb_timer_function_gather_consensus_timeout (void *data) static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance) { - int i; + unsigned int i; struct sort_queue_item *recovery_message_item; struct sort_queue_item regular_message_item; + unsigned int range = 0; int res; void *ptr; struct mcast *mcast; instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "recovery to regular %d-%d\n", 1, instance->my_aru); + "recovery to regular %x-%x\n", SEQNO_START_MSG + 1, instance->my_aru); + range = instance->my_aru - SEQNO_START_MSG; /* * Move messages from recovery to regular sort queue */ // todo should i be initialized to 0 or 
1 ? - for (i = 1; i <= instance->my_aru; i++) { - res = sq_item_get (&instance->recovery_sort_queue, i, &ptr); + for (i = 1; i <= range; i++) { + res = sq_item_get (&instance->recovery_sort_queue, + i + SEQNO_START_MSG, &ptr); if (res != 0) { continue; } -printf ("Transferring message with seq id %d\n", i); recovery_message_item = (struct sort_queue_item *)ptr; /* @@ -1254,9 +1277,6 @@ printf ("Transferring message with seq id %d\n", i); sizeof (struct iovec) * recovery_message_item->iov_len); } else { mcast = recovery_message_item->iovec[0].iov_base; - instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "encapsulated is %d\n", - mcast->header.encapsulated); if (mcast->header.encapsulated == 1) { /* * Message is a recovery message encapsulated @@ -1269,7 +1289,6 @@ printf ("Transferring message with seq id %d\n", i); regular_message_item.iov_len = 1; mcast = regular_message_item.iovec[0].iov_base; } else { -printf ("not encapsulated\n"); continue; /* TODO this case shouldn't happen */ /* * Message is originated on new ring and not @@ -1294,21 +1313,18 @@ printf ("not encapsulated\n"); if (memcmp (&instance->my_old_ring_id, &mcast->ring_id, sizeof (struct memb_ring_id)) == 0) { - instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "adding msg with seq no %d\n", mcast->seq, mcast->this_seqno); - regular_message_item.iov_len = recovery_message_item->iov_len; res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq); if (res == 0) { sq_item_add (&instance->regular_sort_queue, &regular_message_item, mcast->seq); - if (mcast->seq > instance->old_ring_state_high_seq_received) { + if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) { instance->old_ring_state_high_seq_received = mcast->seq; } } } else { instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "-not adding msg with seq no %d\n", mcast->seq); + "-not adding msg with seq no %x\n", mcast->seq); } } } @@ -1322,14 +1338,14 @@ static void 
memb_state_operational_enter (struct totemsrp_instance *instance) int joined_list_entries = 0; struct in_addr left_list[PROCESSOR_COUNT_MAX]; int left_list_entries = 0; - int aru_save; + unsigned int aru_save; old_ring_state_reset (instance); ring_reset (instance); deliver_messages_from_recovery_to_regular (instance); instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "Delivering to app %d to %d\n", + "Delivering to app %x to %x\n", instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received); aru_save = instance->my_aru; @@ -1384,7 +1400,8 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) * into the regular sort queue. */ sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue); - instance->my_last_aru = 0; + instance->my_last_aru = SEQNO_START_MSG; + sq_items_release (&instance->regular_sort_queue, SEQNO_START_MSG - 1); instance->my_proc_list_entries = instance->my_new_memb_entries; memcpy (instance->my_proc_list, instance->my_new_memb_list, @@ -1494,18 +1511,23 @@ static void memb_state_recovery_enter ( int local_received_flg = 1; #endif unsigned int low_ring_aru; + unsigned int range = 0; unsigned int messages_originated = 0; + char is_originated[4096]; + char not_originated[4096]; + char seqno_string_hex[10]; instance->my_high_ring_delivered = 0; - sq_reinit (&instance->recovery_sort_queue, 0); + sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG); queue_reinit (&instance->retrans_message_queue); low_ring_aru = instance->old_ring_state_high_seq_received; memb_state_commit_token_send (instance, commit_token); -instance->my_token_seq = -1; + instance->my_token_seq = SEQNO_START_TOKEN - 1; + /* * Build regular configuration */ @@ -1534,7 +1556,7 @@ instance->my_token_seq = -1; inet_ntoa (commit_token->memb_list[i].ring_id.rep)); instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "aru %d high delivered %d received flag %d\n", + "aru %x high delivered %x 
received flag %d\n", commit_token->memb_list[i].aru, commit_token->memb_list[i].high_delivered, commit_token->memb_list[i].received_flg); @@ -1565,12 +1587,6 @@ instance->my_token_seq = -1; * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership */ for (i = 0; i < commit_token->addr_entries; i++) { -printf ("comparing %d old ring %s.%lld with commit ring %s.%lld.\n", i, - inet_ntoa (instance->my_old_ring_id.rep), instance->my_old_ring_id.seq, - inet_ntoa (commit_token->memb_list[i].ring_id.rep), - commit_token->memb_list[i].ring_id.seq); -printf ("memb set subset %d\n", - memb_set_subset (&instance->my_new_memb_list[i], 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries)); if (memb_set_subset (&instance->my_new_memb_list[i], 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries) && @@ -1580,36 +1596,41 @@ printf ("memb set subset %d\n", sizeof (struct memb_ring_id)) == 0) { if (low_ring_aru == 0 || - low_ring_aru > commit_token->memb_list[i].aru) { + sq_lt_compare (commit_token->memb_list[i].aru, low_ring_aru)) { low_ring_aru = commit_token->memb_list[i].aru; } - if (instance->my_high_ring_delivered < commit_token->memb_list[i].high_delivered) { + if (sq_lt_compare (instance->my_high_ring_delivered, commit_token->memb_list[i].high_delivered)) { instance->my_high_ring_delivered = commit_token->memb_list[i].high_delivered; } } } - assert (low_ring_aru != 0xffffffff); /* - * Cpy all old ring messages to instance->retrans_message_queue + * Copy all old ring messages to instance->retrans_message_queue */ instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, - "copying all old ring messages from %d-%d.\n", + "copying all old ring messages from %x-%x.\n", low_ring_aru + 1, instance->old_ring_state_high_seq_received); + strcpy (not_originated, "Not Originated for recovery: "); + strcpy (is_originated, "Originated for recovery: "); - for (i = low_ring_aru + 1; i <= 
instance->old_ring_state_high_seq_received; i++) { + range = instance->old_ring_state_high_seq_received - low_ring_aru; + assert (range < 1024); + for (i = 1; i <= range; i++) { struct sort_queue_item *sort_queue_item; struct message_item message_item; void *ptr; int res; - res = sq_item_get (&instance->regular_sort_queue, i, &ptr); + sprintf (seqno_string_hex, "%x ", low_ring_aru + i); + res = sq_item_get (&instance->regular_sort_queue, + low_ring_aru + i, &ptr); if (res != 0) { -printf ("-not copying %d-\n", i); + strcat (not_originated, seqno_string_hex); continue; } -printf ("copying %d\n", i); + strcat (is_originated, seqno_string_hex); sort_queue_item = ptr; assert (sort_queue_item->iov_len > 0); assert (sort_queue_item->iov_len <= MAXIOVS); @@ -1630,15 +1651,19 @@ printf ("copying %d\n", i); } instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, "Originated %d messages in RECOVERY.\n", messages_originated); + strcat (not_originated, "\n"); + strcat (is_originated, "\n"); + instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, is_originated); + instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, not_originated); // } - instance->my_aru = 0; + instance->my_aru = SEQNO_START_MSG; instance->my_aru_count = 0; instance->my_seq_unchanged = 0; - instance->my_high_seq_received = 0; - instance->my_install_seq = 0; + instance->my_high_seq_received = SEQNO_START_MSG; + instance->my_install_seq = SEQNO_START_MSG; + instance->last_released = SEQNO_START_MSG; - instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, "entering RECOVERY state.\n"); reset_token_timeout (instance); // REVIEWED reset_token_retransmit_timeout (instance); // REVIEWED @@ -1797,13 +1822,18 @@ static int orf_token_remcast ( struct sq *sort_queue; -//TODO printf ("remcasting %d\n", seq); if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } + res = 
sq_in_range (sort_queue, seq); + if (res == 0) { +printf ("sq not in range\n"); + return (-1); + } + /* * Get RTR item at seq, if not available, return */ @@ -1825,54 +1855,73 @@ static int orf_token_remcast ( /* * Free all freeable messages from ring */ -static int messages_free ( +static void messages_free ( struct totemsrp_instance *instance, - int token_aru) + unsigned int token_aru) { struct sort_queue_item *regular_message; - int i, j; + unsigned int i, j; int res; int log_release = 0; - int release_to; + unsigned int release_to; + unsigned int range = 0; + +//printf ("aru %x last aru %x my high delivered %x last releaed %x\n", +// token_aru, instance->my_last_aru, instance->my_high_delivered, instance->last_released); release_to = token_aru; - if (release_to > instance->my_last_aru) { + if (sq_lt_compare (instance->my_last_aru, release_to)) { release_to = instance->my_last_aru; } - if (release_to > instance->my_high_delivered) { + if (sq_lt_compare (instance->my_high_delivered, release_to)) { release_to = instance->my_high_delivered; } + /* + * Ensure we dont try release before an already released point + */ + if (sq_lt_compare (release_to, instance->last_released)) { + return; + } + + range = release_to - instance->last_released; + assert (range < 1024); + /* * Release retransmit list items if group aru indicates they are transmitted */ - for (i = instance->last_released; i <= release_to; i++) { + for (i = 1; i <= range; i++) { void *ptr; - res = sq_item_get (&instance->regular_sort_queue, i, &ptr); + + res = sq_item_get (&instance->regular_sort_queue, + instance->last_released + i, &ptr); if (res == 0) { regular_message = ptr; for (j = 0; j < regular_message->iov_len; j++) { free (regular_message->iovec[j].iov_base); } } - sq_items_release (&instance->regular_sort_queue, i); - instance->last_released = i + 1; + sq_items_release (&instance->regular_sort_queue, + instance->last_released + i); + log_release = 1; } + instance->last_released += range; if 
(log_release) { instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "releasing messages up to and including %d\n", release_to); + "releasing messages up to and including %x\n", release_to); } - return (0); } static void update_aru ( struct totemsrp_instance *instance) { - int i; + unsigned int i; int res; struct sq *sort_queue; + unsigned int range; + unsigned int my_aru_saved = 0; if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; @@ -1880,18 +1929,27 @@ static void update_aru ( sort_queue = &instance->regular_sort_queue; } - for (i = instance->my_aru + 1; i <= instance->my_high_seq_received; i++) { + range = instance->my_high_seq_received - instance->my_aru; + if (range > 1024) { + return; + } + + my_aru_saved = instance->my_aru; + for (i = 1; i <= range; i++) { + void *ptr; - res = sq_item_get (sort_queue, i, &ptr); + res = sq_item_get (sort_queue, my_aru_saved + i, &ptr); /* - * If hole, stop assembly + * If hole, stop updating aru */ if (res != 0) { break; } - instance->my_aru = i; } + instance->my_aru += i - 1; + + // instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, // "setting received flag to FALSE %d %d\n", // instance->my_aru, instance->my_high_seq_received); @@ -1944,6 +2002,7 @@ static int orf_token_mcast ( token->seq); return (0); } + message_item->mcast->seq = ++token->seq; message_item->mcast->this_seqno = instance->global_seqno++; @@ -1988,7 +2047,7 @@ static int orf_token_mcast ( assert (instance->fcc_mcast_current < 100); /* - * If messages mcasted, deliver any new messages to totemg + * If messages mcasted, deliver any new messages to totempg */ instance->my_high_seq_received = token->seq; @@ -2008,12 +2067,15 @@ static int orf_token_rtr ( struct orf_token *orf_token, int *fcc_allowed) { - int res; - int i, j; - int found; - int total_entries; + unsigned int res; + unsigned int i, j; + unsigned int found; + unsigned int total_entries; struct sq *sort_queue; struct 
rtr_item *rtr_list; + unsigned int range = 0; + char retransmit_msg[1024]; + char value[64]; if (instance->memb_state == MEMB_STATE_RECOVERY) { sort_queue = &instance->recovery_sort_queue; @@ -2022,14 +2084,17 @@ static int orf_token_rtr ( } rtr_list = &orf_token->rtr_list[0]; + strcpy (retransmit_msg, "Retransmit List: "); if (orf_token->rtr_list_entries) { instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, "Retransmit List %d\n", orf_token->rtr_list_entries); for (i = 0; i < orf_token->rtr_list_entries; i++) { - instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "%d ", rtr_list[i].seq); + sprintf (value, "%x ", rtr_list[i].seq); + strcat (retransmit_msg, value); } - instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, "\n"); + strcat (retransmit_msg, "\n"); + instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, + "%s", retransmit_msg); } total_entries = orf_token->rtr_list_entries; @@ -2051,7 +2116,6 @@ static int orf_token_rtr ( continue; } - assert (rtr_list[i].seq > 0); res = orf_token_remcast (instance, rtr_list[i].seq); if (res == 0) { /* @@ -2069,32 +2133,36 @@ static int orf_token_rtr ( } *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current - 1; -#ifdef COMPILE_OUT -for (i = 0; i < orf_token->rtr_list_entries; i++) { - assert (rtr_list_old[index_old].seq != -1); -} -#endif - /* * Add messages to retransmit to RTR list * but only retry if there is room in the retransmit list */ - for (i = instance->my_aru + 1; - orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX && - i <= instance->my_high_seq_received; - i++) { +//printf ("high seq %x aru %x\n", instance->my_high_seq_received, instance->my_aru); + range = instance->my_high_seq_received - instance->my_aru; + assert (range < 100000); + + for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) && + (i <= range); i++) { + + /* + * Ensure message is within the sort queue range + */ + res = sq_in_range (sort_queue, 
instance->my_aru + i); + if (res == 0) { + break; + } /* * Find if a message is missing from this processor */ - res = sq_item_inuse (sort_queue, i); + res = sq_item_inuse (sort_queue, instance->my_aru + i); if (res == 0) { /* * Determine if missing message is already in retransmit list */ found = 0; for (j = 0; j < orf_token->rtr_list_entries; j++) { - if (i == rtr_list[j].seq) { + if (instance->my_aru + i == rtr_list[j].seq) { found = 1; } } @@ -2104,7 +2172,7 @@ for (i = 0; i < orf_token->rtr_list_entries; i++) { */ memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); - rtr_list[orf_token->rtr_list_entries].seq = i; + rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i; orf_token->rtr_list_entries++; } } @@ -2241,7 +2309,8 @@ static int orf_token_send_initial (struct totemsrp_instance *instance) orf_token.header.endian_detector = ENDIAN_LOCAL; orf_token.header.encapsulated = 0; orf_token.seq = 0; - orf_token.token_seq = 0; + orf_token.seq = SEQNO_START_MSG; + orf_token.token_seq = SEQNO_START_TOKEN; orf_token.retrans_flg = 1; instance->my_set_retrans_flg = 1; /* @@ -2254,7 +2323,7 @@ static int orf_token_send_initial (struct totemsrp_instance *instance) */ orf_token.aru = 0; -// orf_token.aru_addr.s_addr = 0;//instance->my_id.sin_addr.s_addr; + orf_token.aru = SEQNO_START_MSG - 1; orf_token.aru_addr.s_addr = instance->my_id.sin_addr.s_addr; memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); orf_token.fcc = 0; @@ -2611,8 +2680,8 @@ static int message_handler_orf_token ( int transmits_allowed; int forward_token; int mcasted; - int last_aru; - int low_water; + unsigned int last_aru; + unsigned int low_water; #ifdef GIVEINFO struct timeval tv_current; @@ -2628,7 +2697,7 @@ printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); #endif #ifdef RANDOM_DROP -if (random () % 100 < 10) { +if (random()%100 < 10) { return (0); } #endif @@ -2647,9 +2716,6 @@ if 
(random () % 100 < 10) { instance->my_last_seq = token_ref->seq; -// assert (msg_len >= sizeof (struct orf_token)); -// assert (msg_len == sizeof (struct orf_token) + -// (sizeof (struct rtr_item) * token_ref->rtr_list_entries)); /* * Make copy of token and retransmit list in case we have * to flush incoming messages from the kernel queue @@ -2728,7 +2794,7 @@ if (random () % 100 < 10) { /* * Discard retransmitted tokens */ - if (instance->my_token_seq >= token->token_seq) { + if (sq_lte_compare (token->token_seq, instance->my_token_seq)) { /* * If this processor receives a retransmitted token, it is sure * the previous processor is still alive. As a result, it can @@ -2751,11 +2817,12 @@ if (random () % 100 < 10) { transmits_allowed = TRANSMITS_ALLOWED; mcasted = orf_token_rtr (instance, token, &transmits_allowed); - if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) { + if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) { transmits_allowed = 0; +printf ("zero \n"); } mcasted = orf_token_mcast (instance, token, transmits_allowed, system_from); - if (instance->my_aru < token->aru || + if (sq_lt_compare (instance->my_aru, token->aru) || instance->my_id.sin_addr.s_addr == token->aru_addr.s_addr || token->aru_addr.s_addr == 0) { @@ -2796,7 +2863,7 @@ printf ("FAILED TO RECEIVE\n"); * (ie: its retrans queue is empty) */ low_water = instance->my_aru; - if (low_water > last_aru) { + if (sq_lt_compare (last_aru, low_water)) { low_water = last_aru; } // TODO is this code right @@ -2812,7 +2879,7 @@ printf ("FAILED TO RECEIVE\n"); token->retrans_flg = 0; } instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %d aru %d\n", + "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %x aru %x\n", token->retrans_flg, instance->my_set_retrans_flg, queue_is_empty 
(&instance->retrans_message_queue), instance->my_retrans_flg_count, low_water, token->aru); @@ -2825,7 +2892,7 @@ printf ("FAILED TO RECEIVE\n"); instance->my_install_seq = token->seq; } instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "install seq %d aru %d high seq received %d\n", + "install seq %x aru %x high seq received %x\n", instance->my_install_seq, instance->my_aru, instance->my_high_seq_received); if (instance->my_retrans_flg_count >= 2 && instance->my_aru >= instance->my_install_seq && instance->my_received_flg == 0) { instance->my_received_flg = 1; @@ -2840,7 +2907,7 @@ printf ("FAILED TO RECEIVE\n"); } if (instance->my_rotation_counter == 2) { instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "retrans flag count %d token aru %d install seq %d aru %d %d\n", + "retrans flag count %x token aru %x install seq %x aru %x %x\n", instance->my_retrans_flg_count, token->aru, instance->my_install_seq, instance->my_aru, token->seq); @@ -2862,7 +2929,8 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); } #endif if (instance->memb_state == MEMB_STATE_OPERATIONAL) { - messages_deliver_to_app (instance, 0, instance->my_high_seq_received); + messages_deliver_to_app (instance, 0, + instance->my_high_seq_received); } /* @@ -2896,37 +2964,56 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); static void messages_deliver_to_app ( struct totemsrp_instance *instance, int skip, - int end_point) + unsigned int end_point) { - struct sort_queue_item *sort_queue_item_p; - int i; - int res; - struct mcast *mcast; + struct sort_queue_item *sort_queue_item_p; + unsigned int i; + int res; + struct mcast *mcast; + unsigned int range = 0; + unsigned int my_high_delivered_stored = 0; instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "Delivering %d to %d\n", instance->my_high_delivered + 1, + "Delivering %x to %x\n", instance->my_high_delivered, end_point); + range = end_point - 
instance->my_high_delivered; + + assert (range < 10240); + my_high_delivered_stored = instance->my_high_delivered; + /* * Deliver messages in order from rtr queue to pending delivery queue */ - for (i = instance->my_high_delivered + 1; i <= end_point; i++) { + for (i = 1; i <= range; i++) { + void *ptr = 0; - res = sq_item_get (&instance->regular_sort_queue, i, &ptr); - if (res != 0 && skip) { -printf ("-skipping %d-\n", i); - instance->my_high_delivered = i; - continue; + /* + * If out of range of sort queue, stop assembly + */ + res = sq_in_range (&instance->regular_sort_queue, + my_high_delivered_stored + i); + if (res == 0) { + break; } + res = sq_item_get (&instance->regular_sort_queue, + my_high_delivered_stored + i, &ptr); /* * If hole, stop assembly */ - if (res != 0) { + if (res != 0 && skip == 0) { break; } + instance->my_high_delivered = my_high_delivered_stored + i; + + if (res != 0) { + continue; + + } + sort_queue_item_p = ptr; mcast = sort_queue_item_p->iovec[0].iov_base; @@ -2943,18 +3030,16 @@ printf ("-skipping %d-\n", i); 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries) == 0) { + instance->my_high_delivered = my_high_delivered_stored + i; -printf ("-skipping %d - wrong ip", i); - instance->my_high_delivered = i; continue; } - instance->my_high_delivered = i; /* * Message found */ instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "Delivering MCAST message with seq %d to pending delivery queue\n", + "Delivering MCAST message with seq %x to pending delivery queue\n", mcast->seq); /* @@ -3006,17 +3091,25 @@ static int message_handler_mcast ( struct sq *sort_queue; struct mcast mcast_header; + if (endian_conversion_needed) { mcast_endian_convert (msg, &mcast_header); } else { memcpy (&mcast_header, msg, sizeof (struct mcast)); } +/* if (mcast_header.header.encapsulated == 1) { sort_queue = &instance->recovery_sort_queue; } else { sort_queue = &instance->regular_sort_queue; } +*/ + if (instance->memb_state == 
MEMB_STATE_RECOVERY) { + sort_queue = &instance->recovery_sort_queue; + } else { + sort_queue = &instance->regular_sort_queue; + } assert (msg_len < FRAME_SIZE_MAX); #ifdef RANDOM_DROP if (random()%100 < 50) { @@ -3065,16 +3158,17 @@ if (random()%100 < 50) { } instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, - "Received ringid(%s:%lld) seq %d\n", + "Received ringid(%s:%lld) seq %x\n", inet_ntoa (mcast_header.ring_id.rep), mcast_header.ring_id.seq, mcast_header.seq); + /* * Add mcast message to rtr queue if not already in rtr queue * otherwise free io vectors */ if (msg_len > 0 && msg_len < FRAME_SIZE_MAX && - instance->my_aru < mcast_header.seq && + sq_in_range (sort_queue, mcast_header.seq) && sq_item_inuse (sort_queue, mcast_header.seq) == 0) { /* @@ -3091,7 +3185,8 @@ if (random()%100 < 50) { assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX); sort_queue_item.iov_len = 1; - if (mcast_header.seq > instance->my_high_seq_received) { + if (sq_lt_compare (instance->my_high_seq_received, + mcast_header.seq)) { instance->my_high_seq_received = mcast_header.seq; } diff --git a/include/sq.h b/include/sq.h index 1febecc4..ea7d51a2 100644 --- a/include/sq.h +++ b/include/sq.h @@ -37,16 +37,57 @@ #include "errno.h" struct sq { - int head; - int size; + unsigned int head; + unsigned int size; void *items; - unsigned char *items_inuse; - int size_per_item; - int head_seqid; - int item_count; - int pos_max; + unsigned int *items_inuse; + unsigned int size_per_item; + unsigned int head_seqid; + unsigned int item_count; + unsigned int pos_max; }; +/* + * Compare an unsigned rollover-safe value to an unsigned rollover-safe value + */ + +/* + * ADJUST_ROLLOVER_POINT is the value used to determine when a window should be + * used to calculate a less-than or less-than-or-equal comparison. + * + * ADJUST_ROLLOVER_VALUE is the value by which both values in a comparison are + * adjusted if either value in a comparison is greater than + * ADJUST_ROLLOVER_POINT. 
+ */ +#define ADJUST_ROLLOVER_POINT 0x80000000 +#define ADJUST_ROLLOVER_VALUE 0x10000 + +static inline int sq_lt_compare (unsigned int a, unsigned int b) { + if ((a > ADJUST_ROLLOVER_POINT) || (b > ADJUST_ROLLOVER_POINT)) { + if ((a - ADJUST_ROLLOVER_VALUE) < (b - ADJUST_ROLLOVER_VALUE)) { + return (1); + } + } else { + if (a < b) { + return (1); + } + } + return (0); +} + +static inline int sq_lte_compare (unsigned int a, unsigned int b) { + if ((a > ADJUST_ROLLOVER_POINT) || (b > ADJUST_ROLLOVER_POINT)) { + if ((a - ADJUST_ROLLOVER_VALUE) <= (b - ADJUST_ROLLOVER_VALUE)) { + return (1); + } + } else { + if (a <= b) { + return (1); + } + } + return (0); +} + static inline int sq_init ( struct sq *sq, int item_count, @@ -66,24 +107,24 @@ static inline int sq_init ( } memset (sq->items, 0, item_count * size_per_item); - sq->items_inuse = (void *)malloc (item_count * sizeof (char)); - memset (sq->items_inuse, 0, item_count * sizeof (char)); + sq->items_inuse = (void *)malloc (item_count * sizeof (unsigned int)); + memset (sq->items_inuse, 0, item_count * sizeof (unsigned int)); return (0); } -static inline void sq_reinit (struct sq *sq, int head_seqid) +static inline void sq_reinit (struct sq *sq, unsigned int head_seqid) { sq->head = 0; sq->head_seqid = head_seqid; sq->pos_max = 0; memset (sq->items, 0, sq->item_count * sq->size_per_item); - memset (sq->items_inuse, 0, sq->item_count * sizeof (char)); + memset (sq->items_inuse, 0, sq->item_count * sizeof (unsigned int)); } -static inline void sq_assert (struct sq *sq, int pos) +static inline void sq_assert (struct sq *sq, unsigned int pos) { - int i; + unsigned int i; // printf ("Instrument[%d] Asserting from %d to %d\n", // pos, sq->pos_max, sq->size); @@ -103,7 +144,7 @@ static inline void sq_copy (struct sq *sq_dest, struct sq *sq_src) memcpy (sq_dest->items, sq_src->items, sq_src->item_count * sq_src->size_per_item); memcpy (sq_dest->items_inuse, sq_src->items_inuse, - sq_src->item_count * sizeof (char)); + 
sq_src->item_count * sizeof (unsigned int)); } static inline void sq_free (struct sq *sq) { @@ -114,35 +155,35 @@ static inline void sq_free (struct sq *sq) { static inline void *sq_item_add ( struct sq *sq, void *item, - int seqid) + unsigned int seqid) { char *sq_item; - int sq_position; + unsigned int sq_position; - if (seqid - sq->head_seqid >= sq->size) { - return(0); - } sq_position = (sq->head + seqid - sq->head_seqid) % sq->size; if (sq_position > sq->pos_max) { sq->pos_max = sq_position; } assert (sq_position >= 0); -//printf ("item add position %d seqid %d head seqid %d\n", sq_position, seqid, sq->head_seqid); sq_item = sq->items; sq_item += sq_position * sq->size_per_item; assert(sq->items_inuse[sq_position] == 0); memcpy (sq_item, item, sq->size_per_item); - sq->items_inuse[sq_position] = 1; + if (seqid == 0) { + sq->items_inuse[sq_position] = 1; + } else { + sq->items_inuse[sq_position] = seqid; + } return (sq_item); } -static inline int sq_item_inuse ( +static inline unsigned int sq_item_inuse ( struct sq *sq, - int seq_id) { + unsigned int seq_id) { - int sq_position; + unsigned int sq_position; /* * We need to say that the seqid is in use if it shouldn't @@ -157,34 +198,68 @@ static inline int sq_item_inuse ( } #endif sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size; -//printf ("in use %d\n", sq_position); - return (sq->items_inuse[sq_position]); + return (sq->items_inuse[sq_position] != 0); } -static inline int sq_size_get ( +static inline unsigned int sq_size_get ( struct sq *sq) { return sq->size; } -static inline int sq_item_get ( +static inline unsigned int sq_in_range ( struct sq *sq, - int seq_id, + unsigned int seq_id) +{ + int res = 1; + + if (sq->head_seqid > ADJUST_ROLLOVER_POINT) { + if (seq_id - ADJUST_ROLLOVER_VALUE < + sq->head_seqid - ADJUST_ROLLOVER_VALUE) { + + res = 0; + } + if ((seq_id - ADJUST_ROLLOVER_VALUE) >= + ((sq->head_seqid - ADJUST_ROLLOVER_VALUE) + sq->size)) { + + res = 0; + } + } else { + if (seq_id < 
sq->head_seqid) { + res = 0; + } + if ((seq_id) >= ((sq->head_seqid) + sq->size)) { + res = 0; + } + } + return (res); + +} + +static inline unsigned int sq_item_get ( + struct sq *sq, + unsigned int seq_id, void **sq_item_out) { char *sq_item; - int sq_position; + unsigned int sq_position; -if (seq_id == -1) { - return (ENOENT); -} - assert (seq_id < (sq->head_seqid + sq->size)); - sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size; + if (seq_id > ADJUST_ROLLOVER_POINT) { + assert ((seq_id - ADJUST_ROLLOVER_POINT) < + ((sq->head_seqid - ADJUST_ROLLOVER_POINT) + sq->size)); + + sq_position = ((sq->head - ADJUST_ROLLOVER_VALUE) - + (sq->head_seqid - ADJUST_ROLLOVER_VALUE) + seq_id) % sq->size; + } else { + assert (seq_id < (sq->head_seqid + sq->size)); + sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size; + } +//printf ("seqid %x head %x head %x pos %x\n", seq_id, sq->head, sq->head_seqid, sq_position); +// sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size; +//printf ("sq_position = %x\n", sq_position); //printf ("ITEMGET %d %d %d %d\n", sq_position, sq->head, sq->head_seqid, seq_id); assert (sq_position >= 0); -//printf ("itme get in use %d\n", sq_position); if (sq->items_inuse[sq_position] == 0) { -//printf ("not in use %d\n", sq_position); return (ENOENT); } sq_item = sq->items; @@ -193,13 +268,9 @@ assert (sq_position >= 0); return (0); } -static inline void sq_items_release (struct sq *sq, int seqid) +static inline void sq_items_release (struct sq *sq, unsigned int seqid) { - int oldhead; - - if (seqid < sq->head_seqid) { - return; - } + unsigned int oldhead; oldhead = sq->head; @@ -208,11 +279,11 @@ static inline void sq_items_release (struct sq *sq, int seqid) // printf ("releasing %d for %d\n", oldhead, sq->size - oldhead); // printf ("releasing %d for %d\n", 0, sq->head); memset (&sq->items_inuse[oldhead], 0, sq->size - oldhead); - memset (sq->items_inuse, 0, sq->head * sizeof (char)); + memset (sq->items_inuse, 0, sq->head 
* sizeof (unsigned int)); } else { // printf ("releasing %d for %d\n", oldhead, seqid - sq->head_seqid + 1); memset (&sq->items_inuse[oldhead], 0, - (seqid - sq->head_seqid + 1) * sizeof (char)); + (seqid - sq->head_seqid + 1) * sizeof (unsigned int)); } sq->head_seqid = seqid + 1; }