Fix problem with sync operations under very rare circumstances

This patch creates a special message queue for synchronization messages.
This prevents a situation in which messages are queued in the
new_message_queue but have not yet been originated from corrupting the
synchronization process.

Signed-off-by: Steven Dake <sdake@redhat.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Fabio M. Di Nitto <fdinitto@redhat.com>
This commit is contained in:
Steven Dake 2012-11-07 16:45:12 +01:00 committed by Jan Friesse
parent 28af35234a
commit 402638929e
7 changed files with 71 additions and 6 deletions

View File

@ -275,6 +275,10 @@ static void corosync_sync_completed (void)
cs_ipcs_sync_state_changed(sync_in_process);
cs_ipc_allow_connections(1);
/*
* Inform totem to start using new message queue again
*/
totempg_trans_ack();
}
static int corosync_sync_callbacks_retrieve (

View File

@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void)
{
totemsrp_threaded_mode_enable (totemsrp_context);
}
void totemmrp_trans_ack (void)
{
totemsrp_trans_ack (totemsrp_context);
}

View File

@ -131,4 +131,6 @@ extern int totemmrp_member_remove (
void totemmrp_threaded_mode_enable (void);
void totemmrp_trans_ack (void);
#endif /* TOTEMMRP_H_DEFINED */

View File

@ -1477,3 +1477,8 @@ void totempg_threaded_mode_enable (void)
totemmrp_threaded_mode_enable ();
}
void totempg_trans_ack (void)
{
totemmrp_trans_ack ();
}

View File

@ -369,6 +369,8 @@ struct totemsrp_instance {
*/
struct cs_queue new_message_queue;
struct cs_queue new_message_queue_trans;
struct cs_queue retrans_message_queue;
struct sq regular_sort_queue;
@ -502,6 +504,8 @@ struct totemsrp_instance {
uint32_t orf_token_discard;
uint32_t threaded_mode_enabled;
uint32_t waiting_trans_ack;
void * token_recv_event_handle;
void * token_sent_event_handle;
@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
instance->my_id.no_addrs = INTERFACE_MAX;
instance->waiting_trans_ack = 1;
}
static void main_token_seqid_get (
@ -950,6 +956,10 @@ int totemsrp_initialize (
MESSAGE_QUEUE_MAX,
sizeof (struct message_item), instance->threaded_mode_enabled);
cs_queue_init (&instance->new_message_queue_trans,
MESSAGE_QUEUE_MAX,
sizeof (struct message_item), instance->threaded_mode_enabled);
totemsrp_callback_token_create (instance,
&instance->token_recv_event_handle,
TOTEM_CALLBACK_TOKEN_RECEIVED,
@ -981,6 +991,7 @@ void totemsrp_finalize (
memb_leave_message_send (instance);
totemrrp_finalize (instance->totemrrp_context);
cs_queue_free (&instance->new_message_queue);
cs_queue_free (&instance->new_message_queue_trans);
cs_queue_free (&instance->retrans_message_queue);
sq_free (&instance->regular_sort_queue);
sq_free (&instance->recovery_sort_queue);
@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
trans_memb_list_totemip, instance->my_trans_memb_entries,
left_list, instance->my_left_memb_entries,
0, 0, &instance->my_ring_id);
instance->waiting_trans_ack = 1;
// TODO we need to filter to ensure we only deliver those
// messages which are part of instance->my_deliver_memb
@ -2265,8 +2277,15 @@ int totemsrp_mcast (
struct message_item message_item;
char *addr;
unsigned int addr_idx;
struct cs_queue *queue_use;
if (cs_queue_is_full (&instance->new_message_queue)) {
if (instance->waiting_trans_ack) {
queue_use = &instance->new_message_queue_trans;
} else {
queue_use = &instance->new_message_queue;
}
if (cs_queue_is_full (queue_use)) {
log_printf (instance->totemsrp_log_level_debug, "queue full");
return (-1);
}
@ -2305,7 +2324,7 @@ int totemsrp_mcast (
log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
instance->stats.mcast_tx++;
cs_queue_item_add (&instance->new_message_queue, &message_item);
cs_queue_item_add (queue_use, &message_item);
return (0);
@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context)
{
struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
int avail;
struct cs_queue *queue_use;
cs_queue_avail (&instance->new_message_queue, &avail);
if (instance->waiting_trans_ack) {
queue_use = &instance->new_message_queue_trans;
} else {
queue_use = &instance->new_message_queue;
}
cs_queue_avail (queue_use, &avail);
return (avail);
}
@ -2483,7 +2508,12 @@ static int orf_token_mcast (
sort_queue = &instance->recovery_sort_queue;
reset_token_retransmit_timeout (instance); // REVIEWED
} else {
mcast_queue = &instance->new_message_queue;
if (instance->waiting_trans_ack) {
mcast_queue = &instance->new_message_queue_trans;
} else {
mcast_queue = &instance->new_message_queue;
}
sort_queue = &instance->regular_sort_queue;
}
@ -3372,13 +3402,20 @@ static void token_callbacks_execute (
static unsigned int backlog_get (struct totemsrp_instance *instance)
{
unsigned int backlog = 0;
struct cs_queue *queue_use = NULL;
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
backlog = cs_queue_used (&instance->new_message_queue);
if (instance->waiting_trans_ack) {
queue_use = &instance->new_message_queue_trans;
} else {
queue_use = &instance->new_message_queue;
}
} else
if (instance->memb_state == MEMB_STATE_RECOVERY) {
backlog = cs_queue_used (&instance->retrans_message_queue);
queue_use = &instance->retrans_message_queue;
}
backlog = cs_queue_used (queue_use);
instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
return (backlog);
}
@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context)
instance->threaded_mode_enabled = 1;
}
void totemsrp_trans_ack (void *context)
{
struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
instance->waiting_trans_ack = 0;
}

View File

@ -138,4 +138,7 @@ extern int totemsrp_member_remove (
void totemsrp_threaded_mode_enable (
void *srp_context);
void totemsrp_trans_ack (
void *srp_context);
#endif /* TOTEMSRP_H_DEFINED */

View File

@ -184,6 +184,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn)
extern void totempg_threaded_mode_enable (void);
extern void totempg_trans_ack (void);
#ifdef __cplusplus
}
#endif