Block multicast messages during synchronization.

(Logical change 1.151)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@535 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2005-03-16 23:41:41 +00:00
parent 6157ca5acf
commit 1e3efb5ec5
10 changed files with 86 additions and 18 deletions

View File

@ -29,13 +29,13 @@
# THE POSSIBILITY OF SUCH DAMAGE.
# Production mode flags
#CFLAGS = -O3 -Wall -fomit-frame-pointer
#LDFLAGS =
CFLAGS = -O3 -Wall -fomit-frame-pointer
LDFLAGS =
# Debug mode flags
CFLAGS = -g -Wall
#CFLAGS = -g -Wall
##-DDEBUG
LDFLAGS = -g
#LDFLAGS = -g
# Profile mode flags
#CFLAGS = -O2 -pg

View File

@ -318,61 +318,73 @@ struct libais_handler amf_libais_handlers[] =
.libais_handler_fn = message_handler_req_lib_activatepoll,
.response_size = sizeof (struct res_lib_activatepoll),
.response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 1 */
.libais_handler_fn = message_handler_req_amf_componentregister,
.response_size = sizeof (struct res_lib_amf_componentregister),
.response_id = MESSAGE_RES_AMF_COMPONENTREGISTER,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 2 */
.libais_handler_fn = message_handler_req_amf_componentunregister,
.response_size = sizeof (struct res_lib_amf_componentunregister),
.response_id = MESSAGE_RES_AMF_COMPONENTUNREGISTER,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 3 */
.libais_handler_fn = message_handler_req_amf_readinessstateget,
.response_size = sizeof (struct res_lib_amf_readinessstateget),
.response_id = MESSAGE_RES_AMF_READINESSSTATEGET,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.libais_handler_fn = message_handler_req_amf_hastateget,
.response_size = sizeof (struct res_lib_amf_hastateget),
.response_id = MESSAGE_RES_AMF_READINESSSTATEGET,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 5 */
.libais_handler_fn = message_handler_req_amf_protectiongrouptrackstart,
.response_size = sizeof (struct res_lib_amf_protectiongrouptrackstart),
.response_id = MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 6 */
.libais_handler_fn = message_handler_req_amf_protectiongrouptrackstop,
.response_size = sizeof (struct res_lib_amf_protectiongrouptrackstop),
.response_id = MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 */
.libais_handler_fn = message_handler_req_amf_errorreport,
.response_size = sizeof (struct res_lib_amf_errorreport),
.response_id = MESSAGE_RES_AMF_ERRORREPORT,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 8 */
.libais_handler_fn = message_handler_req_amf_errorcancelall,
.response_size = sizeof (struct res_lib_amf_errorcancelall),
.response_id = MESSAGE_RES_AMF_ERRORCANCELALL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 9 */
.libais_handler_fn = message_handler_req_amf_stoppingcomplete,
.response_size = sizeof (struct res_lib_amf_stoppingcomplete),
.response_id = MESSAGE_RES_AMF_STOPPINGCOMPLETE, // TODO
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 10 */
.libais_handler_fn = message_handler_req_amf_response,
.response_size = sizeof (struct res_lib_amf_response),
.response_id = MESSAGE_RES_AMF_RESPONSE, // TODO
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 11 */
.libais_handler_fn = message_handler_req_amf_componentcapabilitymodelget,
.response_size = sizeof (struct res_lib_amf_componentcapabilitymodelget),
.response_id = MESSAGE_RES_AMF_COMPONENTCAPABILITYMODELGET,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
}
};

View File

