Add a target token set completed callback in totemrrp and below layers.

Handle management of callback in totemsrp.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2371 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2009-07-27 00:29:32 +00:00
parent a1db33fc99
commit f9f663f459
7 changed files with 171 additions and 75 deletions

View File

@ -57,7 +57,10 @@ struct transport {
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address));
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
int (*processor_count_set) (
void *transport_context,
@ -195,7 +198,10 @@ int totemnet_initialize (
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address))
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context))
{
struct totemnet_instance *instance;
unsigned int res;
@ -208,7 +214,7 @@ int totemnet_initialize (
res = instance->transport->initialize (poll_handle,
&instance->transport_context, totem_config,
interface_no, context, deliver_fn, iface_change_fn);
interface_no, context, deliver_fn, iface_change_fn, target_set_completed);
if (res == -1) {
goto error_destroy;

View File

@ -64,7 +64,10 @@ extern int totemnet_initialize (
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address));
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
extern int totemnet_processor_count_set (
void *net_context,

View File

@ -194,6 +194,9 @@ struct totemrrp_instance {
unsigned int *seqid,
unsigned int *token_is);
void (*totemrrp_target_set_completed) (
void *context);
unsigned int (*totemrrp_msgs_missing) (void);
/*
@ -1432,6 +1435,13 @@ int totemrrp_finalize (
return (0);
}
static void rrp_target_set_completed (void *context)
{
struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
deliver_fn_context->instance->totemrrp_target_set_completed (deliver_fn_context->context);
}
/*
* Totem Redundant Ring interface
* depends on poll abstraction, POSIX, IPV4
@ -1461,7 +1471,9 @@ int totemrrp_initialize (
unsigned int *seqid,
unsigned int *token_is),
unsigned int (*msgs_missing) (void))
unsigned int (*msgs_missing) (void),
void (*target_set_completed) (void *context))
{
struct totemrrp_instance *instance;
unsigned int res;
@ -1504,6 +1516,8 @@ int totemrrp_initialize (
instance->totemrrp_token_seqid_get = token_seqid_get;
instance->totemrrp_target_set_completed = target_set_completed;
instance->totemrrp_msgs_missing = msgs_missing;
instance->interface_count = totem_config->interface_count;
@ -1522,6 +1536,7 @@ int totemrrp_initialize (
deliver_fn_context->instance = instance;
deliver_fn_context->context = context;
deliver_fn_context->iface_no = i;
printf ("deliver fn context %p\n", deliver_fn_context);
totemnet_initialize (
poll_handle,
@ -1530,7 +1545,8 @@ int totemrrp_initialize (
i,
(void *)deliver_fn_context,
rrp_deliver_fn,
rrp_iface_change_fn);
rrp_iface_change_fn,
rrp_target_set_completed);
totemnet_net_mtu_adjust (instance->net_handles[i], totem_config);
}

View File

@ -72,7 +72,11 @@ extern int totemrrp_initialize (
unsigned int *seqid,
unsigned int *token_is),
unsigned int (*msgs_missing) (void));
unsigned int (*msgs_missing) (void),
void (*target_set_completed) (
void *context)
);
extern int totemrrp_processor_count_set (

View File

@ -505,6 +505,10 @@ struct totemsrp_instance {
unsigned int my_cbl;
struct timeval pause_timestamp;
struct memb_commit_token *commit_token;
char commit_token_storage[9000];
};
struct message_handlers {
@ -586,10 +590,12 @@ static void messages_free (struct totemsrp_instance *instance, unsigned int toke
static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
const struct memb_ring_id *ring_id);
static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
static void target_set_completed (void *context);
static void memb_state_commit_token_update (struct totemsrp_instance *instance);
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
static int memb_state_commit_token_send (struct totemsrp_instance *instance);
static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
static void memb_state_commit_token_create (struct totemsrp_instance *instance);
static int token_hold_cancel_send (struct totemsrp_instance *instance);
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
@ -661,6 +667,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
instance->my_high_seq_received = SEQNO_START_MSG;
instance->my_high_delivered = SEQNO_START_MSG;
instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
}
static void main_token_seqid_get (
@ -872,7 +880,8 @@ int totemsrp_initialize (
main_deliver_fn,
main_iface_change_fn,
main_token_seqid_get,
main_msgs_missing);
main_msgs_missing,
target_set_completed);
/*
* Must have net_mtu adjusted by totemrrp_initialize first
@ -1792,23 +1801,25 @@ static void memb_state_gather_enter (
static void timer_function_token_retransmit_timeout (void *data);
static void target_set_completed (
void *context)
{
struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
memb_state_commit_token_send (instance);
}
static void memb_state_commit_enter (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
struct totemsrp_instance *instance)
{
ring_save (instance);
old_ring_state_save (instance);
memb_state_commit_token_update (instance, commit_token);
memb_state_commit_token_update (instance);
memb_state_commit_token_target_set (instance, commit_token);
memb_ring_id_set_and_store (instance, &commit_token->ring_id);
memb_state_commit_token_send (instance, commit_token);
instance->token_ring_id_seq = instance->my_ring_id.seq;
memb_state_commit_token_target_set (instance);
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
@ -1818,13 +1829,17 @@ static void memb_state_commit_enter (
instance->memb_timer_state_gather_consensus_timeout = 0;
reset_token_timeout (instance); // REVIEWED
reset_token_retransmit_timeout (instance); // REVIEWED
memb_ring_id_set_and_store (instance, &instance->commit_token->ring_id);
instance->token_ring_id_seq = instance->my_ring_id.seq;
log_printf (instance->totemsrp_log_level_debug,
"entering COMMIT state.\n");
instance->memb_state = MEMB_STATE_COMMIT;
reset_token_retransmit_timeout (instance); // REVIEWED
reset_token_timeout (instance); // REVIEWED
/*
* reset all flow control variables since we are starting a new ring
@ -1832,7 +1847,9 @@ static void memb_state_commit_enter (
instance->my_trc = 0;
instance->my_pbl = 0;
instance->my_cbl = 0;
return;
/*
* commit token sent after callback that token target has been set
*/
}
static void memb_state_recovery_enter (
@ -1863,7 +1880,7 @@ static void memb_state_recovery_enter (
low_ring_aru = instance->old_ring_state_high_seq_received;
memb_state_commit_token_send (instance, commit_token);
memb_state_commit_token_send_recovery (instance, commit_token);
instance->my_token_seq = SEQNO_START_TOKEN - 1;
@ -2604,27 +2621,26 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
}
static void memb_state_commit_token_update (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
struct totemsrp_instance *instance)
{
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
unsigned int high_aru;
unsigned int i;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
memcpy (instance->my_new_memb_list, addr,
sizeof (struct srp_addr) * commit_token->addr_entries);
sizeof (struct srp_addr) * instance->commit_token->addr_entries);
instance->my_new_memb_entries = commit_token->addr_entries;
instance->my_new_memb_entries = instance->commit_token->addr_entries;
memcpy (&memb_list[commit_token->memb_index].ring_id,
memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
&instance->my_old_ring_id, sizeof (struct memb_ring_id));
assert (!totemip_zero_check(&instance->my_old_ring_id.rep));
memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru;
memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
/*
* TODO high delivered is really instance->my_aru, but with safe this
* could change?
@ -2632,17 +2648,17 @@ static void memb_state_commit_token_update (
instance->my_received_flg =
(instance->my_aru == instance->my_high_seq_received);
memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
/*
* find high aru up to current memb_index for all matching ring ids
* if any ring id matching memb_index has aru less then high aru set
* received flag for that entry to false
*/
high_aru = memb_list[commit_token->memb_index].aru;
for (i = 0; i <= commit_token->memb_index; i++) {
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
high_aru = memb_list[instance->commit_token->memb_index].aru;
for (i = 0; i <= instance->commit_token->memb_index; i++) {
if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
&memb_list[i].ring_id,
sizeof (struct memb_ring_id)) == 0) {
@ -2652,45 +2668,44 @@ static void memb_state_commit_token_update (
}
}
for (i = 0; i <= commit_token->memb_index; i++) {
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
for (i = 0; i <= instance->commit_token->memb_index; i++) {
if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
&memb_list[i].ring_id,
sizeof (struct memb_ring_id)) == 0) {
if (sq_lt_compare (memb_list[i].aru, high_aru)) {
memb_list[i].received_flg = 0;
if (i == commit_token->memb_index) {
if (i == instance->commit_token->memb_index) {
instance->my_received_flg = 0;
}
}
}
}
commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
commit_token->memb_index += 1;
assert (commit_token->memb_index <= commit_token->addr_entries);
assert (commit_token->header.nodeid);
instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
instance->commit_token->memb_index += 1;
assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
assert (instance->commit_token->header.nodeid);
}
static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
struct totemsrp_instance *instance)
{
struct srp_addr *addr;
unsigned int i;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
for (i = 0; i < instance->totem_config->interface_count; i++) {
totemrrp_token_target_set (
instance->totemrrp_context,
&addr[commit_token->memb_index %
commit_token->addr_entries].addr[i],
&addr[instance->commit_token->memb_index %
instance->commit_token->addr_entries].addr[i],
i);
}
}
static int memb_state_commit_token_send (
static int memb_state_commit_token_send_recovery (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
@ -2722,6 +2737,37 @@ static int memb_state_commit_token_send (
return (0);
}
static int memb_state_commit_token_send (
struct totemsrp_instance *instance)
{
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
unsigned int commit_token_size;
addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
instance->commit_token->token_seq++;
commit_token_size = sizeof (struct memb_commit_token) +
((sizeof (struct srp_addr) +
sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
/*
* Make a copy for retransmission if necessary
*/
memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
instance->orf_token_retransmit_size = commit_token_size;
totemrrp_token_send (instance->totemrrp_context,
instance->commit_token,
commit_token_size);
/*
* Request retransmission of the commit token in case it is lost
*/
reset_token_retransmit_timeout (instance);
return (0);
}
static int memb_lowest_in_config (struct totemsrp_instance *instance)
{
@ -2756,8 +2802,7 @@ static int srp_addr_compare (const void *a, const void *b)
}
static void memb_state_commit_token_create (
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
struct totemsrp_instance *instance)
{
struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
struct srp_addr *addr;
@ -2771,16 +2816,16 @@ static void memb_state_commit_token_create (
instance->my_proc_list, instance->my_proc_list_entries,
instance->my_failed_list, instance->my_failed_list_entries);
memset (commit_token, 0, sizeof (struct memb_commit_token));
commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
commit_token->header.endian_detector = ENDIAN_LOCAL;
commit_token->header.encapsulated = 0;
commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
assert (commit_token->header.nodeid);
memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
instance->commit_token->header.encapsulated = 0;
instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
assert (instance->commit_token->header.nodeid);
totemip_copy(&commit_token->ring_id.rep, &instance->my_id.addr[0]);
totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
/*
* This qsort is necessary to ensure the commit token traverses
@ -2789,11 +2834,11 @@ static void memb_state_commit_token_create (
qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
srp_addr_compare);
commit_token->memb_index = 0;
commit_token->addr_entries = token_memb_entries;
instance->commit_token->memb_index = 0;
instance->commit_token->addr_entries = token_memb_entries;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
memcpy (addr, token_memb,
token_memb_entries * sizeof (struct srp_addr));
@ -3349,6 +3394,12 @@ static int message_handler_orf_token (
fcc_rtr_limit (instance, token, &transmits_allowed);
mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
/*
if (mcasted_regular) {
printf ("mcasted regular %d\n", mcasted_regular);
printf ("token seq %d\n", token->seq);
}
*/
fcc_token_update (instance, token, mcasted_retransmit +
mcasted_regular);
@ -3779,15 +3830,18 @@ static int memb_join_process (
struct totemsrp_instance *instance,
const struct memb_join *memb_join)
{
unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
struct memb_commit_token *my_commit_token =
(struct memb_commit_token *)commit_token_storage;
struct srp_addr *proc_list;
struct srp_addr *failed_list;
proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
failed_list = proc_list + memb_join->proc_list_entries;
/*
memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
*/
if (memb_set_equal (proc_list,
memb_join->proc_list_entries,
instance->my_proc_list,
@ -3803,9 +3857,9 @@ static int memb_join_process (
if (memb_consensus_agreed (instance) &&
memb_lowest_in_config (instance)) {
memb_state_commit_token_create (instance, my_commit_token);
memb_state_commit_token_create (instance);
memb_state_commit_enter (instance, my_commit_token);
memb_state_commit_enter (instance);
} else {
return (0);
}
@ -4089,8 +4143,8 @@ static int message_handler_memb_commit_token (
sub_entries) &&
memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
memb_state_commit_enter (instance, memb_commit_token);
memcpy (instance->commit_token, memb_commit_token, msg_len);
memb_state_commit_enter (instance);
}
break;
@ -4159,6 +4213,7 @@ void main_deliver_fn (
if ((int)message_header->type >= totemsrp_message_handlers.count) {
log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
printf ("wrong message type\n");
return;
}

View File

@ -142,6 +142,8 @@ struct totemudp_instance {
void *context,
const struct totem_ip_address *iface_address);
void (*totemudp_target_set_completed) (void *context);
/*
* Function and data used to log messages
*/
@ -1707,7 +1709,10 @@ int totemudp_initialize (
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address))
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context))
{
struct totemudp_instance *instance;
@ -1769,6 +1774,8 @@ int totemudp_initialize (
instance->totemudp_iface_change_fn = iface_change_fn;
instance->totemudp_target_set_completed = target_set_completed;
totemip_localhost (instance->mcast_address.family, &localhost);
/*
@ -1940,6 +1947,8 @@ int totemudp_token_target_set (
memcpy (&instance->token_target, token_target,
sizeof (struct totem_ip_address));
instance->totemudp_target_set_completed (instance->context);
return (res);
}

View File

@ -57,7 +57,10 @@ extern int totemudp_initialize (
void (*iface_change_fn) (
void *context,
const struct totem_ip_address *iface_address));
const struct totem_ip_address *iface_address),
void (*target_set_completed) (
void *context));
extern int totemudp_processor_count_set (
void *udp_context,