totem: send the commit token from a target_set_completed callback

Add a target_set_completed callback that is handed down from
totemsrp_initialize through totemrrp_initialize and totemnet_initialize to
the transport's initialize function. totemudp stores the callback and
invokes it at the end of totemudp_token_target_set.

totemsrp now keeps the commit token in per-instance storage
(instance->commit_token, which points at instance->commit_token_storage)
instead of passing it between functions as a parameter.
memb_state_commit_enter no longer sends the commit token itself; the token
is sent by target_set_completed. The previous parameterised sender is kept
as memb_state_commit_token_send_recovery for memb_state_recovery_enter.

A received commit token is only copied into the per-instance storage when
msg_len fits in commit_token_storage; larger tokens are logged and ignored.

diff --git a/exec/totemnet.c b/exec/totemnet.c
index 2a2ddfe3..78fb28fb 100644
--- a/exec/totemnet.c
+++ b/exec/totemnet.c
@@ -57,7 +57,10 @@ struct transport {
 
 		void (*iface_change_fn) (
 			void *context,
-			const struct totem_ip_address *iface_address));
+			const struct totem_ip_address *iface_address),
+
+		void (*target_set_completed) (
+			void *context));
 
 	int (*processor_count_set) (
 		void *transport_context,
@@ -195,7 +198,10 @@ int totemnet_initialize (
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address))
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context))
 {
 	struct totemnet_instance *instance;
 	unsigned int res;
@@ -208,7 +214,7 @@ int totemnet_initialize (
 
 	res = instance->transport->initialize (poll_handle,
 		&instance->transport_context, totem_config,
-		interface_no, context, deliver_fn, iface_change_fn);
+		interface_no, context, deliver_fn, iface_change_fn, target_set_completed);
 
 	if (res == -1) {
 		goto error_destroy;
diff --git a/exec/totemnet.h b/exec/totemnet.h
index 644a6c24..96f063d2 100644
--- a/exec/totemnet.h
+++ b/exec/totemnet.h
@@ -64,7 +64,10 @@ extern int totemnet_initialize (
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address));
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context));
 
 extern int totemnet_processor_count_set (
 	void *net_context,
diff --git a/exec/totemrrp.c b/exec/totemrrp.c
--- a/exec/totemrrp.c
+++ b/exec/totemrrp.c
@@ -194,6 +194,9 @@ struct totemrrp_instance {
 		unsigned int *seqid,
 		unsigned int *token_is);
 
+	void (*totemrrp_target_set_completed) (
+		void *context);
+
 	unsigned int (*totemrrp_msgs_missing) (void);
 
 	/*
@@ -1432,6 +1435,17 @@ int totemrrp_finalize (
 	return (0);
 }
 
+/*
+ * Relay a target_set_completed notification to the callback registered
+ * through totemrrp_initialize, passing that caller's own context
+ */
+static void rrp_target_set_completed (void *context)
+{
+	struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
+
+	deliver_fn_context->instance->totemrrp_target_set_completed (deliver_fn_context->context);
+}
+
 /*
  * Totem Redundant Ring interface
  * depends on poll abstraction, POSIX, IPV4
@@ -1461,7 +1475,9 @@ int totemrrp_initialize (
 		unsigned int *seqid,
 		unsigned int *token_is),
 
-	unsigned int (*msgs_missing) (void))
+	unsigned int (*msgs_missing) (void),
+
+	void (*target_set_completed) (void *context))
 {
 	struct totemrrp_instance *instance;
 	unsigned int res;
@@ -1504,6 +1520,8 @@ int totemrrp_initialize (
 
 	instance->totemrrp_token_seqid_get = token_seqid_get;
 
+	instance->totemrrp_target_set_completed = target_set_completed;
+
 	instance->totemrrp_msgs_missing = msgs_missing;
 
 	instance->interface_count = totem_config->interface_count;
@@ -1530,7 +1548,8 @@ int totemrrp_initialize (
 			i,
 			(void *)deliver_fn_context,
 			rrp_deliver_fn,
-			rrp_iface_change_fn);
+			rrp_iface_change_fn,
+			rrp_target_set_completed);
 
 		totemnet_net_mtu_adjust (instance->net_handles[i], totem_config);
 	}
diff --git a/exec/totemrrp.h b/exec/totemrrp.h
index 03dbedcc..5fa0a0ff 100644
--- a/exec/totemrrp.h
+++ b/exec/totemrrp.h
@@ -72,7 +72,11 @@ extern int totemrrp_initialize (
 		unsigned int *seqid,
 		unsigned int *token_is),
 
-	unsigned int (*msgs_missing) (void));
+	unsigned int (*msgs_missing) (void),
+
+	void (*target_set_completed) (
+		void *context)
+	);
 
 
 extern int totemrrp_processor_count_set (
diff --git a/exec/totemsrp.c b/exec/totemsrp.c
--- a/exec/totemsrp.c
+++ b/exec/totemsrp.c
@@ -505,6 +505,13 @@ struct totemsrp_instance {
 	unsigned int my_cbl;
 
 	struct timeval pause_timestamp;
+
+	/*
+	 * Commit token being processed; points at commit_token_storage
+	 */
+	struct memb_commit_token *commit_token;
+
+	char commit_token_storage[9000];
 };
 
 struct message_handlers {
@@ -586,10 +593,12 @@ static void messages_free (struct totemsrp_instance *instance, unsigned int toke
 
 static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
 	const struct memb_ring_id *ring_id);
-static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
-static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
-static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
-static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
+static void target_set_completed (void *context);
+static void memb_state_commit_token_update (struct totemsrp_instance *instance);
+static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
+static int memb_state_commit_token_send (struct totemsrp_instance *instance);
+static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
+static void memb_state_commit_token_create (struct totemsrp_instance *instance);
 static int token_hold_cancel_send (struct totemsrp_instance *instance);
 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
@@ -661,6 +670,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
 	instance->my_high_seq_received = SEQNO_START_MSG;
 
 	instance->my_high_delivered = SEQNO_START_MSG;
+
+	instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
 }
 
 static void main_token_seqid_get (
@@ -872,7 +883,8 @@ int totemsrp_initialize (
 		main_deliver_fn,
 		main_iface_change_fn,
 		main_token_seqid_get,
-		main_msgs_missing);
+		main_msgs_missing,
+		target_set_completed);
 
 	/*
 	 * Must have net_mtu adjusted by totemrrp_initialize first
@@ -1792,23 +1804,28 @@ static void memb_state_gather_enter (
 
 static void timer_function_token_retransmit_timeout (void *data);
 
+/*
+ * Called once the token target has been set; sends the commit token
+ * stored in the instance
+ */
+static void target_set_completed (
+	void *context)
+{
+	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
+
+	memb_state_commit_token_send (instance);
+}
+
 static void memb_state_commit_enter (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	ring_save (instance);
 
 	old_ring_state_save (instance);
 
-	memb_state_commit_token_update (instance, commit_token);
+	memb_state_commit_token_update (instance);
 
-	memb_state_commit_token_target_set (instance, commit_token);
-
-	memb_ring_id_set_and_store (instance, &commit_token->ring_id);
-
-	memb_state_commit_token_send (instance, commit_token);
-
-	instance->token_ring_id_seq = instance->my_ring_id.seq;
+	memb_state_commit_token_target_set (instance);
 
 	poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
 
@@ -1818,13 +1835,17 @@ static void memb_state_commit_enter (
 
 	instance->memb_timer_state_gather_consensus_timeout = 0;
 
-	reset_token_timeout (instance); // REVIEWED
-	reset_token_retransmit_timeout (instance); // REVIEWED
+	memb_ring_id_set_and_store (instance, &instance->commit_token->ring_id);
+
+	instance->token_ring_id_seq = instance->my_ring_id.seq;
 
 	log_printf (instance->totemsrp_log_level_debug,
 		"entering COMMIT state.\n");
 
 	instance->memb_state = MEMB_STATE_COMMIT;
+	reset_token_retransmit_timeout (instance); // REVIEWED
+	reset_token_timeout (instance); // REVIEWED
+
 
 	/*
 	 * reset all flow control variables since we are starting a new ring
@@ -1832,7 +1853,9 @@ static void memb_state_commit_enter (
 	instance->my_trc = 0;
 	instance->my_pbl = 0;
 	instance->my_cbl = 0;
-	return;
+	/*
+	 * commit token sent after callback that token target has been set
+	 */
 }
 
 static void memb_state_recovery_enter (
@@ -1863,7 +1886,7 @@ static void memb_state_recovery_enter (
 
 	low_ring_aru = instance->old_ring_state_high_seq_received;
 
-	memb_state_commit_token_send (instance, commit_token);
+	memb_state_commit_token_send_recovery (instance, commit_token);
 
 	instance->my_token_seq = SEQNO_START_TOKEN - 1;
 
@@ -2604,27 +2627,26 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
 }
 
 static void memb_state_commit_token_update (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
 	unsigned int high_aru;
 	unsigned int i;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
-	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
+	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
 
 	memcpy (instance->my_new_memb_list, addr,
-		sizeof (struct srp_addr) * commit_token->addr_entries);
+		sizeof (struct srp_addr) * instance->commit_token->addr_entries);
 
-	instance->my_new_memb_entries = commit_token->addr_entries;
+	instance->my_new_memb_entries = instance->commit_token->addr_entries;
 
-	memcpy (&memb_list[commit_token->memb_index].ring_id,
+	memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
 		&instance->my_old_ring_id, sizeof (struct memb_ring_id));
 	assert (!totemip_zero_check(&instance->my_old_ring_id.rep));
 
-	memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru;
+	memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
 	/*
 	 * TODO high delivered is really instance->my_aru, but with safe this
 	 * could change?
@@ -2632,17 +2654,17 @@ static void memb_state_commit_token_update (
 	instance->my_received_flg =
 		(instance->my_aru == instance->my_high_seq_received);
 
-	memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
+	memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
 
-	memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
+	memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
 	/*
 	 * find high aru up to current memb_index for all matching ring ids
 	 * if any ring id matching memb_index has aru less then high aru set
 	 * received flag for that entry to false
 	 */
-	high_aru = memb_list[commit_token->memb_index].aru;
-	for (i = 0; i <= commit_token->memb_index; i++) {
-		if (memcmp (&memb_list[commit_token->memb_index].ring_id,
+	high_aru = memb_list[instance->commit_token->memb_index].aru;
+	for (i = 0; i <= instance->commit_token->memb_index; i++) {
+		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
 			&memb_list[i].ring_id,
 			sizeof (struct memb_ring_id)) == 0) {
 
@@ -2652,45 +2674,44 @@ static void memb_state_commit_token_update (
 		}
 	}
 
-	for (i = 0; i <= commit_token->memb_index; i++) {
-		if (memcmp (&memb_list[commit_token->memb_index].ring_id,
+	for (i = 0; i <= instance->commit_token->memb_index; i++) {
+		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
 			&memb_list[i].ring_id,
 			sizeof (struct memb_ring_id)) == 0) {
 
 			if (sq_lt_compare (memb_list[i].aru, high_aru)) {
 				memb_list[i].received_flg = 0;
-				if (i == commit_token->memb_index) {
+				if (i == instance->commit_token->memb_index) {
 					instance->my_received_flg = 0;
 				}
 			}
 		}
 	}
 
-	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
-	commit_token->memb_index += 1;
-	assert (commit_token->memb_index <= commit_token->addr_entries);
-	assert (commit_token->header.nodeid);
+	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
+	instance->commit_token->memb_index += 1;
+	assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
+	assert (instance->commit_token->header.nodeid);
 }
 
 static void memb_state_commit_token_target_set (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr *addr;
 	unsigned int i;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
 
 	for (i = 0; i < instance->totem_config->interface_count; i++) {
 		totemrrp_token_target_set (
 			instance->totemrrp_context,
-			&addr[commit_token->memb_index %
-				commit_token->addr_entries].addr[i],
+			&addr[instance->commit_token->memb_index %
+				instance->commit_token->addr_entries].addr[i],
 			i);
 	}
 }
 
-static int memb_state_commit_token_send (
+static int memb_state_commit_token_send_recovery (
 	struct totemsrp_instance *instance,
 	struct memb_commit_token *commit_token)
 {
@@ -2722,6 +2743,36 @@ static int memb_state_commit_token_send (
 	return (0);
 }
 
+/*
+ * Send the commit token held in instance->commit_token with
+ * totemrrp_token_send, keeping a copy for retransmission. Returns 0.
+ */
+static int memb_state_commit_token_send (
+	struct totemsrp_instance *instance)
+{
+	unsigned int commit_token_size;
+
+	instance->commit_token->token_seq++;
+	commit_token_size = sizeof (struct memb_commit_token) +
+		((sizeof (struct srp_addr) +
+			sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
+	/*
+	 * Make a copy for retransmission if necessary
+	 */
+	memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
+	instance->orf_token_retransmit_size = commit_token_size;
+
+	totemrrp_token_send (instance->totemrrp_context,
+		instance->commit_token,
+		commit_token_size);
+
+	/*
+	 * Request retransmission of the commit token in case it is lost
+	 */
+	reset_token_retransmit_timeout (instance);
+	return (0);
+}
+
 
 static int memb_lowest_in_config (struct totemsrp_instance *instance)
 {
@@ -2756,8 +2807,7 @@ static int srp_addr_compare (const void *a, const void *b)
 }
 
 static void memb_state_commit_token_create (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
 	struct srp_addr *addr;
@@ -2771,16 +2821,16 @@ static void memb_state_commit_token_create (
 		instance->my_proc_list, instance->my_proc_list_entries,
 		instance->my_failed_list, instance->my_failed_list_entries);
 
-	memset (commit_token, 0, sizeof (struct memb_commit_token));
-	commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
-	commit_token->header.endian_detector = ENDIAN_LOCAL;
-	commit_token->header.encapsulated = 0;
-	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
-	assert (commit_token->header.nodeid);
+	memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
+	instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
+	instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
+	instance->commit_token->header.encapsulated = 0;
+	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
+	assert (instance->commit_token->header.nodeid);
 
-	totemip_copy(&commit_token->ring_id.rep, &instance->my_id.addr[0]);
+	totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
 
-	commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
+	instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
 
 	/*
 	 * This qsort is necessary to ensure the commit token traverses
@@ -2789,11 +2839,11 @@ static void memb_state_commit_token_create (
 	qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
 		srp_addr_compare);
 
-	commit_token->memb_index = 0;
-	commit_token->addr_entries = token_memb_entries;
+	instance->commit_token->memb_index = 0;
+	instance->commit_token->addr_entries = token_memb_entries;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
-	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
+	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
 
 	memcpy (addr, token_memb,
 		token_memb_entries * sizeof (struct srp_addr));
@@ -3779,9 +3829,6 @@ static int memb_join_process (
 	struct totemsrp_instance *instance,
 	const struct memb_join *memb_join)
 {
-	unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
-	struct memb_commit_token *my_commit_token =
-		(struct memb_commit_token *)commit_token_storage;
 	struct srp_addr *proc_list;
 	struct srp_addr *failed_list;
 
@@ -3803,9 +3850,9 @@ static int memb_join_process (
 			if (memb_consensus_agreed (instance) &&
 				memb_lowest_in_config (instance)) {
 
-				memb_state_commit_token_create (instance, my_commit_token);
+				memb_state_commit_token_create (instance);
 
-				memb_state_commit_enter (instance, my_commit_token);
+				memb_state_commit_enter (instance);
 			} else {
 				return (0);
 			}
@@ -4089,8 +4136,17 @@ static int message_handler_memb_commit_token (
 				sub_entries) &&
 
 				memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
-
-				memb_state_commit_enter (instance, memb_commit_token);
+				/*
+				 * The token is copied into fixed-size storage, so
+				 * ignore any token that would not fit
+				 */
+				if (msg_len <= sizeof (instance->commit_token_storage)) {
+					memcpy (instance->commit_token, memb_commit_token, msg_len);
+					memb_state_commit_enter (instance);
+				} else {
+					log_printf (instance->totemsrp_log_level_security,
+						"Commit token is too large for commit token storage... ignoring.\n");
+				}
 			}
 			break;
 
diff --git a/exec/totemudp.c b/exec/totemudp.c
index 87062bb4..84250238 100644
--- a/exec/totemudp.c
+++ b/exec/totemudp.c
@@ -142,6 +142,8 @@ struct totemudp_instance {
 		void *context,
 		const struct totem_ip_address *iface_address);
 
+	void (*totemudp_target_set_completed) (void *context);
+
 	/*
 	 * Function and data used to log messages
 	 */
@@ -1707,7 +1709,10 @@ int totemudp_initialize (
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address))
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context))
 {
 	struct totemudp_instance *instance;
 
@@ -1769,6 +1774,8 @@ int totemudp_initialize (
 
 	instance->totemudp_iface_change_fn = iface_change_fn;
 
+	instance->totemudp_target_set_completed = target_set_completed;
+
 	totemip_localhost (instance->mcast_address.family, &localhost);
 
 	/*
@@ -1940,6 +1947,8 @@ int totemudp_token_target_set (
 	memcpy (&instance->token_target, token_target,
 		sizeof (struct totem_ip_address));
 
+	instance->totemudp_target_set_completed (instance->context);
+
 	return (res);
 }
 
diff --git a/exec/totemudp.h b/exec/totemudp.h
index 66ad80bb..3ad25105 100644
--- a/exec/totemudp.h
+++ b/exec/totemudp.h
@@ -57,7 +57,10 @@ extern int totemudp_initialize (
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address));
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context));
 
 extern int totemudp_processor_count_set (
 	void *udp_context,