Propagate the flow control state between AIS exec and library

This patch causes the flow control state in the library to be set
properly when flow control is turned off (disabled), so that the state
can then be read correctly by the flow control APIs.
It also fixes the case where the application is no longer sending
messages and has already dispatched all of its received messages
before flow control is disabled.
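
As background, a CPG client normally polls the library's cached state
through cpg_flow_control_state_get() to decide whether it may keep
sending. A minimal sketch of that pattern follows (the can_send()
helper is illustrative only; the handle is assumed to come from
cpg_initialize(), and header location/type names may differ slightly
between openais and corosync trees):

#include <corosync/cpg.h>	/* location may differ on older openais trees */

/* Returns 1 when it is safe to queue more messages on this handle. */
static int can_send (cpg_handle_t handle)
{
	cpg_flow_control_state_t state;

	if (cpg_flow_control_state_get (handle, &state) != CPG_OK) {
		return (0);
	}

	/*
	 * Without this patch the cached state could remain ENABLED after
	 * the EXEC had already disabled flow control, wedging the sender
	 * here indefinitely.
	 */
	return (state == CPG_FLOW_CONTROL_DISABLED);
}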

Also, CPG response messages returned with a TRY_AGAIN error did NOT
contain a valid flow control state value. This meant the library could
get stuck with flow control enabled (flow control was never actually
enabled on the EXEC, so no disable event ever occurred).
This case was hit when a new node was joining - sync_in_process()
resulted in a TRY_AGAIN error for cpg_mcast_joined().
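
To show how a stale state gets picked up here, note that a sender
normally treats CPG_ERR_TRY_AGAIN as transient and simply retries.
A rough sketch of such a loop (the retry helper and back-off interval
are illustrative, not part of this patch):

#include <unistd.h>		/* usleep */
#include <sys/uio.h>		/* struct iovec */
#include <corosync/cpg.h>

static cpg_error_t mcast_with_retry (cpg_handle_t handle,
	struct iovec *iov, int iov_len)
{
	cpg_error_t err;

	do {
		err = cpg_mcast_joined (handle, CPG_TYPE_AGREED, iov, iov_len);
		if (err == CPG_ERR_TRY_AGAIN) {
			usleep (10000);	/* back off briefly, then retry */
		}
	} while (err == CPG_ERR_TRY_AGAIN);

	/*
	 * The flow control state carried in a TRY_AGAIN response is not
	 * valid, so (with this patch) the library only caches the value
	 * when the request completed with CPG_OK.
	 */
	return (err);
}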

Also, in message_handler_req_exec_cpg_mcast() the state passed
back to the library defaulted to DISABLED for messages received
from another node (even if flow control was still enabled)
- this meant that if multiple nodes were sending CPG messages,
  the library's flow control state flip-flopped between
  enabled and disabled.
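
The flip-flop happens because the library overwrites its cached state
with the value stamped on every deliver callback. A stripped-down,
hypothetical view of that behaviour (names below are illustrative, not
the real cpg.c structures):

#include <stdint.h>

/* Hypothetical stand-in for the library's per-handle instance data. */
struct cpg_inst_view {
	uint32_t flow_control_state;	/* value reported by the flow control APIs */
};

static void on_deliver (struct cpg_inst_view *inst,
	uint32_t msg_flow_control_state)
{
	/*
	 * Every delivered message overwrites the cached state. Before this
	 * patch, messages that originated on other nodes were always stamped
	 * DISABLED, so with several senders the cached value bounced between
	 * ENABLED and DISABLED on each delivery.
	 */
	inst->flow_control_state = msg_flow_control_state;
}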

Author: Steven Dake <sdake@redhat.com> &
        Tim Beale <tim.beale@alliedtelesis.co.nz>




git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1667 fd59a12c-fef9-0310-b244-a6a79926bd2f
Angus Salkeld 2008-09-17 19:15:00 +00:00
parent 531bd3adec
commit f9609f3217
7 changed files with 64 additions and 8 deletions

View File

@@ -70,6 +70,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
.ipc_source_is_local = message_source_is_local,
.ipc_private_data_get = corosync_conn_private_data_get,
.ipc_response_send = NULL,
.ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
.ipc_dispatch_send = NULL,
.ipc_conn_send_response = corosync_conn_send_response,
.ipc_conn_partner_get = corosync_conn_partner_get,

View File

@@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
static unsigned int g_gid_valid = 0;
static unsigned int dont_call_flow_control = 0;
static totempg_groups_handle ipc_handle;
DECLARE_LIST_INIT (conn_info_list_head);
@@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn)
}
}
int corosync_conn_send_response_no_fcc (
void *conn,
void *msg,
int mlen)
{
dont_call_flow_control = 1;
corosync_conn_send_response (
conn, msg, mlen);
dont_call_flow_control = 0;
}
int corosync_conn_send_response (
void *conn,
void *msg,
@@ -1149,7 +1162,9 @@ int corosync_conn_send_response (
return (-1);
}
ipc_flow_control (conn_info);
if (dont_call_flow_control == 0) {
ipc_flow_control (conn_info);
}
outq = &conn_info->outq;

View File

@@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);
extern int corosync_conn_send_response (void *conn, void *msg, int mlen);
extern int corosync_conn_send_response_no_fcc (void *conn, void *msg,int mlen);
extern void corosync_ipc_init (
void (*serialize_lock_fn) (void),
void (*serialize_unlock_fn) (void),

View File

@@ -336,6 +336,8 @@ struct corosync_api_v1 {
int (*ipc_response_send) (void *conn, void *msg, int mlen);
int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);
/*

View File

@@ -62,7 +62,8 @@ enum res_cpg_types {
MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
MESSAGE_RES_CPG_LOCAL_GET = 9,
MESSAGE_RES_CPG_GROUPS_GET = 10,
MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
};
enum lib_cpg_confchg_reason {
@@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
mar_uint8_t message[] __attribute__((aligned(8)));
};
struct res_lib_cpg_flowcontrol_callback {
mar_res_header_t header __attribute__((aligned(8)));
mar_uint32_t flow_control_state __attribute__((aligned(8)));
};
struct req_lib_cpg_membership {
mar_req_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));

View File

@@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
int cont = 1; /* always continue do loop except when set to 0 */
int dispatch_avail;
struct cpg_inst *cpg_inst;
struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
@@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
joined_list,
res_cpg_confchg_callback->joined_list_entries);
break;
case MESSAGE_RES_CPG_GROUPS_CALLBACK:
res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
marshall_from_mar_cpg_name_t (
@@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
&group_name,
member_list,
res_lib_cpg_groups_get_callback->num_members);
break;
case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
break;
default:
@@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
goto error_exit;
}
cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
/* Only update the flow control state when the return value is OK.
* Otherwise the flow control state is not guaranteed to be valid in the
* return message.
* Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead
* to Flow Control State sync issues between AIS LIB and EXEC.
*/
if (res_lib_cpg_mcast.header.error == CPG_OK) {
cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
}
error = res_lib_cpg_mcast.header.error;

View File

@@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn (
void *context,
enum corosync_flow_control_state flow_control_state)
{
struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
struct process_info *process_info = (struct process_info *)context;
process_info->flow_control_state = flow_control_state;
/*
* Send disabled flow control if a disabled occurs. This prevents
* the condition where a disabled occurs after all messages have been
* delivered and then there is no valid way to retrieve the flow
* control state
*/
if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
if (process_info->trackerconn) {
api->ipc_response_no_fcc (
process_info->trackerconn,
&res_lib_cpg_flowcontrol_callback,
sizeof (struct res_lib_cpg_flowcontrol_callback));
}
}
}
/* Can byteswap join & leave messages */
@@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast (
{
struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
struct process_info *process_info;
int msglen = req_exec_cpg_mcast->msglen;
char buf[sizeof(*res_lib_cpg_mcast) + msglen];
struct group_info *gi;
@@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast (
res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
process_info = (struct process_info *)api->ipc_private_data_get (req_exec_cpg_mcast->source.conn);
res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
}
memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
sizeof(mar_cpg_name_t));
@@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast (
for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
struct process_info *pi = list_entry(iter, struct process_info, list);
if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
api->ipc_conn_send_response(
pi->trackerconn,
buf,