diff --git a/exec/Makefile b/exec/Makefile index 9b3e3a7c..667cdb4a 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -29,13 +29,13 @@ # THE POSSIBILITY OF SUCH DAMAGE. # Production mode flags -#CFLAGS = -O3 -Wall -fomit-frame-pointer -#LDFLAGS = +CFLAGS = -O3 -Wall -fomit-frame-pointer +LDFLAGS = # Debug mode flags -CFLAGS = -g -Wall +#CFLAGS = -g -Wall ##-DDEBUG -LDFLAGS = -g +#LDFLAGS = -g # Profile mode flags #CFLAGS = -O2 -pg diff --git a/exec/amf.c b/exec/amf.c index 78b09ab3..102e6bda 100644 --- a/exec/amf.c +++ b/exec/amf.c @@ -318,61 +318,73 @@ struct libais_handler amf_libais_handlers[] = .libais_handler_fn = message_handler_req_lib_activatepoll, .response_size = sizeof (struct res_lib_activatepoll), .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 1 */ .libais_handler_fn = message_handler_req_amf_componentregister, .response_size = sizeof (struct res_lib_amf_componentregister), .response_id = MESSAGE_RES_AMF_COMPONENTREGISTER, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 2 */ .libais_handler_fn = message_handler_req_amf_componentunregister, .response_size = sizeof (struct res_lib_amf_componentunregister), .response_id = MESSAGE_RES_AMF_COMPONENTUNREGISTER, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 3 */ .libais_handler_fn = message_handler_req_amf_readinessstateget, .response_size = sizeof (struct res_lib_amf_readinessstateget), .response_id = MESSAGE_RES_AMF_READINESSSTATEGET, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .libais_handler_fn = message_handler_req_amf_hastateget, .response_size = sizeof (struct res_lib_amf_hastateget), .response_id = MESSAGE_RES_AMF_READINESSSTATEGET, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 5 */ .libais_handler_fn = message_handler_req_amf_protectiongrouptrackstart, .response_size = sizeof (struct res_lib_amf_protectiongrouptrackstart), .response_id = MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 6 */ .libais_handler_fn = message_handler_req_amf_protectiongrouptrackstop, .response_size = sizeof (struct res_lib_amf_protectiongrouptrackstop), .response_id = MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .libais_handler_fn = message_handler_req_amf_errorreport, .response_size = sizeof (struct res_lib_amf_errorreport), .response_id = MESSAGE_RES_AMF_ERRORREPORT, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 8 */ .libais_handler_fn = message_handler_req_amf_errorcancelall, .response_size = sizeof (struct res_lib_amf_errorcancelall), .response_id = MESSAGE_RES_AMF_ERRORCANCELALL, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 9 */ .libais_handler_fn = message_handler_req_amf_stoppingcomplete, .response_size = sizeof (struct res_lib_amf_stoppingcomplete), .response_id = MESSAGE_RES_AMF_STOPPINGCOMPLETE, // TODO + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 10 */ .libais_handler_fn = message_handler_req_amf_response, .response_size = sizeof (struct res_lib_amf_response), .response_id = MESSAGE_RES_AMF_RESPONSE, // TODO + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 11 */ .libais_handler_fn = message_handler_req_amf_componentcapabilitymodelget, .response_size = sizeof (struct res_lib_amf_componentcapabilitymodelget), .response_id = MESSAGE_RES_AMF_COMPONENTCAPABILITYMODELGET, + .flow_control = FLOW_CONTROL_NOT_REQUIRED } }; diff --git a/exec/ckpt.c b/exec/ckpt.c index 57a5f80e..b2918b36 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -191,91 +191,109 @@ struct libais_handler ckpt_libais_handlers[] = .libais_handler_fn = message_handler_req_lib_activatepoll, .response_size = sizeof (struct res_lib_activatepoll), .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 1 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointopen, .response_size = sizeof (struct res_lib_ckpt_checkpointopen), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 2 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointopenasync, .response_size = sizeof (struct res_lib_ckpt_checkpointopenasync), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 3 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointclose, .response_size = sizeof (struct res_lib_ckpt_checkpointclose), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 4 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointunlink, .response_size = sizeof (struct res_lib_ckpt_checkpointunlink), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 5 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointretentiondurationset, .response_size = sizeof (struct res_lib_ckpt_checkpointretentiondurationset), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 6 */ .libais_handler_fn = message_handler_req_lib_ckpt_activereplicaset, .response_size = sizeof (struct res_lib_ckpt_activereplicaset), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_ACTIVEREPLICASET, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 7 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointstatusget, .response_size = sizeof (struct res_lib_ckpt_checkpointstatusget), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 8 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectioncreate, .response_size = sizeof (struct res_lib_ckpt_sectioncreate), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 9 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectiondelete, .response_size = sizeof (struct res_lib_ckpt_sectiondelete), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 10 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectionexpirationtimeset, .response_size = sizeof (struct res_lib_ckpt_sectionexpirationtimeset), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 11 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectionwrite, .response_size = sizeof (struct res_lib_ckpt_sectionwrite), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 12 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectionoverwrite, .response_size = sizeof (struct res_lib_ckpt_sectionoverwrite), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 13 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectionread, .response_size = sizeof (struct res_lib_ckpt_sectionread), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 14 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronize, .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronize), .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 15 */ .libais_handler_fn = message_handler_req_lib_ckpt_checkpointsynchronizeasync, .response_size = sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */ .response_id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 16 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectioniteratorinitialize, .response_size = sizeof (struct res_lib_ckpt_sectioniteratorinitialize), .response_id = MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 17 */ .libais_handler_fn = message_handler_req_lib_ckpt_sectioniteratornext, .response_size = sizeof (struct res_lib_ckpt_sectioniteratornext), .response_id = MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT, + .flow_control = FLOW_CONTROL_NOT_REQUIRED } }; diff --git a/exec/clm.c b/exec/clm.c index c8419faf..77a0b8cf 100644 --- a/exec/clm.c +++ b/exec/clm.c @@ -76,8 +76,6 @@ int clusterNodeEntries = 0; static DECLARE_LIST_INIT (library_notification_send_listhead); -static int should_send_nodejoin = 0; - SaClmClusterNodeT *clm_get_by_nodeid (struct in_addr node_id) { SaClmClusterNodeT *ret = NULL; @@ -146,26 +144,31 @@ struct libais_handler clm_libais_handlers[] = .libais_handler_fn = message_handler_req_lib_activatepoll, .response_size = sizeof (struct res_lib_activatepoll), .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .libais_handler_fn = message_handler_req_clm_clustertrack, .response_size = sizeof (struct res_clm_clustertrack), .response_id = MESSAGE_RES_CLM_TRACKSTART, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .libais_handler_fn = message_handler_req_clm_trackstop, .response_size = sizeof (struct res_clm_trackstop), .response_id = MESSAGE_RES_CLM_TRACKSTOP, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .libais_handler_fn = message_handler_req_clm_nodeget, .response_size = sizeof (struct res_clm_nodeget), .response_id = MESSAGE_RES_CLM_NODEGET, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 4 */ .libais_handler_fn = message_handler_req_clm_nodegetasync, .response_size = sizeof (struct res_clm_nodegetasync), .response_id = MESSAGE_RES_CLM_NODEGETCALLBACK, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED } }; @@ -417,10 +420,6 @@ static int clm_confchg_fn ( log_printf (LOG_LEVEL_NOTICE, "\t%s\n", inet_ntoa (joined_list[i])); } - if (joined_list_entries > 0) { - should_send_nodejoin = 1; - } - for (i = 0; i < left_list_entries; i++) { nodes[i] = left_list[i].s_addr; } @@ -447,9 +446,7 @@ static int clm_sync_process (void) /* * Send node information to other nodes */ - if (should_send_nodejoin) { - return (clm_nodejoin_send ()); - } + return (clm_nodejoin_send ()); return (0); } @@ -458,7 +455,6 @@ static int clm_sync_process (void) */ static void clm_sync_activate (void) { - should_send_nodejoin = 0; return; } diff --git a/exec/evs.c b/exec/evs.c index 51e477a8..f6d2c88f 100644 --- a/exec/evs.c +++ b/exec/evs.c @@ -100,26 +100,31 @@ struct libais_handler evs_libais_handlers[] = .libais_handler_fn = message_handler_req_lib_activatepoll, .response_size = sizeof (struct res_lib_activatepoll), .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 1 */ .libais_handler_fn = message_handler_req_evs_join, .response_size = sizeof (struct res_lib_evs_join), .response_id = MESSAGE_RES_EVS_JOIN, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 2 */ .libais_handler_fn = message_handler_req_evs_leave, .response_size = sizeof (struct res_lib_evs_leave), .response_id = MESSAGE_RES_EVS_LEAVE, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { /* 3 */ .libais_handler_fn = message_handler_req_evs_mcast_joined, .response_size = sizeof (struct res_lib_evs_mcast_joined), .response_id = MESSAGE_RES_EVS_MCAST_JOINED, + .flow_control = FLOW_CONTROL_REQUIRED }, { /* 4 */ .libais_handler_fn = message_handler_req_evs_mcast_groups, .response_size = sizeof (struct res_lib_evs_mcast_groups), .response_id = MESSAGE_RES_EVS_MCAST_GROUPS, + .flow_control = FLOW_CONTROL_REQUIRED } }; diff --git a/exec/evt.c b/exec/evt.c index c40d8490..e0e5a562 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -108,51 +108,61 @@ static struct libais_handler evt_libais_handlers[] = { .libais_handler_fn = message_handler_req_lib_activatepoll, .response_size = sizeof(struct res_lib_activatepoll), .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_open_channel, .response_size = sizeof(struct res_evt_channel_open), .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_open_channel_async, .response_size = sizeof(struct res_evt_channel_open), .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_close_channel, .response_size = sizeof(struct res_evt_channel_close), .response_id = MESSAGE_RES_EVT_CLOSE_CHANNEL, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_unlink_channel, .response_size = sizeof(struct res_evt_channel_unlink), .response_id = MESSAGE_RES_EVT_UNLINK_CHANNEL, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_event_subscribe, .response_size = sizeof(struct res_evt_event_subscribe), .response_id = MESSAGE_RES_EVT_SUBSCRIBE, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { .libais_handler_fn = lib_evt_event_unsubscribe, .response_size = sizeof(struct res_evt_event_unsubscribe), .response_id = MESSAGE_RES_EVT_UNSUBSCRIBE, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, { .libais_handler_fn = lib_evt_event_publish, .response_size = sizeof(struct res_evt_event_publish), .response_id = MESSAGE_RES_EVT_PUBLISH, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_event_clear_retentiontime, .response_size = sizeof(struct res_evt_event_clear_retentiontime), .response_id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME, + .flow_control = FLOW_CONTROL_REQUIRED }, { .libais_handler_fn = lib_evt_event_data_get, .response_size = sizeof(struct lib_event_data), .response_id = MESSAGE_RES_EVT_EVENT_DATA, + .flow_control = FLOW_CONTROL_NOT_REQUIRED }, }; diff --git a/exec/handlers.h b/exec/handlers.h index 3a9740ad..8b438100 100644 --- a/exec/handlers.h +++ b/exec/handlers.h @@ -39,10 +39,16 @@ #include "totempg.h" #include "totemsrp.h" +enum flow_control { + FLOW_CONTROL_REQUIRED = 1, + FLOW_CONTROL_NOT_REQUIRED = 2 +}; + struct libais_handler { int (*libais_handler_fn) (struct conn_info *conn_info, void *msg); int response_size; int response_id; + enum flow_control flow_control; }; struct service_handler { diff --git a/exec/main.c b/exec/main.c index eab825c6..ae4574db 100644 --- a/exec/main.c +++ b/exec/main.c @@ -572,11 +572,17 @@ retry_recv: } /* - * Determine if a message can be queued with totempg and if so - * deliver it, otherwise tell the library we are too busy + * If flow control is required of the library handle, determine that + * openais is not in synchronization and that totempg has room available + * to queue a message, otherwise tell the library we are busy and to + * try again later */ - - send_ok = totempg_send_ok (1000 + header->size); + send_ok = + (ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) || + ((ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) && + (totempg_send_ok (1000 + header->size)) && + (sync_in_process() == 0)); + if (send_ok) { // *prio = 0; res = ais_service_handlers[service - 1]->libais_handlers[header->id].libais_handler_fn(conn_info, header); diff --git a/exec/sync.c b/exec/sync.c index 7d652245..a9ee20b6 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -68,6 +68,8 @@ static struct sync_callbacks *sync_callbacks; static int sync_callback_count; +static int sync_processing = 0; + static void (*sync_synchronization_completed) (void); static int sync_recovery_index = 0; @@ -145,6 +147,7 @@ static int sync_service_process (enum totem_callback_token_type type, void *data sync_recovery_index += 1; totemsrp_callback_token_destroy (&sync_callback_token_handle); if (sync_recovery_index > sync_callback_count) { + sync_processing = 0; } else { sync_barrier_start (ring_id); } @@ -175,6 +178,8 @@ void sync_confchg_fn ( return; } + sync_processing = 1; + totemsrp_callback_token_destroy (&sync_callback_token_handle); sync_ring_id = ring_id; @@ -245,7 +250,15 @@ int sync_deliver_fn (void *msg, struct in_addr source_addr, sizeof (barrier_data_confchg)); if (sync_recovery_index < sync_callback_count) { sync_service_init (&deliver_ring_id); + } else { + sync_processing = 0; } } return (0); } + +int sync_in_process (void) +{ + return (sync_processing); +} + diff --git a/exec/sync.h b/exec/sync.h index 3360edad..d407efdf 100644 --- a/exec/sync.h +++ b/exec/sync.h @@ -62,4 +62,6 @@ void sync_confchg_fn ( int sync_deliver_fn (void *msg, struct in_addr source_addr, int endian_conversion_needed); +int sync_in_process (void); + #endif /* SYNC_H_DEFINED */