diff --git a/exec/ipc_glue.c b/exec/ipc_glue.c index 0ea4b9f4..60865805 100644 --- a/exec/ipc_glue.c +++ b/exec/ipc_glue.c @@ -80,6 +80,12 @@ struct cs_ipcs_mapper { char name[256]; }; +struct outq_item { + void *msg; + size_t mlen; + struct list_head list; +}; + static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT]; static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn); @@ -278,6 +284,12 @@ static char * pid_to_name (pid_t pid, char *out_name, size_t name_len) struct cs_ipcs_conn_context { qb_handle_t stats_handle; + struct list_head outq_head; + int32_t queuing; + uint32_t queued; + uint64_t invalid_request; + uint64_t overload; + uint32_t sent; char data[1]; }; @@ -300,6 +312,12 @@ static void cs_ipcs_connection_created(qb_ipcs_connection_t *c) size += ais_service[service]->private_data_size; context = calloc(1, size); + + list_init(&context->outq_head); + context->queuing = QB_FALSE; + context->queued = 0; + context->sent = 0; + qb_ipcs_context_set(c, context); ais_service[service]->lib_init_fn(c); @@ -378,6 +396,21 @@ static void cs_ipcs_connection_created(qb_ipcs_connection_t *c) "flow_control_count", &zero_64, sizeof (zero_64), OBJDB_VALUETYPE_UINT64); + + api->object_key_create_typed (object_handle, + "queue_size", + &zero_32, sizeof (zero_32), + OBJDB_VALUETYPE_UINT32); + + api->object_key_create_typed (object_handle, + "invalid_request", + &zero_64, sizeof (zero_64), + OBJDB_VALUETYPE_UINT64); + + api->object_key_create_typed (object_handle, + "overload", + &zero_64, sizeof (zero_64), + OBJDB_VALUETYPE_UINT64); } void cs_ipc_refcnt_inc(void *conn) @@ -399,12 +432,25 @@ void *cs_ipcs_private_data_get(void *conn) static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c) { - struct cs_ipcs_conn_context *cnx; - log_printf(LOG_INFO, "%s() ", __func__); - cnx = qb_ipcs_context_get(c); + struct cs_ipcs_conn_context *context; + struct list_head *list, *list_next; + struct outq_item *outq_item; - if (cnx) { - free(cnx); + log_printf(LOG_INFO, "%s() ", __func__); + + context = qb_ipcs_context_get(c); + if (context) { + for (list = context->outq_head.next; + list != &context->outq_head; list = list_next) { + + list_next = list->next; + outq_item = list_entry (list, struct outq_item, list); + + list_del (list); + free (outq_item->msg); + free (outq_item); + } + free(context); } } @@ -454,24 +500,113 @@ int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen) return rc; } +static void outq_flush (void *data) +{ + qb_ipcs_connection_t *conn = data; + struct list_head *list, *list_next; + struct outq_item *outq_item; + int32_t rc; + struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); + + for (list = context->outq_head.next; + list != &context->outq_head; list = list_next) { + + list_next = list->next; + outq_item = list_entry (list, struct outq_item, list); + + rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen); + if (rc != outq_item->mlen) { + break; + } + context->sent++; + context->queued--; + + list_del (list); + free (outq_item->msg); + free (outq_item); + } + if (list_empty (&context->outq_head)) { + context->queuing = QB_FALSE; + log_printf(LOGSYS_LEVEL_INFO, "Q empty, queued:%d sent:%d.", + context->queued, context->sent); + context->queued = 0; + context->sent = 0; + return; + } + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); + if (rc < 0 && rc != -EAGAIN) { + log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc); + } +} + +static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *iov, uint32_t iov_len) +{ + int32_t rc = 0; + int32_t i; + int32_t bytes_msg = 0; + struct outq_item *outq_item; + char *write_buf = 0; + struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); + + for (i = 0; i < iov_len; i++) { + bytes_msg += iov[i].iov_len; + } + + if (!context->queuing) { + assert(list_empty (&context->outq_head)); + rc = qb_ipcs_event_sendv(conn, iov, iov_len); + if (rc == bytes_msg) { + context->sent++; + return; + } + if (rc == -EAGAIN) { + context->queued = 0; + context->sent = 0; + context->queuing = QB_TRUE; + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); + } else { + log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg); + return; + } + } + outq_item = malloc (sizeof (struct outq_item)); + if (outq_item == NULL) { + qb_ipcs_disconnect(conn); + return; + } + outq_item->msg = malloc (bytes_msg); + if (outq_item->msg == NULL) { + free (outq_item); + qb_ipcs_disconnect(conn); + return; + } + + write_buf = outq_item->msg; + for (i = 0; i < iov_len; i++) { + memcpy (write_buf, iov[i].iov_base, iov[i].iov_len); + write_buf += iov[i].iov_len; + } + outq_item->mlen = bytes_msg; + list_init (&outq_item->list); + list_add_tail (&outq_item->list, &context->outq_head); + context->queued++; +} + int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen) { - int32_t rc = qb_ipcs_event_send(conn, msg, mlen); - if (rc >= 0) { - return 0; - } - return rc; + struct iovec iov; + iov.iov_base = (void *)msg; + iov.iov_len = mlen; + msg_send_or_queue (conn, &iov, 1); + return 0; } int cs_ipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { - int32_t rc = qb_ipcs_event_sendv(conn, iov, iov_len); - if (rc >= 0) { - return 0; - } - return rc; + msg_send_or_queue(conn, iov, iov_len); + return 0; } static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, @@ -480,34 +615,48 @@ static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, struct qb_ipc_response_header response; struct qb_ipc_request_header *request_pt = (struct qb_ipc_request_header *)data; int32_t service = qb_ipcs_service_id_get(c); - int32_t send_ok; + int32_t send_ok = 0; + int32_t is_async_call = QB_FALSE; ssize_t res = -1; int sending_allowed_private_data; + struct cs_ipcs_conn_context *cnx; send_ok = corosync_sending_allowed (service, request_pt->id, request_pt, &sending_allowed_private_data); + is_async_call = (service == CPG_SERVICE && request_pt->id == 2); + /* * This happens when the message contains some kind of invalid * parameter, such as an invalid size */ - if (send_ok == -1) { + if (send_ok == -EINVAL) { response.size = sizeof (response); response.id = 0; response.error = CS_ERR_INVALID_PARAM; - log_printf(LOG_INFO, "%s() invalid message! size:%d error:%d", - __func__, response.size, response.error); - qb_ipcs_response_send (c, + + cnx = qb_ipcs_context_get(c); + if (cnx) { + cnx->invalid_request++; + } + + if (is_async_call) { + log_printf(LOGSYS_LEVEL_INFO, "*** %s() invalid message! size:%d error:%d", + __func__, response.size, response.error); + } else { + qb_ipcs_response_send (c, &response, sizeof (response)); + } res = -EINVAL; - } else { - if (send_ok) { - ais_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt); - res = 0; - } else { + } else if (send_ok < 0) { + cnx = qb_ipcs_context_get(c); + if (cnx) { + cnx->overload++; + } + if (!is_async_call) { /* * Overload, tell library to retry */ @@ -515,10 +664,20 @@ static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, response.id = 0; response.error = CS_ERR_TRY_AGAIN; qb_ipcs_response_send (c, - &response, - sizeof (response)); - res = -ENOBUFS; + &response, + sizeof (response)); + } else { + log_printf(LOGSYS_LEVEL_WARNING, + "*** %s() (%d:%d - %d) %s!", + __func__, service, request_pt->id, + is_async_call, strerror(-send_ok)); } + res = -ENOBUFS; + } + + if (send_ok) { + ais_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt); + res = 0; } corosync_sending_allowed_release (&sending_allowed_private_data); return res; @@ -677,13 +836,21 @@ void cs_ipcs_stats_update(void) api->object_key_replace(cnx->stats_handle, "recv_retries", strlen("recv_retries"), &stats.recv_retries, sizeof(uint64_t)); + api->object_key_replace(cnx->stats_handle, + "flow_control", strlen("flow_control"), + &stats.flow_control_state, sizeof(uint32_t)); api->object_key_replace(cnx->stats_handle, "flow_control_count", strlen("flow_control_count"), &stats.flow_control_count, sizeof(uint64_t)); api->object_key_replace(cnx->stats_handle, - "flow_control_state", strlen("flow_control_state"), - &stats.flow_control_state, sizeof(uint32_t)); - + "queue_size", strlen("queue_size"), + &cnx->queued, sizeof(uint32_t)); + api->object_key_replace(cnx->stats_handle, + "invalid_request", strlen("invalid_request"), + &cnx->invalid_request, sizeof(uint64_t)); + api->object_key_replace(cnx->stats_handle, + "overload", strlen("overload"), + &cnx->overload, sizeof(uint64_t)); qb_ipcs_connection_unref(c); } } diff --git a/exec/main.c b/exec/main.c index 4612b0d5..62ca494f 100644 --- a/exec/main.c +++ b/exec/main.c @@ -940,7 +940,7 @@ int corosync_sending_allowed ( corosync_group_handle, &reserve_iovec, 1); if (pd->reserved_msgs == -1) { - return (-1); + return -EINVAL; } sending_allowed = QB_FALSE; @@ -952,7 +952,13 @@ int corosync_sending_allowed ( sending_allowed = QB_TRUE; } else if (pd->reserved_msgs && sync_in_process == 0) { sending_allowed = QB_TRUE; + } else if (pd->reserved_msgs == 0) { + return -ENOBUFS; + } else /* (sync_in_process) */ { + return -EINPROGRESS; } + } else { + return -EHOSTUNREACH; } return (sending_allowed); diff --git a/exec/service.c b/exec/service.c index 64f3889d..04353142 100644 --- a/exec/service.c +++ b/exec/service.c @@ -592,7 +592,7 @@ static void service_unlink_schedwrk_handler (void *data) { lcr_ifact_release (cb_data->service_handle); - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_exit_schedwrk_handler); @@ -600,7 +600,7 @@ static void service_unlink_schedwrk_handler (void *data) { return; redo_this_function: - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_unlink_schedwrk_handler); @@ -638,14 +638,14 @@ static void service_exit_schedwrk_handler (void *data) { cb_data->service_engine = current_service_engine; cb_data->service_handle = service_handle; - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_unlink_schedwrk_handler); return; } - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_exit_schedwrk_handler); @@ -671,7 +671,7 @@ void corosync_service_unlink_all ( cb_data.api = api; - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, &cb_data, service_exit_schedwrk_handler); @@ -698,7 +698,7 @@ static void service_unlink_and_exit_schedwrk_handler (void *data) if (res == 0) { free (service_unlink_and_exit_data); } else { - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, data, service_unlink_and_exit_schedwrk_handler); @@ -720,7 +720,7 @@ unsigned int corosync_service_unlink_and_exit ( service_unlink_and_exit_data->name = strdup (service_name); service_unlink_and_exit_data->ver = service_ver; - qb_loop_job_add(corosync_poll_handle_get(), + qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, service_unlink_and_exit_data, service_unlink_and_exit_schedwrk_handler); diff --git a/services/cpg.c b/services/cpg.c index 87cfaba9..e7573c5b 100644 --- a/services/cpg.c +++ b/services/cpg.c @@ -1664,7 +1664,6 @@ static void message_handler_req_lib_cpg_mcast (void *conn, const void *message) struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; - struct res_lib_cpg_mcast res_lib_cpg_mcast; int msglen = req_lib_cpg_mcast->msglen; int result; cs_error_t error = CS_ERR_NOT_EXIST; @@ -1703,13 +1702,10 @@ static void message_handler_req_lib_cpg_mcast (void *conn, const void *message) result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED); assert(result == 0); + } else { + log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d\n", + conn, group_name.value, cpd->cpd_state, error); } - - res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); - res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; - res_lib_cpg_mcast.header.error = error; - api->ipc_response_send (conn, &res_lib_cpg_mcast, - sizeof (res_lib_cpg_mcast)); } static void message_handler_req_lib_cpg_zc_execute (