@ -191,91 +191,109 @@ struct libais_handler ckpt_libais_handlers[] =
.libais_handler_fn = message_handler_req_lib_activatepoll,
.response_size = sizeof (struct res_lib_activatepoll),
.response_id = MESSAGE_RES_LIB_ACTIVATEPOLL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 1 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointopen,
.response_size = sizeof (struct res_lib_ckpt_checkpointopen),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 2 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointopenasync,
.response_size = sizeof (struct res_lib_ckpt_checkpointopenasync),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 3 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointclose,
.response_size = sizeof (struct res_lib_ckpt_checkpointclose),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 4 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointunlink,
.response_size = sizeof (struct res_lib_ckpt_checkpointunlink),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 5 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointretentiondurationset,
.response_size = sizeof (struct res_lib_ckpt_checkpointretentiondurationset),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 6 */
.libais_handler_fn = message_handler_req_lib_ckpt_activereplicaset,
.response_size = sizeof (struct res_lib_ckpt_activereplicaset),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_ACTIVEREPLICASET,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 7 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointstatusget,
.response_size = sizeof (struct res_lib_ckpt_checkpointstatusget),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 8 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectioncreate,
.response_size = sizeof (struct res_lib_ckpt_sectioncreate),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 9 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectiondelete,
.response_size = sizeof (struct res_lib_ckpt_sectiondelete),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 10 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectionexpirationtimeset,
.response_size = sizeof (struct res_lib_ckpt_sectionexpirationtimeset),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 11 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectionwrite,
.response_size = sizeof (struct res_lib_ckpt_sectionwrite),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 12 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectionoverwrite,
.response_size = sizeof (struct res_lib_ckpt_sectionoverwrite),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 13 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectionread,
.response_size = sizeof (struct res_lib_ckpt_sectionread),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 14 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronize,
.response_size = sizeof (struct res_lib_ckpt_checkpointsynchronize),
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 15 */
.libais_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronizeasync,
.response_size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */
.response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 16 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectioniteratorinitialize,
.response_size = sizeof (struct res_lib_ckpt_sectioniteratorinitialize),
.response_id = MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 17 */
.libais_handler_fn = message_handler_req_lib_ckpt_sectioniteratornext,
.response_size = sizeof (struct res_lib_ckpt_sectioniteratornext),
.response_id = MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
}
};

View File

@ -76,8 +76,6 @@ int clusterNodeEntries = 0;
static DECLARE_LIST_INIT (library_notification_send_listhead);
static int should_send_nodejoin = 0;
SaClmClusterNodeT *clm_get_by_nodeid (struct in_addr node_id)
{
SaClmClusterNodeT *ret = NULL;
@ -146,26 +144,31 @@ struct libais_handler clm_libais_handlers[] =
.libais_handler_fn = message_handler_req_lib_activatepoll,
.response_size = sizeof (struct res_lib_activatepoll),
.response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.libais_handler_fn = message_handler_req_clm_clustertrack,
.response_size = sizeof (struct res_clm_clustertrack),
.response_id = MESSAGE_RES_CLM_TRACKSTART, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.libais_handler_fn = message_handler_req_clm_trackstop,
.response_size = sizeof (struct res_clm_trackstop),
.response_id = MESSAGE_RES_CLM_TRACKSTOP, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.libais_handler_fn = message_handler_req_clm_nodeget,
.response_size = sizeof (struct res_clm_nodeget),
.response_id = MESSAGE_RES_CLM_NODEGET, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 4 */
.libais_handler_fn = message_handler_req_clm_nodegetasync,
.response_size = sizeof (struct res_clm_nodegetasync),
.response_id = MESSAGE_RES_CLM_NODEGETCALLBACK, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
}
};
@ -417,10 +420,6 @@ static int clm_confchg_fn (
log_printf (LOG_LEVEL_NOTICE, "\t%s\n", inet_ntoa (joined_list[i]));
}
if (joined_list_entries > 0) {
should_send_nodejoin = 1;
}
for (i = 0; i < left_list_entries; i++) {
nodes[i] = left_list[i].s_addr;
}
@ -447,9 +446,7 @@ static int clm_sync_process (void)
/*
* Send node information to other nodes
*/
if (should_send_nodejoin) {
return (clm_nodejoin_send ());
}
return (clm_nodejoin_send ());
return (0);
}
@ -458,7 +455,6 @@ static int clm_sync_process (void)
*/
static void clm_sync_activate (void)
{
should_send_nodejoin = 0;
return;
}

View File

@ -100,26 +100,31 @@ struct libais_handler evs_libais_handlers[] =
.libais_handler_fn = message_handler_req_lib_activatepoll,
.response_size = sizeof (struct res_lib_activatepoll),
.response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 1 */
.libais_handler_fn = message_handler_req_evs_join,
.response_size = sizeof (struct res_lib_evs_join),
.response_id = MESSAGE_RES_EVS_JOIN,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 2 */
.libais_handler_fn = message_handler_req_evs_leave,
.response_size = sizeof (struct res_lib_evs_leave),
.response_id = MESSAGE_RES_EVS_LEAVE,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{ /* 3 */
.libais_handler_fn = message_handler_req_evs_mcast_joined,
.response_size = sizeof (struct res_lib_evs_mcast_joined),
.response_id = MESSAGE_RES_EVS_MCAST_JOINED,
.flow_control = FLOW_CONTROL_REQUIRED
},
{ /* 4 */
.libais_handler_fn = message_handler_req_evs_mcast_groups,
.response_size = sizeof (struct res_lib_evs_mcast_groups),
.response_id = MESSAGE_RES_EVS_MCAST_GROUPS,
.flow_control = FLOW_CONTROL_REQUIRED
}
};

