From f9609f3217ded242e6dce5dfc163a7bb6f06acbc Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Wed, 17 Sep 2008 19:15:00 +0000 Subject: [PATCH] Propagate the flow control state between AIS exec and library This patch causes the flow control state in the library to be set properly when the flow control is turned off (disabled). Then it can be read properly by the flow control apis. This also fixes the case where the application is no longer sending messages and it has already dispatched all its received messages before flow control is disabled. Also, CPG response messages with a TRY_AGAIN error did NOT contain a valid flow control state value. This meant the library could get stuck with flow control enabled (flow control was never enabled for the EXEC, so no disable event occurred). This case was hit when a new node was joining - sync_in_process() resulted in a TRY_AGAIN for error cpg_mcast_joined). Also, in message_handler_req_exec_cpg_mcast() the state passed back to the library defaulted to disabled for messages received from another node (even if flow control was still enabled) - this meant if multiple nodes were sending CPG messages, then the library flow control state flip-flopped between enabled and disabled. Author: Steven Dake & Tim Beale git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1667 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/apidef.c | 1 + exec/ipc.c | 17 ++++++++++++++++- exec/ipc.h | 2 ++ include/corosync/engine/coroapi.h | 2 ++ include/corosync/ipc_cpg.h | 8 +++++++- lib/cpg.c | 19 ++++++++++++++++--- services/cpg.c | 23 ++++++++++++++++++++--- 7 files changed, 64 insertions(+), 8 deletions(-) diff --git a/exec/apidef.c b/exec/apidef.c index f74ec922..a3b3520b 100644 --- a/exec/apidef.c +++ b/exec/apidef.c @@ -70,6 +70,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = { .ipc_source_is_local = message_source_is_local, .ipc_private_data_get = corosync_conn_private_data_get, .ipc_response_send = NULL, + .ipc_response_no_fcc = corosync_conn_send_response_no_fcc, .ipc_dispatch_send = NULL, .ipc_conn_send_response = corosync_conn_send_response, .ipc_conn_partner_get = corosync_conn_partner_get, diff --git a/exec/ipc.c b/exec/ipc.c index 977f60c0..04712478 100644 --- a/exec/ipc.c +++ b/exec/ipc.c @@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO); static unsigned int g_gid_valid = 0; +static unsigned int dont_call_flow_control = 0; + static totempg_groups_handle ipc_handle; DECLARE_LIST_INIT (conn_info_list_head); @@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn) } } +int corosync_conn_send_response_no_fcc ( + void *conn, + void *msg, + int mlen) +{ + dont_call_flow_control = 1; + corosync_conn_send_response ( + conn, msg, mlen); + dont_call_flow_control = 0; +} + int corosync_conn_send_response ( void *conn, void *msg, @@ -1149,7 +1162,9 @@ int corosync_conn_send_response ( return (-1); } - ipc_flow_control (conn_info); + if (dont_call_flow_control == 0) { + ipc_flow_control (conn_info); + } outq = &conn_info->outq; diff --git a/exec/ipc.h b/exec/ipc.h index a29a6981..48da14fe 100644 --- a/exec/ipc.h +++ b/exec/ipc.h @@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn); extern int corosync_conn_send_response (void *conn, void *msg, int mlen); +extern int corosync_conn_send_response_no_fcc (void *conn, void *msg,int mlen); + extern void corosync_ipc_init ( void (*serialize_lock_fn) (void), void (*serialize_unlock_fn) (void), diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h index ac27f4ca..d2abd12d 100644 --- a/include/corosync/engine/coroapi.h +++ b/include/corosync/engine/coroapi.h @@ -336,6 +336,8 @@ struct corosync_api_v1 { int (*ipc_response_send) (void *conn, void *msg, int mlen); + int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen); + int (*ipc_dispatch_send) (void *conn, void *msg, int mlen); /* diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h index 32b8544f..c1e68bef 100644 --- a/include/corosync/ipc_cpg.h +++ b/include/corosync/ipc_cpg.h @@ -62,7 +62,8 @@ enum res_cpg_types { MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8, MESSAGE_RES_CPG_LOCAL_GET = 9, MESSAGE_RES_CPG_GROUPS_GET = 10, - MESSAGE_RES_CPG_GROUPS_CALLBACK = 11 + MESSAGE_RES_CPG_GROUPS_CALLBACK = 11, + MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12 }; enum lib_cpg_confchg_reason { @@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback { mar_uint8_t message[] __attribute__((aligned(8))); }; +struct res_lib_cpg_flowcontrol_callback { + mar_res_header_t header __attribute__((aligned(8))); + mar_uint32_t flow_control_state __attribute__((aligned(8))); +}; + struct req_lib_cpg_membership { mar_req_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); diff --git a/lib/cpg.c b/lib/cpg.c index 6509b7af..89390f7a 100644 --- a/lib/cpg.c +++ b/lib/cpg.c @@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch ( int cont = 1; /* always continue do loop except when set to 0 */ int dispatch_avail; struct cpg_inst *cpg_inst; + struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback; struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback; struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback; struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback; @@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch ( joined_list, res_cpg_confchg_callback->joined_list_entries); break; + case MESSAGE_RES_CPG_GROUPS_CALLBACK: res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data; marshall_from_mar_cpg_name_t ( @@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch ( &group_name, member_list, res_lib_cpg_groups_get_callback->num_members); + + break; + + case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK: + res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data; + cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state; break; default: @@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined ( goto error_exit; } - cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state; - if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) { - cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED; +/* Only update the flow control state when the return value is OK. + * Otherwise the flow control state is not guaranteed to be valid in the + * return message. + * Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead + * to Flow Control State sync issues between AIS LIB and EXEC. + */ + if (res_lib_cpg_mcast.header.error == CPG_OK) { + cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state; } error = res_lib_cpg_mcast.header.error; diff --git a/services/cpg.c b/services/cpg.c index d5d37deb..01405e00 100644 --- a/services/cpg.c +++ b/services/cpg.c @@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn ( void *context, enum corosync_flow_control_state flow_control_state) { + struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback; struct process_info *process_info = (struct process_info *)context; process_info->flow_control_state = flow_control_state; + /* + * Send disabled flow control if a disabled occurs. This prevents + * the condition where a disabled occurs after all messages have been + * delivered and then there is no valid way to retrieve the flow + * control state + */ + if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) { + res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK; + res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback); + res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state; + + if (process_info->trackerconn) { + api->ipc_response_no_fcc ( + process_info->trackerconn, + &res_lib_cpg_flowcontrol_callback, + sizeof (struct res_lib_cpg_flowcontrol_callback)); + } + } } /* Can byteswap join & leave messages */ @@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast ( { struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message; struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast; - struct process_info *process_info; int msglen = req_exec_cpg_mcast->msglen; char buf[sizeof(*res_lib_cpg_mcast) + msglen]; struct group_info *gi; @@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast ( res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED; if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) { api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn); - process_info = (struct process_info *)api->ipc_private_data_get (req_exec_cpg_mcast->source.conn); - res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state; } memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); @@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast ( for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { struct process_info *pi = list_entry(iter, struct process_info, list); if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) { + res_lib_cpg_mcast->flow_control_state = pi->flow_control_state; api->ipc_conn_send_response( pi->trackerconn, buf,