Add an IPC response path that skips the ipc_flow_control() call
(ipc_response_no_fcc) and use it to push the CPG flow control state to the
library through a new MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK message.

Notes on this revision of the patch:
- corosync_conn_send_response_no_fcc() is declared int but had no return
  statement; it now returns the result of corosync_conn_send_response().
- cpg_flow_control_state_set_fn() now sets header.error before sending the
  callback instead of sending an uninitialized field.
- Hunk line counts and start offsets are adjusted for the added lines.

diff --git a/exec/apidef.c b/exec/apidef.c
index f74ec922..a3b3520b 100644
--- a/exec/apidef.c
+++ b/exec/apidef.c
@@ -70,6 +70,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
 	.ipc_source_is_local = message_source_is_local,
 	.ipc_private_data_get = corosync_conn_private_data_get,
 	.ipc_response_send = NULL,
+	.ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
 	.ipc_dispatch_send = NULL,
 	.ipc_conn_send_response = corosync_conn_send_response,
 	.ipc_conn_partner_get = corosync_conn_partner_get,
diff --git a/exec/ipc.c b/exec/ipc.c
index 977f60c0..04712478 100644
--- a/exec/ipc.c
+++ b/exec/ipc.c
@@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
 
 static unsigned int g_gid_valid = 0;
 
+static unsigned int dont_call_flow_control = 0;
+
 static totempg_groups_handle ipc_handle;
 
 DECLARE_LIST_INIT (conn_info_list_head);
@@ -1125,6 +1127,28 @@ void *corosync_conn_partner_get (void *conn)
 	}
 }
 
+/*
+ * Send a response to conn without calling ipc_flow_control().
+ * Returns the result of corosync_conn_send_response().
+ */
+int corosync_conn_send_response_no_fcc (
+	void *conn,
+	void *msg,
+	int mlen)
+{
+	int res;
+
+	/*
+	 * corosync_conn_send_response() skips ipc_flow_control() while
+	 * this flag is non-zero, so clear it again before returning.
+	 */
+	dont_call_flow_control = 1;
+	res = corosync_conn_send_response (conn, msg, mlen);
+	dont_call_flow_control = 0;
+
+	return (res);
+}
+
 int corosync_conn_send_response (
 	void *conn,
 	void *msg,
@@ -1149,7 +1173,9 @@ int corosync_conn_send_response (
 		return (-1);
 	}
 
-	ipc_flow_control (conn_info);
+	if (dont_call_flow_control == 0) {
+		ipc_flow_control (conn_info);
+	}
 
 	outq = &conn_info->outq;
 
diff --git a/exec/ipc.h b/exec/ipc.h
index a29a6981..48da14fe 100644
--- a/exec/ipc.h
+++ b/exec/ipc.h
@@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);
 
 extern int corosync_conn_send_response (void *conn, void *msg, int mlen);
 
+extern int corosync_conn_send_response_no_fcc (void *conn, void *msg, int mlen);
+
 extern void corosync_ipc_init (
 	void (*serialize_lock_fn) (void),
 	void (*serialize_unlock_fn) (void),
diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h
index ac27f4ca..d2abd12d 100644
--- a/include/corosync/engine/coroapi.h
+++ b/include/corosync/engine/coroapi.h
@@ -336,6 +336,8 @@ struct corosync_api_v1 {
 
 	int (*ipc_response_send) (void *conn, void *msg, int mlen);
 
+	int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
+
 	int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);
 
 	/*
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index 32b8544f..c1e68bef 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -62,7 +62,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
 	MESSAGE_RES_CPG_LOCAL_GET = 9,
 	MESSAGE_RES_CPG_GROUPS_GET = 10,
-	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
+	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
+	MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
 };
 
 enum lib_cpg_confchg_reason {
@@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_flowcontrol_callback {
+	mar_res_header_t header __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_membership {
 	mar_req_header_t header __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 6509b7af..89390f7a 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
 	struct cpg_inst *cpg_inst;
+	struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
 	struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
@@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
 				joined_list,
 				res_cpg_confchg_callback->joined_list_entries);
 			break;
+
 		case MESSAGE_RES_CPG_GROUPS_CALLBACK:
 			res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
 			marshall_from_mar_cpg_name_t (
@@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
 				&group_name,
 				member_list,
 				res_lib_cpg_groups_get_callback->num_members);
+
+			break;
+
+		case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
+			res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
+			cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
 			break;
 
 		default:
@@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
 		goto error_exit;
 	}
 
-	cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
-	if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
-		cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
+	/* Only update the flow control state when the return value is OK.
+	 * Otherwise the flow control state is not guaranteed to be valid in the
+	 * return message.
+	 * Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead
+	 * to Flow Control State sync issues between AIS LIB and EXEC.
+	 */
+	if (res_lib_cpg_mcast.header.error == CPG_OK) {
+		cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
 	}
 
 	error = res_lib_cpg_mcast.header.error;
diff --git a/services/cpg.c b/services/cpg.c
index d5d37deb..01405e00 100644
--- a/services/cpg.c
+++ b/services/cpg.c
@@ -738,9 +738,30 @@ static void cpg_flow_control_state_set_fn (
 	void *context,
 	enum corosync_flow_control_state flow_control_state)
 {
+	struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
 	struct process_info *process_info = (struct process_info *)context;
 
 	process_info->flow_control_state = flow_control_state;
+	/*
+	 * Send disabled flow control if a disabled occurs. This prevents
+	 * the condition where a disabled occurs after all messages have been
+	 * delivered and then there is no valid way to retrieve the flow
+	 * control state
+	 */
+	if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+		/* Set every header field; this struct is an uninitialized local. */
+		res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
+		res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
+		res_lib_cpg_flowcontrol_callback.header.error = CPG_OK;
+		res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
+
+		if (process_info->trackerconn) {
+			api->ipc_response_no_fcc (
+				process_info->trackerconn,
+				&res_lib_cpg_flowcontrol_callback,
+				sizeof (struct res_lib_cpg_flowcontrol_callback));
+		}
+	}
 }
 
 /* Can byteswap join & leave messages */
@@ -965,7 +986,6 @@ static void message_handler_req_exec_cpg_mcast (
 {
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
 	struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
-	struct process_info *process_info;
 	int msglen = req_exec_cpg_mcast->msglen;
 	char buf[sizeof(*res_lib_cpg_mcast) + msglen];
 	struct group_info *gi;
@@ -986,8 +1006,6 @@ static void message_handler_req_exec_cpg_mcast (
 	res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
 	if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
 		api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
-		process_info = (struct process_info *)api->ipc_private_data_get (req_exec_cpg_mcast->source.conn);
-		res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
 	}
 	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
@@ -998,6 +1016,7 @@ static void message_handler_req_exec_cpg_mcast (
 	for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
 		struct process_info *pi = list_entry(iter, struct process_info, list);
 		if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
+			res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
 			api->ipc_conn_send_response(
 				pi->trackerconn,
 				buf,