From 336dc17daa550ee70a15baf811889b09fb6812b8 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Sat, 4 Nov 2006 22:29:14 +0000 Subject: [PATCH] Forward port of flow control work from whitetank branch. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1289 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/Makefile | 4 +- exec/amf.c | 1 + exec/amfcomp.c | 1 + exec/cfg.c | 1 + exec/ckpt.c | 1 + exec/clm.c | 1 + exec/cpg.c | 80 ++++++--- exec/evs.c | 1 + exec/evt.c | 1 + exec/flow.c | 434 ++++++++++++++++++++++++++++++++++++++++++++++ exec/flow.h | 73 ++++++++ exec/ipc.c | 233 +++++++++++++++++++------ exec/ipc.h | 21 +++ exec/lck.c | 1 + exec/main.c | 4 + exec/main.h | 2 +- exec/msg.c | 1 + exec/service.h | 1 + include/cpg.h | 10 +- include/hdb.h | 2 +- include/ipc_cpg.h | 10 +- include/queue.h | 18 +- lib/cpg.c | 42 +++-- test/Makefile | 16 +- test/cpgbench.c | 175 +++++++++++++++++++ 25 files changed, 1034 insertions(+), 100 deletions(-) create mode 100755 exec/flow.c create mode 100644 exec/flow.h create mode 100644 test/cpgbench.c diff --git a/exec/Makefile b/exec/Makefile index 6e0f43ba..569bad42 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -62,9 +62,9 @@ LCR_SRC = evs.c clm.c ckpt.c evt.c lck.c msg.c cfg.c cpg.c aisparser.c vsf_ykd.c LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o $(AMF_OBJS) # main executive objects -MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c timer.c \ +MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c flow.c timer.c \ totemconfig.c mainconfig.c -MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o timer.o \ +MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o flow.o timer.o \ totemconfig.o mainconfig.o ../lcr/lcr_ifact.o OTHER_OBJS = objdb.o diff --git a/exec/amf.c b/exec/amf.c index c17e56fd..d68a4b7c 100644 --- a/exec/amf.c +++ b/exec/amf.c @@ -374,6 +374,7 @@ static struct openais_service_handler amf_service_handler = { .name = 
(unsigned char *)"openais availability management framework B.01.01", .id = AMF_SERVICE, .private_data_size = sizeof (struct amf_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = amf_lib_init_fn, .lib_exit_fn = amf_lib_exit_fn, .lib_service = amf_lib_service, diff --git a/exec/amfcomp.c b/exec/amfcomp.c index 266a82c6..a0e6a6ef 100644 --- a/exec/amfcomp.c +++ b/exec/amfcomp.c @@ -1394,6 +1394,7 @@ void amf_comp_instantiate (struct amf_comp *comp) comp->saAmfCompPresenceState); break; } + return 0; } void amf_comp_instantiate_tmo_event (struct amf_comp *comp) diff --git a/exec/cfg.c b/exec/cfg.c index 824c16ef..f8b5f33b 100644 --- a/exec/cfg.c +++ b/exec/cfg.c @@ -167,6 +167,7 @@ struct openais_service_handler cfg_service_handler = { .name = (unsigned char*)"openais configuration service", .id = CFG_SERVICE, .private_data_size = 0, + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = cfg_lib_init_fn, .lib_exit_fn = cfg_lib_exit_fn, .lib_service = cfg_lib_service, diff --git a/exec/ckpt.c b/exec/ckpt.c index ad95faa0..60643acc 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -548,6 +548,7 @@ struct openais_service_handler ckpt_service_handler = { .name = (unsigned char *)"openais checkpoint service B.01.01", .id = CKPT_SERVICE, .private_data_size = sizeof (struct ckpt_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = ckpt_lib_init_fn, .lib_exit_fn = ckpt_lib_exit_fn, .lib_service = ckpt_lib_service, diff --git a/exec/clm.c b/exec/clm.c index b764035a..18d55428 100644 --- a/exec/clm.c +++ b/exec/clm.c @@ -203,6 +203,7 @@ struct openais_service_handler clm_service_handler = { .name = (unsigned char*)"openais cluster membership service B.01.01", .id = CLM_SERVICE, .private_data_size = sizeof (struct clm_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = clm_lib_init_fn, .lib_exit_fn = clm_lib_exit_fn, .lib_service = clm_lib_service, diff --git a/exec/cpg.c b/exec/cpg.c index 
6fd7b18b..4f0d563f 100644 --- a/exec/cpg.c +++ b/exec/cpg.c @@ -32,10 +32,6 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ - -#ifndef OPENAIS_BSD -#include -#endif #include #include #include @@ -70,6 +66,7 @@ #include "jhash.h" #include "swab.h" #include "ipc.h" +#include "flow.h" #include "print.h" #define GROUP_HASH_SIZE 32 @@ -106,6 +103,7 @@ struct process_info { void *conn; void *trackerconn; struct group_info *group; + enum openais_flow_control_state flow_control_state; struct list_head list; /* on the group_info members list */ }; @@ -193,7 +191,7 @@ static struct openais_lib_handler cpg_lib_service[] = }, { /* 2 */ .lib_handler_fn = message_handler_req_lib_cpg_mcast, - .response_size = sizeof (mar_res_header_t), + .response_size = sizeof (struct res_lib_cpg_mcast), .response_id = MESSAGE_RES_CPG_MCAST, .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED }, @@ -241,6 +239,7 @@ struct openais_service_handler cpg_service_handler = { .name = (unsigned char*)"openais cluster closed process group service v1.01", .id = CPG_SERVICE, .private_data_size = sizeof (struct process_info), + .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED, .lib_init_fn = cpg_lib_init_fn, .lib_exit_fn = cpg_lib_exit_fn, .lib_service = cpg_lib_service, @@ -308,6 +307,7 @@ struct req_exec_cpg_mcast { mar_cpg_name_t group_name __attribute__((aligned(8))); mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); + mar_message_source_t source __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; @@ -504,7 +504,7 @@ static int cpg_node_joinleave_send (struct group_info *gi, struct process_info * req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); - req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin; + req_exec_cpg_iovec.iov_base = &req_exec_cpg_procjoin; req_exec_cpg_iovec.iov_len 
= sizeof(req_exec_cpg_procjoin); result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED); @@ -553,9 +553,8 @@ static void remove_node_from_groups( list_del(&gi->rg->list); newsize = gi->rg->left_list_size * 2; - newrg = realloc (gi->rg, - sizeof(struct removed_group) + newsize * sizeof(mar_cpg_address_t)); - if (newrg == NULL) { + newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t)); + if (!newrg) { log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk."); return; } @@ -613,6 +612,15 @@ static void cpg_confchg_fn ( } } +static void cpg_flow_control_state_set_fn ( + void *context, + enum openais_flow_control_state flow_control_state) +{ + struct process_info *process_info = (struct process_info *)context; + + process_info->flow_control_state = flow_control_state; +} + /* Can byteswap join & leave messages */ static void exec_cpg_procjoin_endian_convert (void *msg) { @@ -645,7 +653,7 @@ static void exec_cpg_mcast_endian_convert (void *msg) swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name); req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); - + swab_mar_message_source_t (&req_exec_cpg_mcast->source); } static void do_proc_join( @@ -787,11 +795,15 @@ static void message_handler_req_exec_cpg_mcast ( { struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message; struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast; + struct process_info *process_info; int msglen = req_exec_cpg_mcast->msglen; char buf[sizeof(*res_lib_cpg_mcast) + msglen]; struct group_info *gi; struct list_head *iter; + /* + * Track local messages so that flow is controlled on the local node + */ gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! 
*/ assert(gi); @@ -801,6 +813,12 @@ static void message_handler_req_exec_cpg_mcast ( res_lib_cpg_mcast->msglen = msglen; res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid; res_lib_cpg_mcast->nodeid = nodeid; + res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED; + if (message_source_is_local (&req_exec_cpg_mcast->source)) { + openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn); + process_info = (struct process_info *)openais_conn_private_data_get (req_exec_cpg_mcast->source.conn); + res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state; + } memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name, sizeof(mar_cpg_name_t)); memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast), @@ -916,6 +934,14 @@ static void message_handler_req_lib_cpg_join (void *conn, void *message) goto join_err; } + openais_ipc_flow_control_create ( + conn, + CPG_SERVICE, + req_lib_cpg_join->group_name.value, + req_lib_cpg_join->group_name.length, + cpg_flow_control_state_set_fn, + pi); + /* Add a node entry for us */ pi->nodeid = this_ip->nodeid; pi->pid = req_lib_cpg_join->pid; @@ -953,6 +979,12 @@ static void message_handler_req_lib_cpg_leave (void *conn, void *message) cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE); pi->group = NULL; + openais_ipc_flow_control_destroy ( + conn, + CPG_SERVICE, + (unsigned char *)gi->group_name.value, + (unsigned int)gi->group_name.length); + leave_ret: /* send return */ res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); @@ -969,7 +1001,7 @@ static void message_handler_req_lib_cpg_mcast (void *conn, void *message) struct group_info *gi = pi->group; struct iovec req_exec_cpg_iovec[2]; struct req_exec_cpg_mcast req_exec_cpg_mcast; - mar_res_header_t res; + struct res_lib_cpg_mcast res_lib_cpg_mcast; int msglen = req_lib_cpg_mcast->msglen; int result; @@ -977,10 +1009,12 @@ static void message_handler_req_lib_cpg_mcast (void *conn, 
void *message) /* Can't send if we're not joined */ if (!gi) { - res.size = sizeof(res); - res.id = MESSAGE_RES_CPG_MCAST; - res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */ - openais_conn_send_response(conn, &res, sizeof(res)); + res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); + res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; + res_lib_cpg_mcast.header.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */ + res_lib_cpg_mcast.flow_control_state = CPG_FLOW_CONTROL_DISABLED; + openais_conn_send_response(conn, &res_lib_cpg_mcast, + sizeof(res_lib_cpg_mcast)); return; } @@ -989,21 +1023,25 @@ static void message_handler_req_lib_cpg_mcast (void *conn, void *message) MESSAGE_REQ_EXEC_CPG_MCAST); req_exec_cpg_mcast.pid = pi->pid; req_exec_cpg_mcast.msglen = msglen; + message_source_set (&req_exec_cpg_mcast.source, conn); memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name, sizeof(mar_cpg_name_t)); - req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast; + req_exec_cpg_iovec[0].iov_base = &req_exec_cpg_mcast; req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); - req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message; + req_exec_cpg_iovec[1].iov_base = &req_lib_cpg_mcast->message; req_exec_cpg_iovec[1].iov_len = msglen; // TODO: guarantee type... 
result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED); + openais_ipc_flow_control_local_increment (conn); - res.size = sizeof(res); - res.id = MESSAGE_RES_CPG_MCAST; - res.error = SA_AIS_OK; - openais_conn_send_response(conn, &res, sizeof(res)); + res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast); + res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST; + res_lib_cpg_mcast.header.error = SA_AIS_OK; + res_lib_cpg_mcast.flow_control_state = pi->flow_control_state; + openais_conn_send_response(conn, &res_lib_cpg_mcast, + sizeof(res_lib_cpg_mcast)); } static void message_handler_req_lib_cpg_membership (void *conn, void *message) diff --git a/exec/evs.c b/exec/evs.c index c0b10576..74cba8fa 100644 --- a/exec/evs.c +++ b/exec/evs.c @@ -145,6 +145,7 @@ struct openais_service_handler evs_service_handler = { .name = (unsigned char*)"openais extended virtual synchrony service", .id = EVS_SERVICE, .private_data_size = sizeof (struct evs_pd), + .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED, .lib_init_fn = evs_lib_init_fn, .lib_exit_fn = evs_lib_exit_fn, .lib_service = evs_lib_service, diff --git a/exec/evt.c b/exec/evt.c index 027f66d6..32a810c5 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -214,6 +214,7 @@ struct openais_service_handler evt_service_handler = { (unsigned char*)"openais event service B.01.01", .id = EVT_SERVICE, .private_data_size = sizeof (struct libevt_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = evt_lib_init, .lib_exit_fn = evt_lib_exit, .lib_service = evt_lib_service, diff --git a/exec/flow.c b/exec/flow.c new file mode 100755 index 00000000..72dfc679 --- /dev/null +++ b/exec/flow.c @@ -0,0 +1,434 @@ +/* + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. 
+ * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * New messages are allowed from the library ONLY when the processor has not + * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor. If a OPENAIS_FLOW_CONTROL_STATE_ENABLED + * message is sent, it must later be cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED + * message. 
+ */ + +#include +#include +#include +#include + +#include "flow.h" +#include "totem.h" +#include "totempg.h" +#include "print.h" +#include "hdb.h" +#include "../include/list.h" + +#define OPENAIS_FLOW_CONTROL_ENABLED_SERVICES_MAX 128 + +struct flow_control_instance { + struct list_head list_head; + unsigned int service; +}; + +DECLARE_LIST_INIT (flow_control_service_list_head); + +struct flow_control_message { + unsigned int service __attribute__((aligned(8))); + char id[1024] __attribute__((aligned(8))); + unsigned int id_len __attribute__((aligned(8))); + enum openais_flow_control_state flow_control_state __attribute__((aligned(8))); +}; + +struct flow_control_node_state { + unsigned int nodeid; + enum openais_flow_control_state flow_control_state; +}; + +struct flow_control_service { + struct flow_control_node_state flow_control_node_state[PROCESSOR_COUNT_MAX]; + unsigned int service; + char id[1024]; + unsigned int id_len; + void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state); + void *context; + unsigned int processor_count; + enum openais_flow_control_state flow_control_state; + struct list_head list; + struct list_head list_all; +}; + +static struct totempg_group flow_control_group = { + .group = "flowcontrol", + .group_len = 12 +}; + +static totempg_groups_handle flow_control_handle; + +static struct hdb_handle_database flow_control_hdb = { + .handle_count = 0, + .handles = NULL, + .iterator = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER +}; + +static unsigned int flow_control_member_list[PROCESSOR_COUNT_MAX]; +static unsigned int flow_control_member_list_entries; + +/* + * Multicast flow_control_state for flow_control_service to the flow control + * group, then report the service's current state through its + * flow_control_state_set_fn callback. + * Returns the result of totempg_groups_mcast_joined. + */ +static inline int flow_control_xmit ( + struct flow_control_service *flow_control_service, + enum openais_flow_control_state flow_control_state) +{ + struct flow_control_message flow_control_message; + struct iovec iovec; + unsigned int res; + + /* + * Only id_len bytes of id are filled in, clear the whole message first + * so uninitialized stack contents are never multicast + */ + memset (&flow_control_message, 0, sizeof (flow_control_message)); + + flow_control_message.service = flow_control_service->service; + flow_control_message.flow_control_state = 
flow_control_state; + memcpy (&flow_control_message.id, flow_control_service->id, + flow_control_service->id_len); + flow_control_message.id_len = flow_control_service->id_len; + + iovec.iov_base = (char *)&flow_control_message; + iovec.iov_len = sizeof (flow_control_message); + + res = totempg_groups_mcast_joined (flow_control_handle, &iovec, 1, + TOTEMPG_AGREED); + + flow_control_service->flow_control_state_set_fn ( + flow_control_service->context, + flow_control_service->flow_control_state); + + return (res); +} + +static void flow_control_deliver_fn ( + unsigned int nodeid, + struct iovec *iovec, + int iov_len, + int endian_conversion_required) +{ + struct flow_control_message *flow_control_message = (struct flow_control_message *)iovec[0].iov_base; + struct flow_control_service *flow_control_service; + struct list_head *list; + unsigned int i; + + for (list = flow_control_service_list_head.next; + list != &flow_control_service_list_head; + list = list->next) { + + flow_control_service = list_entry (list, struct flow_control_service, list_all); + /* + * Find this nodeid in the flow control service and set the message + * enabled or disabled flag + */ + for (i = 0; i < flow_control_service->processor_count; i++) { + if (nodeid == flow_control_service->flow_control_node_state[i].nodeid) { + flow_control_service->flow_control_node_state[i].flow_control_state = + flow_control_message->flow_control_state; + break; + } + } + + /* + * Determine if any flow control is enabled on any nodes and set + * the internal variable appropriately + */ + flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED; + flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state); + for (i = 0; i < flow_control_service->processor_count; i++) { + if (flow_control_service->flow_control_node_state[i].flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) { + flow_control_service->flow_control_state = 
OPENAIS_FLOW_CONTROL_STATE_ENABLED; + flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state); + } + } + } /* for list iteration */ +} + +static void flow_control_confchg_fn ( + enum totem_configuration_type configuration_type, + unsigned int *member_list, int member_list_entries, + unsigned int *left_list, int left_list_entries, + unsigned int *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id) +{ + unsigned int i; + struct flow_control_service *flow_control_service; + struct list_head *list; + + memcpy (flow_control_member_list, member_list, + sizeof (unsigned int) * member_list_entries); + flow_control_member_list_entries = member_list_entries; + + for (list = flow_control_service_list_head.next; + list != &flow_control_service_list_head; + list = list->next) { + + flow_control_service = list_entry (list, struct flow_control_service, list_all); + + /* + * Set all of the node ids after a configuration change + * Turn on all flow control after a configuration change + */ + flow_control_service->processor_count = flow_control_member_list_entries; + flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED; + for (i = 0; i < member_list_entries; i++) { + flow_control_service->flow_control_node_state[i].nodeid = member_list[i]; + flow_control_service->flow_control_node_state[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED; + } + } +} +/* + * External API + */ +unsigned int openais_flow_control_initialize (void) +{ + unsigned int res; + + log_init ("FLOW"); + + res = totempg_groups_initialize ( + &flow_control_handle, + flow_control_deliver_fn, + flow_control_confchg_fn); + + if (res == -1) { + log_printf (LOG_LEVEL_ERROR, + "Couldn't initialize flow control interface.\n"); + return (-1); + } + res = totempg_groups_join ( + flow_control_handle, + &flow_control_group, + 1); + + if (res == -1) { + log_printf (LOG_LEVEL_ERROR, "Couldn't join flow control 
group.\n"); + return (-1); + } + + return (0); +} + +unsigned int openais_flow_control_ipc_init ( + unsigned int *flow_control_handle, + unsigned int service) +{ + struct flow_control_instance *instance; + unsigned int res; + + res = hdb_handle_create (&flow_control_hdb, + sizeof (struct flow_control_instance), flow_control_handle); + if (res != 0) { + goto error_exit; + } + res = hdb_handle_get (&flow_control_hdb, *flow_control_handle, + (void *)&instance); + if (res != 0) { + goto error_destroy; + } + instance->service = service; + + list_init (&instance->list_head); + + return (0); + +error_destroy: + hdb_handle_destroy (&flow_control_hdb, *flow_control_handle); +error_exit: + return (-1); + +} + +unsigned int openais_flow_control_ipc_exit ( + unsigned int flow_control_handle) +{ + hdb_handle_destroy (&flow_control_hdb, flow_control_handle); + return (0); +} + +/* + * Register a flow controlled service, identified by service and id, on the + * IPC flow control instance referenced by flow_control_handle. + * flow_control_state_set_fn and context are stored with the service so the + * callback can be invoked when its flow control state is reported. + * Returns 0 on success and non-zero when the handle cannot be referenced, + * id_len does not fit the id buffer, or memory cannot be allocated. + */ +unsigned int openais_flow_control_create ( + unsigned int flow_control_handle, + unsigned int service, + void *id, + unsigned int id_len, + void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state), + void *context) +{ + struct flow_control_service *flow_control_service; + struct flow_control_instance *instance; + unsigned int res; + unsigned int i; + + res = hdb_handle_get (&flow_control_hdb, flow_control_handle, + (void *)&instance); + if (res != 0) { + goto error_exit; + } + + /* + * id is copied into a fixed size buffer, refuse anything larger + */ + if (id_len > sizeof (flow_control_service->id)) { + res = -1; + goto error_put; + } + + flow_control_service = malloc (sizeof (struct flow_control_service)); + if (flow_control_service == NULL) { + /* + * res is still 0 from hdb_handle_get, report the failure instead + */ + res = -1; + goto error_put; + } + + /* + * Add new service to flow control system + */ + memset (flow_control_service, 0, sizeof (struct flow_control_service)); + + flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED; + flow_control_service->service = service; + memcpy (flow_control_service->id, id, id_len); + flow_control_service->id_len = id_len; + flow_control_service->flow_control_state_set_fn = flow_control_state_set_fn; + flow_control_service->context = 
context; + + list_init (&flow_control_service->list); + list_add_tail (&instance->list_head, + &flow_control_service->list); + + list_init (&flow_control_service->list_all); + list_add_tail (&flow_control_service_list_head, + &flow_control_service->list_all); + + for (i = 0; i < flow_control_member_list_entries; i++) { + flow_control_service->flow_control_node_state[i].nodeid = flow_control_member_list[i]; + flow_control_service->processor_count = flow_control_member_list_entries; + } +error_put: + hdb_handle_put (&flow_control_hdb, flow_control_handle); + +error_exit: + return (res); +} + +/* + * Unlink and free the first flow controlled service whose service and id + * match the arguments. + * flow_control_identifier is the IPC flow control handle of the connection; + * it is not the file scope totempg flow_control_handle. + * Returns 0 when the handle could be referenced (whether or not a service + * matched) and non-zero otherwise. + */ +unsigned int openais_flow_control_destroy ( + unsigned int flow_control_identifier, + unsigned int service, + unsigned char *id, + unsigned int id_len) +{ + struct flow_control_service *flow_control_service; + struct flow_control_instance *instance; + struct list_head *list; + unsigned int res; + + res = hdb_handle_get (&flow_control_hdb, flow_control_identifier, + (void *)&instance); + if (res != 0) { + goto error_exit; + } + + for (list = flow_control_service_list_head.next; + list != &flow_control_service_list_head; + list = list->next) { + + flow_control_service = list_entry (list, struct flow_control_service, list_all); + + if ((flow_control_service->service == service) && + (flow_control_service->id_len == id_len) && + (memcmp (flow_control_service->id, id, id_len) == 0)) { + list_del (&flow_control_service->list); + list_del (&flow_control_service->list_all); + free (flow_control_service); + break; /* done, list must not be advanced through freed memory */ + } + } + hdb_handle_put (&flow_control_hdb, flow_control_identifier); + +error_exit: + return (res); +} + +/* + * Turn flow control off for every service registered on this handle and + * multicast that state, which lets new messages be sent again + */ +unsigned int openais_flow_control_disable ( + unsigned int flow_control_handle) +{ + struct flow_control_instance *instance; + struct flow_control_service *flow_control_service; + struct list_head *list; + unsigned int res; + unsigned int i; + + res = hdb_handle_get (&flow_control_hdb, 
flow_control_handle, + (void *)&instance); + if (res != 0) { + goto error_exit; + } + + /* NOTE(review): i is assigned here but never read in this function */ +i = 0; + for (list = instance->list_head.next; + list != &instance->list_head; + list = list->next) { + + flow_control_service = list_entry (list, struct flow_control_service, list); + flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED; + flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_DISABLED); + } + hdb_handle_put (&flow_control_hdb, flow_control_handle); + +error_exit: + return (res); +} + +/* + * Turn flow control on for every service registered on this handle and + * multicast that state, which stops new messages from being sent. + * Returns the result of hdb_handle_get: 0 when the handle was referenced. + */ +unsigned int openais_flow_control_enable ( + unsigned int flow_control_handle) +{ + struct flow_control_instance *instance; + struct flow_control_service *flow_control_service; + struct list_head *list; + unsigned int res; + + res = hdb_handle_get (&flow_control_hdb, flow_control_handle, + (void *)&instance); + if (res != 0) { + goto error_exit; + } + + for (list = instance->list_head.next; + list != &instance->list_head; + list = list->next) { + + + flow_control_service = list_entry (list, struct flow_control_service, list); + flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED; + flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_ENABLED); + } + hdb_handle_put (&flow_control_hdb, flow_control_handle); + +error_exit: + return (res); +} diff --git a/exec/flow.h b/exec/flow.h new file mode 100644 index 00000000..17352312 --- /dev/null +++ b/exec/flow.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2006 Red Hat, Inc. + * + * + * All rights reserved. 
+ * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef FLOW_H_DEFINED +#define FLOW_H_DEFINED + +enum openais_flow_control_state { + OPENAIS_FLOW_CONTROL_STATE_DISABLED, + OPENAIS_FLOW_CONTROL_STATE_ENABLED +}; + +unsigned int openais_flow_control_initialize (void); + +unsigned int openais_flow_control_ipc_init ( + unsigned int *flow_control_identifier, + unsigned int service); + +unsigned int openais_flow_control_ipc_exit ( + unsigned int flow_control_identifier); + +unsigned int openais_flow_control_create ( + unsigned int flow_control_handle, + unsigned int service, + void *id, + unsigned int id_len, + void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state), + void *context); + +unsigned int openais_flow_control_destroy ( + unsigned int flow_control_identifier, + unsigned int service, + unsigned char *id, + unsigned int id_len); + +unsigned int openais_flow_control_disable ( + unsigned int flow_control_identifier); + +unsigned int openais_flow_control_enable ( + unsigned int flow_control_identifier); + +#endif /* FLOW_H_DEFINED */ diff --git a/exec/ipc.c b/exec/ipc.c index 6eba62dc..14965f4f 100644 --- a/exec/ipc.c +++ b/exec/ipc.c @@ -1,7 +1,6 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. * Copyright (c) 2006 Red Hat, Inc. - * Copyright (c) 2006 Sun Microsystems, Inc. * * All rights reserved. 
* @@ -68,6 +67,7 @@ #include "totemconfig.h" #include "main.h" #include "ipc.h" +#include "flow.h" #include "service.h" #include "sync.h" #include "swab.h" @@ -79,16 +79,27 @@ #include "util.h" -#ifdef OPENAIS_SOLARIS -#define MSG_NOSIGNAL 0 -#endif - #define SERVER_BACKLOG 5 +/* + * When there are this many entries left in a queue, turn on flow control + */ +#define FLOW_CONTROL_ENTRIES_ENABLE 400 + +/* + * When there are this many entries in a queue, turn off flow control + */ +#define FLOW_CONTROL_ENTRIES_DISABLE 64 + + static unsigned int g_gid_valid = 0; static struct totem_ip_address *my_ip; +static totempg_groups_handle ipc_handle; + +DECLARE_LIST_INIT (conn_info_list_head); + static void (*ipc_serialize_lock_fn) (void); static void (*ipc_serialize_unlock_fn) (void); @@ -122,16 +133,22 @@ struct conn_info { int authenticated; /* Is this connection authenticated? */ void *private_data; /* library connection private data */ struct conn_info *conn_info_partner; /* partner connection dispatch<->response */ + unsigned int flow_control_handle; /* flow control identifier */ + unsigned int flow_control_enabled; /* flow control enabled bit */ + unsigned int flow_control_local_count; /* flow control local count */ + enum openais_flow_control flow_control; /* Does this service use IPC flow control */ + pthread_mutex_t flow_control_mutex; int (*lib_exit_fn) (void *conn); struct timerlist timerlist; pthread_mutex_t mutex; pthread_mutex_t *shared_mutex; - + struct list_head list; }; static void *prioritized_poll_thread (void *conn); static int conn_info_outq_flush (struct conn_info *conn_info); static void libais_deliver (struct conn_info *conn_info); +static void ipc_flow_control (struct conn_info *conn_info); /* * IPC Initializers @@ -250,6 +267,15 @@ static int dispatch_init_send_response ( conn_info->conn_info_partner->state = CONN_STATE_ACTIVE; conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn; ais_service[conn_info->service]->lib_init_fn 
(conn_info); + + conn_info->flow_control = ais_service[conn_info->service]->flow_control; + conn_info->conn_info_partner->flow_control = ais_service[conn_info->service]->flow_control; + if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) { + openais_flow_control_ipc_init ( + &conn_info->flow_control_handle, + conn_info->service); + + } return (0); } @@ -288,6 +314,7 @@ static inline unsigned int conn_info_create (int fd) { } pthread_mutex_init (&conn_info->mutex, NULL); + pthread_mutex_init (&conn_info->flow_control_mutex, NULL); pthread_mutex_init (conn_info->shared_mutex, NULL); conn_info->state = CONN_STATE_ACTIVE; @@ -295,6 +322,9 @@ static inline unsigned int conn_info_create (int fd) { conn_info->events = POLLIN|POLLNVAL; conn_info->service = SOCKET_SERVICE_INIT; + list_init (&conn_info->list); + list_add (&conn_info_list_head, &conn_info->list); + pthread_attr_init (&conn_info->thread_attr); pthread_attr_setstacksize (&conn_info->thread_attr, 200000); pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED); @@ -321,6 +351,7 @@ static void conn_info_destroy (struct conn_info *conn_info) if (conn_info->conn_info_partner) { conn_info->conn_info_partner->conn_info_partner = NULL; } + list_del (&conn_info->list); free (conn_info); } @@ -377,6 +408,9 @@ static int libais_disconnect (struct conn_info *conn_info) } conn_info->state = CONN_STATE_DISCONNECTED; conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED; + if (conn_info->flow_control_enabled == 1) { + openais_flow_control_disable (conn_info->flow_control_handle); + } return (0); } @@ -410,17 +444,14 @@ static void *prioritized_poll_thread (void *conn) struct conn_info *conn_info = (struct conn_info *)conn; struct pollfd ufd; int fds; + struct sched_param sched_param; int res; pthread_mutex_t *rel_mutex; unsigned int service; struct conn_info *cinfo_partner; -#if ! 
defined(TS_CLASS) && (defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS)) - struct sched_param sched_param; - sched_param.sched_priority = 1; res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param); -#endif ufd.fd = conn_info->fd; for (;;) { @@ -495,6 +526,9 @@ retry_poll: if ((ufd.revents & POLLIN) == POLLIN) { libais_deliver (conn_info); } + + ipc_flow_control (conn_info); + } ipc_serialize_unlock_fn (); @@ -507,20 +541,55 @@ retry_poll: return (0); } -#if defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS) +#if defined(OPENAIS_LINUX) /* SUN_LEN is broken for abstract namespace */ #define AIS_SUN_LEN(a) sizeof(*(a)) -#else -#define AIS_SUN_LEN(a) SUN_LEN(a) -#endif - -#if defined(OPENAIS_LINUX) + char *socketname = "libais.socket"; #else +#define AIS_SUN_LEN(a) SUN_LEN(a) + char *socketname = "/var/run/libais.socket"; #endif + +static void ipc_flow_control (struct conn_info *conn_info) +{ + unsigned int entries_used; + unsigned int entries_usedhw; + + entries_used = queue_used (&conn_info->outq); + if (conn_info->flow_control_local_count > entries_used) { + entries_used = conn_info->flow_control_local_count; + } + /* + * IPC group-wide flow control + */ + if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) { + if (conn_info->flow_control_enabled == 0 && + ((entries_used + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) { + + entries_usedhw = queue_usedhw (&conn_info->outq); + log_printf (LOG_LEVEL_NOTICE, "Enabling flow control - HW mark %d of %d %p.\n", entries_usedhw, SIZEQUEUE, &conn_info->outq); + openais_flow_control_enable (conn_info->flow_control_handle); + conn_info->flow_control_enabled = 1; + conn_info->conn_info_partner->flow_control_enabled = 1; + } + if (conn_info->flow_control_enabled == 1 && + + entries_used <= FLOW_CONTROL_ENTRIES_DISABLE) { + entries_usedhw = queue_usedhw (&conn_info->outq); + + log_printf (LOG_LEVEL_NOTICE, "Disabling flow control - HW mark [%d/%d].\n", + entries_usedhw, SIZEQUEUE); + 
openais_flow_control_disable (conn_info->flow_control_handle); + conn_info->flow_control_enabled = 0; + conn_info->conn_info_partner->flow_control_enabled = 0; + } + } +} + static int conn_info_outq_flush (struct conn_info *conn_info) { struct queue *outq; int res = 0; @@ -538,14 +607,9 @@ static int conn_info_outq_flush (struct conn_info *conn_info) { msg_send.msg_name = 0; msg_send.msg_namelen = 0; msg_send.msg_iovlen = 1; -#ifndef OPENAIS_SOLARIS msg_send.msg_control = 0; msg_send.msg_controllen = 0; msg_send.msg_flags = 0; -#else - msg_send.msg_accrights = NULL; - msg_send.msg_accrightslen = 0; -#endif while (!queue_is_empty (outq)) { queue_item = queue_item_get (outq); @@ -588,6 +652,7 @@ retry_sendmsg: if (queue_is_empty (outq)) { conn_info->events = POLLIN|POLLNVAL; } + return (0); } @@ -610,8 +675,6 @@ static void libais_deliver (struct conn_info *conn_info) char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))]; struct ucred *cred; int on = 0; -#elif defined(OPENAIS_SOLARIS) - int fd; #else uid_t euid; gid_t egid; @@ -625,25 +688,15 @@ static void libais_deliver (struct conn_info *conn_info) msg_recv.msg_iovlen = 1; msg_recv.msg_name = 0; msg_recv.msg_namelen = 0; -#ifndef OPENAIS_SOLARIS msg_recv.msg_flags = 0; -#endif if (conn_info->authenticated) { -#ifndef OPENAIS_SOLARIS msg_recv.msg_control = 0; msg_recv.msg_controllen = 0; -#else - msg_recv.msg_accrights = NULL; - msg_recv.msg_accrightslen = 0; -#endif } else { #ifdef OPENAIS_LINUX msg_recv.msg_control = (void *)cmsg_cred; msg_recv.msg_controllen = sizeof (cmsg_cred); -#elif defined(OPENAIS_SOLARIS) - msg_recv.msg_accrights = (char *)&fd; - msg_recv.msg_accrightslen = sizeof (fd); #else euid = -1; egid = -1; if (getpeereid(conn_info->fd, &euid, &egid) != -1 && @@ -658,7 +711,9 @@ static void libais_deliver (struct conn_info *conn_info) iov_recv.iov_base = &conn_info->inb[conn_info->inb_start]; iov_recv.iov_len = (SIZEINB) - conn_info->inb_start; - assert (iov_recv.iov_len != 0); + if 
(conn_info->inb_inuse == SIZEINB) { + return; + } retry_recv: res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL); @@ -669,12 +724,6 @@ retry_recv: return; } else if (res == 0) { -#if defined(OPENAIS_SOLARIS) || defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN) - /* On many OS poll never return POLLHUP or POLLERR. - * EOF is detected when recvmsg return 0. - */ - libais_disconnect_request (conn_info); -#endif return; } @@ -696,9 +745,6 @@ retry_recv: log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, g_gid_valid); } } -#elif defined(OPENAIS_SOLARIS) - /* TODO Fix this. There is no authentication on Solaris yet. */ - conn_info->authenticated = 1; #endif /* * Dispatch all messages received in recvmsg that can be dispatched @@ -737,7 +783,7 @@ retry_recv: * to queue a message, otherwise tell the library we are busy and to * try again later */ - send_ok_joined_iovec.iov_base = (char *)header; + send_ok_joined_iovec.iov_base = header; send_ok_joined_iovec.iov_len = header->size; send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle, &send_ok_joined_iovec, 1); @@ -866,6 +912,29 @@ void message_source_set ( source->conn = conn; } +static void ipc_confchg_fn ( + enum totem_configuration_type configuration_type, + unsigned int *member_list, int member_list_entries, + unsigned int *left_list, int left_list_entries, + unsigned int *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id) +{ + struct conn_info *conn_info; + struct list_head *list; + + /* + * Turn on flow control enabled flag for all connections + */ + for (list = conn_info_list_head.next; + list != &conn_info_list_head; + list = list->next) { + + conn_info = list_entry (list, struct conn_info, list); + conn_info->flow_control_enabled = 1; + conn_info->conn_info_partner->flow_control_enabled = 1; + } +} + void openais_ipc_init ( void (*serialize_lock_fn) (void), void (*serialize_unlock_fn) (void), @@ -928,6 +997,15 @@ void 
openais_ipc_init ( g_gid_valid = gid_valid; my_ip = my_ip_in; + + /* + * Reset internal state of flow control when + * configuration change occurs + */ + res = totempg_groups_initialize ( + &ipc_handle, + NULL, + ipc_confchg_fn); } @@ -982,20 +1060,18 @@ int openais_conn_send_response ( if (!libais_connection_active (conn_info)) { return (-1); } + + ipc_flow_control (conn_info); + outq = &conn_info->outq; msg_send.msg_iov = &iov_send; msg_send.msg_name = 0; msg_send.msg_namelen = 0; msg_send.msg_iovlen = 1; -#ifndef OPENAIS_SOLARIS msg_send.msg_control = 0; msg_send.msg_controllen = 0; msg_send.msg_flags = 0; -#else - msg_send.msg_accrights = NULL; - msg_send.msg_accrightslen = 0; -#endif if (queue_is_full (outq)) { /* @@ -1135,3 +1211,62 @@ void openais_ipc_timer_del_data ( timerlist_del (&conn_info->timerlist, timer_handle); } + +void openais_ipc_flow_control_create ( + void *conn, + unsigned int service, + char *id, + int id_len, + void (*flow_control_state_set_fn) (void *conn, enum openais_flow_control_state), + void *context) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + openais_flow_control_create ( + conn_info->flow_control_handle, + service, + id, + id_len, + flow_control_state_set_fn, + context); + conn_info->conn_info_partner->flow_control_handle = conn_info->flow_control_handle; +} + +void openais_ipc_flow_control_destroy ( + void *conn, + unsigned int service, + unsigned char *id, + int id_len) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + openais_flow_control_destroy ( + conn_info->flow_control_handle, + service, + id, + id_len); +} + +void openais_ipc_flow_control_local_increment ( + void *conn) +{ + struct conn_info *conn_info = (struct conn_info *)conn; + + pthread_mutex_lock (&conn_info->flow_control_mutex); + + conn_info->flow_control_local_count++; + + pthread_mutex_unlock (&conn_info->flow_control_mutex); +} + +void openais_ipc_flow_control_local_decrement ( + void *conn) +{ + struct conn_info *conn_info = 
(struct conn_info *)conn; + + pthread_mutex_lock (&conn_info->flow_control_mutex); + + conn_info->flow_control_local_count--; + + pthread_mutex_unlock (&conn_info->flow_control_mutex); +} diff --git a/exec/ipc.h b/exec/ipc.h index e5cbd204..92774c30 100644 --- a/exec/ipc.h +++ b/exec/ipc.h @@ -36,6 +36,7 @@ #define IPC_H_DEFINED #include "tlist.h" +#include "flow.h" extern void message_source_set (mar_message_source_t *source, void *conn); @@ -68,4 +69,24 @@ extern void openais_ipc_timer_del_data ( void *conn, timer_handle timer_handle); +extern void openais_ipc_flow_control_create ( + void *conn, + unsigned int service, + char *id, + int id_len, + void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state_set), + void *context); + +extern void openais_ipc_flow_control_destroy ( + void *conn, + unsigned int service, + unsigned char *id, + int id_len); + +extern void openais_ipc_flow_control_local_increment ( + void *conn); + +extern void openais_ipc_flow_control_local_decrement ( + void *conn); + #endif /* IPC_H_DEFINED */ diff --git a/exec/lck.c b/exec/lck.c index 07457757..abe13b16 100644 --- a/exec/lck.c +++ b/exec/lck.c @@ -303,6 +303,7 @@ struct openais_service_handler lck_service_handler = { .name = (unsigned char*)"openais distributed locking service B.01.01", .id = LCK_SERVICE, .private_data_size = sizeof (struct lck_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = lck_lib_init_fn, .lib_exit_fn = lck_lib_exit_fn, .lib_service = lck_lib_service, diff --git a/exec/main.c b/exec/main.c index 5f2747b5..49f99d43 100644 --- a/exec/main.c +++ b/exec/main.c @@ -76,6 +76,7 @@ #include "timer.h" #include "print.h" #include "util.h" +#include "flow.h" #include "version.h" #define SERVER_BACKLOG 5 @@ -575,6 +576,9 @@ int main (int argc, char **argv) sync_register (openais_sync_callbacks_retrieve, openais_sync_completed, totem_config.vsf_type); + + res = openais_flow_control_initialize (); + /* * Drop 
root privleges to user 'ais' * TODO: Don't really need full root capabilities; diff --git a/exec/main.h b/exec/main.h index bd05277b..878b2b74 100644 --- a/exec/main.h +++ b/exec/main.h @@ -46,7 +46,7 @@ * Size of the queue (entries) for I/O's to the API over socket IPC. */ -#define SIZEQUEUE 256 +#define SIZEQUEUE 800 #define SOCKET_SERVICE_INIT 254 diff --git a/exec/msg.c b/exec/msg.c index cce474ba..33ac0a8b 100644 --- a/exec/msg.c +++ b/exec/msg.c @@ -436,6 +436,7 @@ struct openais_service_handler msg_service_handler = { .name = (unsigned char *)"openais message service B.01.01", .id = MSG_SERVICE, .private_data_size = sizeof (struct msg_pd), + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED, .lib_init_fn = msg_lib_init_fn, .lib_exit_fn = msg_lib_exit_fn, .lib_service = msg_lib_service, diff --git a/exec/service.h b/exec/service.h index 9404ecc0..899f4288 100644 --- a/exec/service.h +++ b/exec/service.h @@ -65,6 +65,7 @@ struct openais_service_handler { unsigned char *name; unsigned short id; unsigned int private_data_size; + enum openais_flow_control flow_control; int (*lib_init_fn) (void *conn); int (*lib_exit_fn) (void *conn); struct openais_lib_handler *lib_service; diff --git a/include/cpg.h b/include/cpg.h index e3c3df11..06f6e406 100644 --- a/include/cpg.h +++ b/include/cpg.h @@ -56,6 +56,11 @@ typedef enum { CPG_TYPE_SAFE /* not implemented */ } cpg_guarantee_t; +typedef enum { + CPG_FLOW_CONTROL_DISABLED, /* flow control is disabled - new messages may be sent */ + CPG_FLOW_CONTROL_ENABLED /* flow control is enabled - new messages should not be sent */ +} cpg_flow_control_state_t; + typedef enum { CPG_OK = 1, CPG_ERR_LIBRARY = 2, @@ -102,7 +107,6 @@ typedef void (*cpg_deliver_fn_t) ( void *msg, int msg_len); - typedef void (*cpg_confchg_fn_t) ( cpg_handle_t handle, struct cpg_name *group_name, @@ -183,4 +187,8 @@ cpg_error_t cpg_membership_get ( struct cpg_address *member_list, int *member_list_entries); +cpg_error_t cpg_flow_control_state_get ( + 
cpg_handle_t handle, + cpg_flow_control_state_t *flow_control_enabled); + #endif /* OPENAIS_CPG_H_DEFINED */ diff --git a/include/hdb.h b/include/hdb.h index 3c220a25..569832f9 100644 --- a/include/hdb.h +++ b/include/hdb.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2002-2006 MontaVista Software, Inc. + * Copyright (c) 2006 Red Hat, Inc. * Copyright (c) 2006 Sun Microsystems, Inc. * * All rights reserved. @@ -197,7 +198,6 @@ static inline int hdb_iterator_next ( handle_database, handle_database->iterator, instance); - handle_database->iterator += 1; if (res == 0) { diff --git a/include/ipc_cpg.h b/include/ipc_cpg.h index a2960642..cc3609db 100644 --- a/include/ipc_cpg.h +++ b/include/ipc_cpg.h @@ -56,7 +56,8 @@ enum res_cpg_types { MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4, MESSAGE_RES_CPG_DELIVER_CALLBACK = 5, MESSAGE_RES_CPG_TRACKSTART = 6, - MESSAGE_RES_CPG_TRACKSTOP = 7 + MESSAGE_RES_CPG_TRACKSTOP = 7, + MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8 }; enum lib_cpg_confchg_reason { @@ -104,6 +105,11 @@ struct req_lib_cpg_mcast { mar_uint8_t message[] __attribute__((aligned(8))); }; +struct res_lib_cpg_mcast { + mar_res_header_t header __attribute__((aligned(8))); + mar_uint32_t flow_control_state __attribute__((aligned(8))); +}; + /* Message from another node */ struct res_lib_cpg_deliver_callback { mar_res_header_t header __attribute__((aligned(8))); @@ -111,6 +117,7 @@ struct res_lib_cpg_deliver_callback { mar_uint32_t msglen __attribute__((aligned(8))); mar_uint32_t nodeid __attribute__((aligned(8))); mar_uint32_t pid __attribute__((aligned(8))); + mar_uint32_t flow_control_state __attribute__((aligned(8))); mar_uint8_t message[] __attribute__((aligned(8))); }; @@ -140,5 +147,4 @@ struct res_lib_cpg_leave { mar_res_header_t header __attribute__((aligned(8))); }; - #endif /* IPC_CPG_H_DEFINED */ diff --git a/include/queue.h b/include/queue.h index 0638d981..3b985f01 100644 --- a/include/queue.h +++ b/include/queue.h @@ -1,6 +1,5 @@ /* * Copyright (c) 2002-2004 MontaVista 
Software, Inc. - * Copyright (c) 2006 Sun Microsystems, Inc. * * All rights reserved. * @@ -39,12 +38,7 @@ #include #include "assert.h" -#ifndef OPENAIS_SOLARIS struct queue { -#else -struct _queue { -#define queue _queue -#endif int head; int tail; int used; @@ -103,7 +97,7 @@ static inline int queue_is_empty (struct queue *queue) { int empty; pthread_mutex_lock (&queue->mutex); - empty = queue->used == 0; + empty = (queue->used == 0); pthread_mutex_unlock (&queue->mutex); return (empty); } @@ -219,4 +213,14 @@ static inline int queue_used (struct queue *queue) { return (used); } +static inline int queue_usedhw (struct queue *queue) { + int usedhw; + + pthread_mutex_lock (&queue->mutex); + usedhw = queue->usedhw; + pthread_mutex_unlock (&queue->mutex); + + return (usedhw); +} + #endif /* QUEUE_H_DEFINED */ diff --git a/lib/cpg.c b/lib/cpg.c index 5ce5a3a5..47bca107 100644 --- a/lib/cpg.c +++ b/lib/cpg.c @@ -2,7 +2,6 @@ * vi: set autoindent tabstop=4 shiftwidth=4 : * * Copyright (c) 2006 Red Hat, Inc. - * Copyright (c) 2006 Sun Microsystems, Inc. * * All rights reserved. 
* @@ -56,6 +55,7 @@ struct cpg_inst { int response_fd; int dispatch_fd; int finalize; + cpg_flow_control_state_t flow_control_state; cpg_callbacks_t callbacks; pthread_mutex_t response_mutex; pthread_mutex_t dispatch_mutex; @@ -307,6 +307,7 @@ cpg_error_t cpg_dispatch ( case MESSAGE_RES_CPG_DELIVER_CALLBACK: res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data; + cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state; marshall_from_mar_cpg_name_t ( &group_name, &res_cpg_deliver_callback->group_name); @@ -353,7 +354,6 @@ cpg_error_t cpg_dispatch ( res_cpg_confchg_callback->joined_list_entries); break; - default: error = SA_AIS_ERR_LIBRARY; goto error_nounlock; @@ -412,7 +412,7 @@ cpg_error_t cpg_join ( marshall_to_mar_cpg_name_t (&req_lib_cpg_trackstart.group_name, group); - iov[0].iov_base = (char *)&req_lib_cpg_trackstart; + iov[0].iov_base = &req_lib_cpg_trackstart; iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart); error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1, @@ -430,7 +430,7 @@ cpg_error_t cpg_join ( marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name, group); - iov[0].iov_base = (char *)&req_lib_cpg_join; + iov[0].iov_base = &req_lib_cpg_join; iov[0].iov_len = sizeof (struct req_lib_cpg_join); error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1, @@ -471,7 +471,7 @@ cpg_error_t cpg_leave ( marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name, group); - iov[0].iov_base = (char *)&req_lib_cpg_leave; + iov[0].iov_base = &req_lib_cpg_leave; iov[0].iov_len = sizeof (struct req_lib_cpg_leave); pthread_mutex_lock (&cpg_inst->response_mutex); @@ -503,7 +503,7 @@ cpg_error_t cpg_mcast_joined ( struct cpg_inst *cpg_inst; struct iovec iov[64]; struct req_lib_cpg_mcast req_lib_cpg_mcast; - mar_res_header_t res_lib_cpg_mcast; + struct res_lib_cpg_mcast res_lib_cpg_mcast; int msg_len = 0; error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); @@ -522,14 +522,14 @@ 
cpg_error_t cpg_mcast_joined ( req_lib_cpg_mcast.guarantee = guarantee; req_lib_cpg_mcast.msglen = msg_len; - iov[0].iov_base = (char *)&req_lib_cpg_mcast; + iov[0].iov_base = &req_lib_cpg_mcast; iov[0].iov_len = sizeof (struct req_lib_cpg_mcast); memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec)); pthread_mutex_lock (&cpg_inst->response_mutex); error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1, - &res_lib_cpg_mcast, sizeof (mar_res_header_t)); + &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast)); pthread_mutex_unlock (&cpg_inst->response_mutex); @@ -537,7 +537,11 @@ cpg_error_t cpg_mcast_joined ( goto error_exit; } - error = res_lib_cpg_mcast.error; + cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state; + if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) { + cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED; + } + error = res_lib_cpg_mcast.header.error; error_exit: saHandleInstancePut (&cpg_handle_t_db, handle); @@ -568,7 +572,7 @@ cpg_error_t cpg_membership_get ( marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name, group_name); - iov.iov_base = (char *)&req_lib_cpg_membership_get; + iov.iov_base = &req_lib_cpg_membership_get; iov.iov_len = sizeof (mar_req_header_t); pthread_mutex_lock (&cpg_inst->response_mutex); @@ -601,4 +605,22 @@ error_exit: return (error); } +cpg_error_t cpg_flow_control_state_get ( + cpg_handle_t handle, + cpg_flow_control_state_t *flow_control_state) +{ + cpg_error_t error; + struct cpg_inst *cpg_inst; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + *flow_control_state = cpg_inst->flow_control_state; + + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} /** @} */ diff --git a/test/Makefile b/test/Makefile index e46ae7f5..9d795991 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,4 +1,5 @@ # Copyright (c) 2002-2004 MontaVista Software, Inc. 
+# Copyright (c) 2006 Red Hat, Inc. # Copyright (c) 2006 Sun Microsystems, Inc. # # All rights reserved. @@ -45,17 +46,17 @@ LDFLAGS += -L../lib EXTRA_CFLAGS = -I../include TEST_SRC = testclm.c testamf1.c \ - testamf4.c testamf5.c testamf6.c testamfth.c \ - testckpt.c ckptstress.c ckptbench.c \ - ckptbenchth.c testevt.c testevs.c evsbench.c \ - subscription.c publish.c evtbench.c \ - sa_error.c unlink.c testclm2.c testlck.c testmsg.c + testamf4.c testamf5.c testamf6.c testamfth.c \ + testckpt.c ckptstress.c ckptbench.c \ + ckptbenchth.c testevt.c testevs.c evsbench.c \ + subscription.c publish.c evtbench.c \ + sa_error.c unlink.c testclm2.c testlck.c testmsg.c all: testclm testamf1 \ testckpt ckptstress ckptbench \ ckptbenchth ckpt-rd ckpt-wr testevt testevs \ evsbench subscription publish evtbench unlink testclm2 testlck \ - testmsg testcpg openais-cfgtool + testmsg testcpg cpgbench openais-cfgtool testtimer: testtimer.o $(LIBRARIES) $(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o @@ -144,6 +145,9 @@ testmsg: testmsg.o $(LIBRARIES) testcpg: testcpg.o $(LIBRARIES) $(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS) +cpgbench: cpgbench.o $(LIBRARIES) + $(CC) $(LDFLAGS) -o cpgbench cpgbench.o $(LIBS) + openais-cfgtool: openais-cfgtool.o $(LIBRARIES) $(CC) $(LDFLAGS) -o openais-cfgtool openais-cfgtool.o $(LIBS) diff --git a/test/cpgbench.c b/test/cpgbench.c new file mode 100644 index 00000000..6d09eed2 --- /dev/null +++ b/test/cpgbench.c @@ -0,0 +1,175 @@ +#define _BSD_SOURCE +/* + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. 
+ * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "saAis.h" +#include "cpg.h" + +volatile sig_atomic_t alarm_notice; + +void cpg_bm_confchg_fn ( + cpg_handle_t handle, + struct cpg_name *group_name, + struct cpg_address *member_list, int member_list_entries, + struct cpg_address *left_list, int left_list_entries, + struct cpg_address *joined_list, int joined_list_entries) +{ +} + +unsigned int write_count; + +void cpg_bm_deliver_fn ( + cpg_handle_t handle, + struct cpg_name *group_name, + uint32_t nodeid, + uint32_t pid, + void *msg, + int msg_len) +{ + write_count++; +} + +cpg_callbacks_t callbacks = { + .cpg_deliver_fn = cpg_bm_deliver_fn, + .cpg_confchg_fn = cpg_bm_confchg_fn +}; + +char data[500000]; + +void cpg_benchmark ( + cpg_handle_t handle, + int write_size) +{ + struct timeval tv1, tv2, tv_elapsed; + struct iovec iov; + unsigned int res; + cpg_flow_control_state_t flow_control_state; + + alarm_notice = 0; + iov.iov_base = data; + iov.iov_len = write_size; + + write_count = 0; + alarm (10); + + gettimeofday (&tv1, NULL); + do { + /* + * Multicast only while flow control is disabled + */ + cpg_flow_control_state_get (handle, &flow_control_state); + if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) { +retry: + res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1); + if (res == CPG_ERR_TRY_AGAIN) { + goto retry; + } + } + res = cpg_dispatch (handle, CPG_DISPATCH_ALL); + if (res != CPG_OK) { + printf ("cpg dispatch returned error %d\n", res); + exit (1); + } + } while (alarm_notice == 0); + gettimeofday (&tv2, NULL); + timersub (&tv2, &tv1, &tv_elapsed); + + printf ("%5d messages received ", write_count); + printf ("%5d bytes per write ", write_size); + printf ("%7.3f Seconds runtime ", + (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%9.3f TP/s ", + ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%7.3f 
MB/s.\n", + ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); +} + +void sigalrm_handler (int num) +{ + alarm_notice = 1; +} + +static struct cpg_name group_name = { + .value = "cpg_bm", + .length = 6 +}; + +int main (void) { + cpg_handle_t handle; + unsigned int size = 1; + int i; + unsigned int res; + + signal (SIGALRM, sigalrm_handler); + res = cpg_initialize (&handle, &callbacks); + if (res != CPG_OK) { + printf ("cpg_initialize failed with result %d\n", res); + exit (1); + } + + res = cpg_join (handle, &group_name); + if (res != CPG_OK) { + printf ("cpg_join failed with result %d\n", res); + exit (1); + } + + for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */ + cpg_benchmark (handle, size); + size += 1000; + } + + res = cpg_finalize (handle); + if (res != CPG_OK) { + printf ("cpg_finalize failed with result %d\n", res); + exit (1); + } + return (0); +}