View File

@ -108,51 +108,61 @@ static struct libais_handler evt_libais_handlers[] = {
.libais_handler_fn = message_handler_req_lib_activatepoll,
.response_size = sizeof(struct res_lib_activatepoll),
.response_id = MESSAGE_RES_LIB_ACTIVATEPOLL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_open_channel,
.response_size = sizeof(struct res_evt_channel_open),
.response_id = MESSAGE_RES_EVT_OPEN_CHANNEL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_open_channel_async,
.response_size = sizeof(struct res_evt_channel_open),
.response_id = MESSAGE_RES_EVT_OPEN_CHANNEL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_close_channel,
.response_size = sizeof(struct res_evt_channel_close),
.response_id = MESSAGE_RES_EVT_CLOSE_CHANNEL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_unlink_channel,
.response_size = sizeof(struct res_evt_channel_unlink),
.response_id = MESSAGE_RES_EVT_UNLINK_CHANNEL,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_event_subscribe,
.response_size = sizeof(struct res_evt_event_subscribe),
.response_id = MESSAGE_RES_EVT_SUBSCRIBE,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{
.libais_handler_fn = lib_evt_event_unsubscribe,
.response_size = sizeof(struct res_evt_event_unsubscribe),
.response_id = MESSAGE_RES_EVT_UNSUBSCRIBE,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
{
.libais_handler_fn = lib_evt_event_publish,
.response_size = sizeof(struct res_evt_event_publish),
.response_id = MESSAGE_RES_EVT_PUBLISH,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_event_clear_retentiontime,
.response_size = sizeof(struct res_evt_event_clear_retentiontime),
.response_id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME,
.flow_control = FLOW_CONTROL_REQUIRED
},
{
.libais_handler_fn = lib_evt_event_data_get,
.response_size = sizeof(struct lib_event_data),
.response_id = MESSAGE_RES_EVT_EVENT_DATA,
.flow_control = FLOW_CONTROL_NOT_REQUIRED
},
};

View File

@ -39,10 +39,16 @@
#include "totempg.h"
#include "totemsrp.h"
enum flow_control {
FLOW_CONTROL_REQUIRED = 1,
FLOW_CONTROL_NOT_REQUIRED = 2
};
struct libais_handler {
int (*libais_handler_fn) (struct conn_info *conn_info, void *msg);
int response_size;
int response_id;
enum flow_control flow_control;
};
struct service_handler {

View File

@ -572,11 +572,17 @@ retry_recv:
}
/*
* Determine if a message can be queued with totempg and if so
* deliver it, otherwise tell the library we are too busy
* If flow control is required of the library handle, determine that
* openais is not in synchronization and that totempg has room available
* to queue a message, otherwise tell the library we are busy and to
* try again later
*/
send_ok = totempg_send_ok (1000 + header->size);
send_ok =
(ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
((ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
(totempg_send_ok (1000 + header->size)) &&
(sync_in_process() == 0));
if (send_ok) {
// *prio = 0;
res = ais_service_handlers[service - 1]->libais_handlers[header->id].libais_handler_fn(conn_info, header);

View File

@ -68,6 +68,8 @@ static struct sync_callbacks *sync_callbacks;
static int sync_callback_count;
static int sync_processing = 0;
static void (*sync_synchronization_completed) (void);
static int sync_recovery_index = 0;
@ -145,6 +147,7 @@ static int sync_service_process (enum totem_callback_token_type type, void *data
sync_recovery_index += 1;
totemsrp_callback_token_destroy (&sync_callback_token_handle);
if (sync_recovery_index > sync_callback_count) {
sync_processing = 0;
} else {
sync_barrier_start (ring_id);
}
@ -175,6 +178,8 @@ void sync_confchg_fn (
return;
}
sync_processing = 1;
totemsrp_callback_token_destroy (&sync_callback_token_handle);
sync_ring_id = ring_id;
@ -245,7 +250,15 @@ int sync_deliver_fn (void *msg, struct in_addr source_addr,
sizeof (barrier_data_confchg));
if (sync_recovery_index < sync_callback_count) {
sync_service_init (&deliver_ring_id);
} else {
sync_processing = 0;
}
}
return (0);
}
int sync_in_process (void)
{
return (sync_processing);
}

View File

@ -62,4 +62,6 @@ void sync_confchg_fn (
int sync_deliver_fn (void *msg, struct in_addr source_addr,
int endian_conversion_needed);
int sync_in_process (void);
#endif /* SYNC_H_DEFINED */