diff --git a/exec/coroipcs.c b/exec/coroipcs.c index 065a3551..07ae67dd 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -139,6 +139,7 @@ struct conn_info { unsigned int service; enum conn_state state; int notify_flow_control_enabled; + int flow_control_state; int refcount; hdb_handle_t stats_handle; #if _POSIX_THREAD_PROCESS_SHARED < 1 @@ -1188,6 +1189,36 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size; } +/** + * simulate the behaviour in coroipcc.c + */ +static int flow_control_event_send (struct conn_info *conn_info, char event) +{ + int new_fc = 0; + + if (event == MESSAGE_RES_OUTQ_NOT_EMPTY || + event == MESSAGE_RES_ENABLE_FLOWCONTROL) { + new_fc = 1; + } + + if (conn_info->flow_control_state != new_fc) { + if (new_fc == 1) { + log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event %d\n", + conn_info->client_pid, event); + } else { + log_printf (LOGSYS_LEVEL_INFO, "Disabling flow control for %d, event %d\n", + conn_info->client_pid, event); + } + conn_info->flow_control_state = new_fc; + api->stats_update_value (conn_info->stats_handle, "flow_control", + &conn_info->flow_control_state, + sizeof(conn_info->flow_control_state)); + api->stats_increment_value (conn_info->stats_handle, "flow_control_count"); + } + + return send (conn_info->fd, &event, 1, MSG_NOSIGNAL); +} + static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, int locked) { @@ -1197,14 +1228,16 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, #endif int res; int i; - char buf; for (i = 0; i < iov_len; i++) { memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len); } - buf = !list_empty (&conn_info->outq_head); - res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); + if (list_empty (&conn_info->outq_head)) + res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY); + else + res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY); + if (res == -1 && errno == EAGAIN) { if (locked == 0) { pthread_mutex_lock (&conn_info->mutex); @@ -1244,13 +1277,11 @@ static void outq_flush (struct conn_info *conn_info) { struct outq_item *outq_item; unsigned int bytes_left; struct iovec iov; - char buf; int res; pthread_mutex_lock (&conn_info->mutex); if (list_empty (&conn_info->outq_head)) { - buf = 3; - res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); + res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR); pthread_mutex_unlock (&conn_info->mutex); return; } @@ -1692,9 +1723,13 @@ int coroipcs_handler_dispatch ( coroipcs_refcount_inc (conn_info); pthread_mutex_lock (&conn_info->mutex); if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) { - buf = !list_empty (&conn_info->outq_head); + if (list_empty (&conn_info->outq_head)) + buf = MESSAGE_RES_OUTQ_EMPTY; + else + buf = MESSAGE_RES_OUTQ_NOT_EMPTY; + for (; conn_info->pending_semops;) { - res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); + res = flow_control_event_send (conn_info, buf); if (res == 1) { conn_info->pending_semops--; } else { @@ -1702,8 +1737,7 @@ int coroipcs_handler_dispatch ( } } if (conn_info->notify_flow_control_enabled) { - buf = 2; - res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL); + res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL); if (res == 1) { conn_info->notify_flow_control_enabled = 0; } diff --git a/exec/main.c b/exec/main.c index 1cb12ddf..def3013a 100644 --- a/exec/main.c +++ b/exec/main.c @@ -994,6 +994,16 @@ static hdb_handle_t corosync_stats_create_connection (const char* name, &zero_64, sizeof (zero_64), OBJDB_VALUETYPE_UINT64); + objdb->object_key_create_typed (object_handle, + "flow_control", + &zero_32, sizeof (zero_32), + OBJDB_VALUETYPE_UINT32); + + objdb->object_key_create_typed (object_handle, + "flow_control_count", + &zero_64, sizeof (zero_64), + OBJDB_VALUETYPE_UINT64); + return object_handle; } diff --git a/include/corosync/coroipc_ipc.h b/include/corosync/coroipc_ipc.h index de25e481..5126faa6 100644 --- a/include/corosync/coroipc_ipc.h +++ b/include/corosync/coroipc_ipc.h @@ -62,6 +62,11 @@ enum req_init_types { #define MESSAGE_REQ_CHANGE_EUID 1 #define MESSAGE_REQ_OUTQ_FLUSH 2 +#define MESSAGE_RES_OUTQ_EMPTY 0 +#define MESSAGE_RES_OUTQ_NOT_EMPTY 1 +#define MESSAGE_RES_ENABLE_FLOWCONTROL 2 +#define MESSAGE_RES_OUTQ_FLUSH_NR 3 + struct control_buffer { unsigned int read; unsigned int write; diff --git a/lib/coroipcc.c b/lib/coroipcc.c index c0861a58..aa9546c0 100644 --- a/lib/coroipcc.c +++ b/lib/coroipcc.c @@ -849,7 +849,7 @@ coroipcc_dispatch_get ( goto error_put; } ipc_instance->flow_control_state = 0; - if (buf == 1 || buf == 2) { + if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) { ipc_instance->flow_control_state = 1; } /* @@ -864,11 +864,11 @@ coroipcc_dispatch_get ( * This is just a notification of flow control starting at the addition * of a new pending message, not a message to dispatch */ - if (buf == 2) { + if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) { error = CS_ERR_TRY_AGAIN; goto error_put; } - if (buf == 3) { + if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) { error = CS_ERR_TRY_AGAIN; goto error_put; }