From 55ccceeb5490491a7cfaf1a0e67b1b366af5c839 Mon Sep 17 00:00:00 2001 From: Patrick Caulfield Date: Fri, 3 Mar 2006 08:46:45 +0000 Subject: [PATCH] Add cpg (closed process groups) component. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@936 fd59a12c-fef9-0310-b244-a6a79926bd2f --- Makefile | 2 + exec/Makefile | 20 +- exec/cpg.c | 994 ++++++++++++++++++++++++++++++++++++++++++++ exec/jhash.h | 146 +++++++ exec/print.c | 3 +- exec/print.h | 1 + include/cpg.h | 179 ++++++++ include/ipc_cpg.h | 161 +++++++ lib/Makefile | 46 +- lib/cpg.c | 552 ++++++++++++++++++++++++ lib/libcpg.versions | 29 ++ test/Makefile | 9 +- test/testcpg.c | 169 ++++++++ 13 files changed, 2287 insertions(+), 24 deletions(-) create mode 100644 exec/cpg.c create mode 100644 exec/jhash.h create mode 100644 include/cpg.h create mode 100644 include/ipc_cpg.h create mode 100644 lib/cpg.c create mode 100644 lib/libcpg.versions create mode 100644 test/testcpg.c diff --git a/Makefile b/Makefile index 66e1ea4d..6c9ffeb6 100644 --- a/Makefile +++ b/Makefile @@ -52,6 +52,8 @@ install: cp lib/libSa*.so* $(DESTDIR)/usr/lib cp lib/libevs.a $(DESTDIR)/usr/lib cp lib/libevs.so* $(DESTDIR)/usr/lib + cp lib/libcpg.a $(DESTDIR)/usr/lib + cp lib/libcpg.so* $(DESTDIR)/usr/lib cp exec/libtotem_pg* $(DESTDIR)/usr/lib install -m 755 exec/aisexec $(DESTDIR)/sbin diff --git a/exec/Makefile b/exec/Makefile index c7439caa..1e9749a1 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -53,8 +53,8 @@ TOTEM_OBJS = aispoll.o totemip.o totemnet.o totemrrp.o totemsrp.o totemmrp.o tot EXEC_LIBS = libtotem_pg.a # service handler objects -SERV_SRC = evs.c clm.c amf.c ckpt.c evt.c lck.c msg.c cfg.c -SERV_OBJS = evs.o clm.o amf.o ckpt.o evt.o lck.o msg.o cfg.o +SERV_SRC = evs.c clm.c amf.c ckpt.c evt.c lck.c msg.c cfg.c cpg.c +SERV_OBJS = evs.o clm.o amf.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o # main executive objects MAIN_SRC = main.c print.c mempool.c \ @@ -70,7 +70,7 @@ all:libtotem_pg.a libtotem_pg.so.1.0 ../lcr/lcr_ifact.o \ aisexec \ service_evs.lcrso service_clm.lcrso service_amf.lcrso \ service_ckpt.lcrso service_evt.lcrso service_lck.lcrso \ - service_msg.lcrso service_cfg.lcrso \ + service_msg.lcrso service_cfg.lcrso service_cpg.lcrso \ keygen openais-instantiate else EXEC_OBJS = $(TOTEM_OBJS) $(MAIN_OBJS) $(SERV_OBJS) @@ -94,7 +94,7 @@ service_ckpt.lcrso: ckpt.o service_evt.lcrso: evt.o $(CC) -bundle -bundle_loader ./aisexec -bind_at_load evt.o -o $@ - + service_lck.lcrso: lck.o $(CC) -bundle -bundle_loader ./aisexec -bind_at_load lck.o -o $@ @@ -104,6 +104,9 @@ service_msg.lcrso: msg.o service_cfg.lcrso: cfg.o $(CC) -bundle -bundle_loader ./aisexec -bind_at_load cfg.o -o $@ +service_cpg.lcrso: cpg.o + $(CC) -bundle -bundle_loader ./aisexec -bind_at_load cpg.o -o $@ + else service_evs.lcrso: evs.o @@ -129,9 +132,13 @@ service_msg.lcrso: msg.o service_cfg.lcrso: cfg.o $(CC) -shared -Wl,-soname,service_cfg.lcrso cfg.o -o $@ - + +service_cpg.lcrso: cpg.o + $(CC) -shared -Wl,-soname,service_cpg.lcrso cpg.o -o $@ + endif + aisexec: $(EXEC_OBJS) libtotem_pg.a $(CC) $(LDFLAGS) $(EXEC_OBJS) $(EXEC_LIBS) -o aisexec @@ -196,6 +203,9 @@ msg.o: msg.c cfg.o: cfg.c $(CC) $(CFLAGS) -c -o $@ $(*F).c +cpg.o: cpg.c + $(CC) $(CFLAGS) -c -o $@ $(*F).c + # -fPIC rules required for lib totem aispoll.o: aispoll.c diff --git a/exec/cpg.c b/exec/cpg.c new file mode 100644 index 00000000..45fd9cb6 --- /dev/null +++ b/exec/cpg.c @@ -0,0 +1,994 @@ +/* + * vi: set autoindent tabstop=4 shiftwidth=4 : + * + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Patrick Caulfield (pcaulfie@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../include/saAis.h" +#include "../include/saClm.h" +#include "../include/ipc_gen.h" +#include "../include/ipc_cpg.h" +#include "../include/list.h" +#include "../include/queue.h" +#include "../lcr/lcr_comp.h" +#include "aispoll.h" +#include "totempg.h" +#include "totemip.h" +#include "main.h" +#include "mempool.h" +#include "handlers.h" +#include "jhash.h" +#include "swab.h" + +#define LOG_SERVICE LOG_SERVICE_CPG +#include "print.h" + +#define GROUP_HASH_SIZE 32 + +enum cpg_message_req_types { + MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0, + MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1, + MESSAGE_REQ_EXEC_CPG_JOINLIST = 2, + MESSAGE_REQ_EXEC_CPG_MCAST = 3 +}; + +struct removed_group +{ + struct group_info *gi; + int left_list_entries; + struct cpg_groupinfo left_list[32]; /* ?? arbitrary numbers R us */ + struct list_head list; +}; + +struct group_info { + struct cpg_name groupName; + struct list_head members; + struct list_head list; /* on hash list */ + struct removed_group *rg; /* when a node goes down */ +}; + +struct process_info { + struct totem_ip_address node; + uint32_t pid; + void *conn; + void *trackerconn; + struct group_info *group; + struct list_head list; /* on the group_info members list */ +}; + +struct join_list_entry { + uint32_t pid; + struct cpg_name groupName; +}; + +static struct list_head group_lists[GROUP_HASH_SIZE]; + +/* + * Service Interfaces required by service_message_handler struct + */ +static void cpg_confchg_fn ( + enum totem_configuration_type configuration_type, + struct totem_ip_address *member_list, int member_list_entries, + struct totem_ip_address *left_list, int left_list_entries, + struct totem_ip_address *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id); + +static int cpg_exec_init_fn (struct openais_config *); + +static int cpg_lib_init_fn (void *conn); + +static int cpg_lib_exit_fn (void *conn); + +static void message_handler_req_exec_cpg_procjoin ( + void *message, + struct totem_ip_address *source_addr); + +static void message_handler_req_exec_cpg_procleave ( + void *message, + struct totem_ip_address *source_addr); + +static void message_handler_req_exec_cpg_joinlist ( + void *message, + struct totem_ip_address *source_addr); + +static void message_handler_req_exec_cpg_mcast ( + void *message, + struct totem_ip_address *source_addr); + +static void exec_cpg_procjoin_endian_convert (void *msg); + +static void exec_cpg_joinlist_endian_convert (void *msg); + +static void exec_cpg_mcast_endian_convert (void *msg); + +static void message_handler_req_lib_cpg_join (void *conn, void *message); + +static void message_handler_req_lib_cpg_leave (void *conn, void *message); + +static void message_handler_req_lib_cpg_mcast (void *conn, void *message); + +static void message_handler_req_lib_cpg_membership (void *conn, void *message); + +static void message_handler_req_lib_cpg_trackstart (void *conn, void *message); + +static void message_handler_req_lib_cpg_trackstop (void *conn, void *message); + +static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason); + +static void cpg_exec_send_joinlist(void); + +/* + * Library Handler Definition + */ +static struct openais_lib_handler cpg_lib_handlers[] = +{ + { /* 0 */ + .lib_handler_fn = message_handler_req_lib_cpg_join, + .response_size = sizeof (struct res_lib_cpg_join), + .response_id = MESSAGE_RES_CPG_JOIN, + .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED + }, + { /* 1 */ + .lib_handler_fn = message_handler_req_lib_cpg_leave, + .response_size = sizeof (struct res_lib_cpg_leave), + .response_id = MESSAGE_RES_CPG_LEAVE, + .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED + }, + { /* 2 */ + .lib_handler_fn = message_handler_req_lib_cpg_mcast, + .response_size = sizeof (struct res_header), + .response_id = MESSAGE_RES_CPG_MCAST, + .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED + }, + { /* 3 */ + .lib_handler_fn = message_handler_req_lib_cpg_membership, + .response_size = sizeof (struct res_header), + .response_id = MESSAGE_RES_CPG_MEMBERSHIP, + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED + }, + { /* 4 */ + .lib_handler_fn = message_handler_req_lib_cpg_trackstart, + .response_size = sizeof (struct res_lib_cpg_trackstart), + .response_id = MESSAGE_RES_CPG_TRACKSTART, + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED + }, + { /* 5 */ + .lib_handler_fn = message_handler_req_lib_cpg_trackstop, + .response_size = sizeof (struct res_lib_cpg_trackstart), + .response_id = MESSAGE_RES_CPG_TRACKSTOP, + .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED + } +}; + +static struct openais_exec_handler cpg_exec_handlers[] = +{ + { /* 0 */ + .exec_handler_fn = message_handler_req_exec_cpg_procjoin, + .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert + }, + { /* 1 */ + .exec_handler_fn = message_handler_req_exec_cpg_procleave, + .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert + }, + { /* 2 */ + .exec_handler_fn = message_handler_req_exec_cpg_joinlist, + .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert + }, + { /* 3 */ + .exec_handler_fn = message_handler_req_exec_cpg_mcast, + .exec_endian_convert_fn = exec_cpg_mcast_endian_convert + }, +}; + +struct openais_service_handler cpg_service_handler = { + .name = (unsigned char*)"openais cluster closed process group service v1.01", + .id = CPG_SERVICE, + .private_data_size = sizeof (struct process_info), + .lib_init_fn = cpg_lib_init_fn, + .lib_exit_fn = cpg_lib_exit_fn, + .lib_handlers = cpg_lib_handlers, + .lib_handlers_count = sizeof (cpg_lib_handlers) / sizeof (struct openais_lib_handler), + .exec_init_fn = cpg_exec_init_fn, + .exec_dump_fn = NULL, + .exec_handlers = cpg_exec_handlers, + .exec_handlers_count = sizeof (cpg_exec_handlers) / sizeof (struct openais_exec_handler), + .confchg_fn = cpg_confchg_fn, +}; + +/* + * Dynamic loader definition + */ +struct openais_service_handler *cpg_get_service_handler_ver0 (void); + +struct openais_service_handler_iface_ver0 cpg_service_handler_iface = { + .openais_get_service_handler_ver0 = cpg_get_service_handler_ver0 +}; + +struct lcr_iface openais_cpg_ver0[1] = { + { + .name = "openais_cpg", + .version = 0, + .versions_replace = 0, + .versions_replace_count = 0, + .dependencies = 0, + .dependency_count = 0, + .constructor = NULL, + .destructor = NULL, + .interfaces = (void **)&cpg_service_handler_iface, + } +}; + +struct lcr_comp cpg_comp_ver0 = { + .iface_count = 1, + .ifaces = openais_cpg_ver0 +}; + + +struct openais_service_handler *cpg_get_service_handler_ver0 (void) +{ + return (&cpg_service_handler); +} + +__attribute__ ((constructor)) static void cpg_comp_register (void) { + lcr_component_register (&cpg_comp_ver0); +} + +struct req_exec_cpg_procjoin { + struct req_header header; + struct cpg_name groupName; + uint32_t pid; + uint32_t reason; +}; + +struct req_exec_cpg_mcast { + struct req_header header; + struct cpg_name groupName; + uint32_t msglen; + uint32_t pid; + char message[]; +}; + +static int notify_lib_joinlist(struct group_info *gi, void *conn, + int joined_list_entries, struct cpg_groupinfo *joined_list, + int left_list_entries, struct cpg_groupinfo *left_list, + int id) +{ + int count = 0; + char *buf; + struct res_lib_cpg_confchg_callback *res; + struct list_head *iter; + struct list_head *tmp; + struct cpg_groupinfo *retgi; + int size; + + /* First, we need to know how many nodes are in the list. While we're + traversing this list, look for the 'us' entry so we knw which + connection to send back down */ + for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + struct process_info *pi = list_entry(iter, struct process_info, list); + if (pi->pid) + count++; + } + + log_printf(LOG_LEVEL_DEBUG, "Sending new joinlist (%d elements) to clients\n", count); + + size = sizeof(struct res_lib_cpg_confchg_callback) + + sizeof(struct cpg_groupinfo) * (count + left_list_entries + joined_list_entries); + buf = alloca(size); + if (!buf) + return SA_AIS_ERR_NO_SPACE; + + res = (struct res_lib_cpg_confchg_callback *)buf; + res->joined_list_entries = joined_list_entries; + res->left_list_entries = left_list_entries; + retgi = res->member_list; + + res->header.size = size; + res->header.id = id; + memcpy(&res->groupName, &gi->groupName, sizeof(struct cpg_name)); + + /* Build up the message */ + count = 0; + for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + struct process_info *pi = list_entry(iter, struct process_info, list); + if (pi->pid) { + /* Processes leaving will be removed AFTER this is done (so that they get their + own leave notifications), so exclude them from the members list here */ + int i; + for (i=0; ipid && left_list[i].nodeId == pi->node.nodeid) + goto next_member; + } + + retgi->nodeId = pi->node.nodeid; + retgi->pid = pi->pid; + retgi++; + count++; + next_member: ; + } + } + res->member_list_entries = count; + + if (left_list_entries) { + memcpy(retgi, left_list, left_list_entries * sizeof(struct cpg_groupinfo)); + retgi += left_list_entries; + } + + if (joined_list_entries) { + memcpy(retgi, joined_list, joined_list_entries * sizeof(struct cpg_groupinfo)); + retgi += joined_list_entries; + } + + if (conn) { + openais_conn_send_response(conn, buf, size); + } + else { + /* Send it to all listeners */ + for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) { + struct process_info *pi = list_entry(iter, struct process_info, list); + if (pi->trackerconn) { + if (openais_conn_send_response(pi->trackerconn, buf, size) == -1) { + // Error ?? + } + } + } + } + + return SA_AIS_OK; +} + +static void remove_group(struct group_info *gi) +{ + list_del(&gi->list); + free(gi); +} + + +static int cpg_exec_init_fn (struct openais_config *openais_config) +{ + int i; + + for (i=0; igroup; + struct cpg_groupinfo notify_info; + + log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn); + + if (gi) { + notify_info.pid = pi->pid; + notify_info.nodeId = this_ip->nodeid; + notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN; + cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN); + list_del(&pi->list); + } + return (0); +} + +static struct group_info *get_group(struct cpg_name *name) +{ + struct list_head *iter; + struct group_info *gi = NULL; + uint32_t hash = jhash(name->value, name->length, 0) % GROUP_HASH_SIZE; + + for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) { + gi = list_entry(iter, struct group_info, list); + if (memcmp(gi->groupName.value, name->value, name->length) == 0) + break; + } + + if (!gi) { + gi = malloc(sizeof(struct group_info)); + if (!gi) { + log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_info struct"); + return NULL; + } + memcpy(&gi->groupName, name, sizeof(struct cpg_name)); + gi->rg = NULL; + list_init(&gi->members); + list_add(&gi->list, &group_lists[hash]); + } + return gi; +} + +static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason) +{ + struct req_exec_cpg_procjoin req_exec_cpg_procjoin; + struct iovec req_exec_cpg_iovec; + int result; + + memcpy(&req_exec_cpg_procjoin.groupName, &gi->groupName, sizeof(struct cpg_name)); + req_exec_cpg_procjoin.pid = pi->pid; + req_exec_cpg_procjoin.reason = reason; + + req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); + req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); + + req_exec_cpg_iovec.iov_base = &req_exec_cpg_procjoin; + req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin); + + result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED); + + return (result); +} + +static void remove_node_from_groups(struct totem_ip_address *node, struct list_head *remlist) +{ + int i; + struct list_head *iter, *iter2, *tmp; + struct process_info *pi; + struct group_info *gi; + + for (i=0; i < GROUP_HASH_SIZE; i++) { + for (iter = group_lists[i].next, tmp=iter->next; iter != &group_lists[i]; iter = tmp, tmp=iter->next) { + gi = list_entry(iter, struct group_info, list); + for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { + pi = list_entry(iter2, struct process_info, list); + + if (totemip_equal(&pi->node, node)) { + + /* Add it to the list of nodes to send notifications for */ + if (!gi->rg) { + gi->rg = malloc(sizeof(struct removed_group)); + if (gi->rg) { + list_add(&gi->rg->list, remlist); + gi->rg->gi = gi; + gi->rg->left_list_entries = 0; + } + gi->rg->left_list[gi->rg->left_list_entries].pid = pi->pid; + gi->rg->left_list[gi->rg->left_list_entries].nodeId = pi->node.nodeid; + gi->rg->left_list[gi->rg->left_list_entries].reason = CONFCHG_CPG_REASON_NODEDOWN; + gi->rg->left_list_entries++; + + /* Remove node info for dead node */ + list_del(&pi->list); + free(pi); + } + } + } + } + } +} + + +static void cpg_confchg_fn ( + enum totem_configuration_type configuration_type, + struct totem_ip_address *member_list, int member_list_entries, + struct totem_ip_address *left_list, int left_list_entries, + struct totem_ip_address *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id) +{ + + int i; + struct list_head removed_list; + + log_printf(LOG_LEVEL_DEBUG, "confchg. joined_list: %d, left_list: %d\n", joined_list_entries, left_list_entries); + + list_init(&removed_list); + + /* Tell any newly joined nodes our list of joined groups */ + cpg_exec_send_joinlist(); + + /* Remove nodes from joined groups and add removed groups to the list */ + for (i = 0; i < left_list_entries; i++) { + remove_node_from_groups(&left_list[i], &removed_list); + } + + if (!list_empty(&removed_list)) { + struct list_head *iter; + + for (iter = removed_list.next; iter != &removed_list; iter = iter->next) { + struct removed_group *rg = list_entry(iter, struct removed_group, list); + + notify_lib_joinlist(rg->gi, NULL, + 0, NULL, + rg->left_list_entries, rg->left_list, + MESSAGE_RES_CPG_CONFCHG_CALLBACK); + rg->gi->rg = NULL; + free(rg); + } + } +} + +/* Can byteswap join & leave messages */ +static void exec_cpg_procjoin_endian_convert (void *msg) +{ + struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)msg; + + req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid); + req_exec_cpg_procjoin->groupName.length = swab32(req_exec_cpg_procjoin->groupName.length); +} + +static void exec_cpg_joinlist_endian_convert (void *msg) +{ + struct res_header *res = (struct res_header *)msg; + struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct res_header)); + + while ((void*)jle < msg + res->size) { + jle->pid = swab32(jle->pid); + jle->groupName.length = swab32(jle->groupName.length); + jle++; + } + +} + +static void exec_cpg_mcast_endian_convert (void *msg) +{ + struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)msg; + + req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid); + req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen); + req_exec_cpg_mcast->groupName.length = swab32(req_exec_cpg_mcast->groupName.length); + +} + +static void do_proc_join(struct cpg_name *name, uint32_t pid, struct totem_ip_address *node, int reason) +{ + struct group_info *gi; + struct process_info *pi; + struct list_head *iter; + struct cpg_groupinfo notify_info; + + gi = get_group(name); /* this will always succeed ! */ + assert(gi); + + /* See if it already exists in this group */ + for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + pi = list_entry(iter, struct process_info, list); + if (pi->pid == pid && pi->node.nodeid == node->nodeid) { + return; + } + } + + pi = malloc(sizeof(struct process_info)); + if (!pi) { + log_printf(LOG_LEVEL_WARNING, "Unable to allocate process_info struct"); + return; + } + totemip_copy(&pi->node, node); + pi->pid = pid; + pi->group = gi; + pi->conn = NULL; + pi->trackerconn = NULL; + list_add_tail(&pi->list, &gi->members); + + notify_info.pid = pi->pid; + notify_info.nodeId = node->nodeid; + notify_info.reason = reason; + + notify_lib_joinlist(gi, NULL, + 1, ¬ify_info, + 0, NULL, + MESSAGE_RES_CPG_CONFCHG_CALLBACK); +} + +static void message_handler_req_exec_cpg_procjoin ( + void *message, + struct totem_ip_address *source_addr) +{ + struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)message; + + log_printf(LOG_LEVEL_DEBUG, "got procjoin message from cluster\n"); + + do_proc_join(&req_exec_cpg_procjoin->groupName, req_exec_cpg_procjoin->pid, source_addr, CONFCHG_CPG_REASON_JOIN); +} + +static void message_handler_req_exec_cpg_procleave ( + void *message, + struct totem_ip_address *source_addr) +{ + struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)message; + struct group_info *gi; + struct process_info *pi; + struct list_head *iter; + struct cpg_groupinfo notify_info; + + log_printf(LOG_LEVEL_DEBUG, "got procleave message from cluster\n"); + + gi = get_group(&req_exec_cpg_procjoin->groupName); /* this will always succeed ! */ + assert(gi); + + notify_info.pid = req_exec_cpg_procjoin->pid; + notify_info.nodeId = source_addr->nodeid; + notify_info.reason = req_exec_cpg_procjoin->reason; + + notify_lib_joinlist(gi, NULL, + 0, NULL, + 1, ¬ify_info, + MESSAGE_RES_CPG_CONFCHG_CALLBACK); + + /* Find the node/PID to remove */ + for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + pi = list_entry(iter, struct process_info, list); + if (pi->pid == req_exec_cpg_procjoin->pid && totemip_equal(&pi->node, source_addr)) { + + list_del(&pi->list); + if (!pi->conn) + free(pi); + + if (list_empty(&gi->members)) { + remove_group(gi); + } + break; + } + } +} + + +/* Got a proclist from another node */ +static void message_handler_req_exec_cpg_joinlist ( + void *message, + struct totem_ip_address *source_addr) +{ + struct res_header *res = (struct res_header *)message; + struct join_list_entry *jle = (struct join_list_entry *)(message + sizeof(struct res_header)); + + log_printf(LOG_LEVEL_DEBUG, "got joinlist message from cluster\n"); + + /* Ignore our own messages */ + if (totemip_equal(source_addr, this_ip)) + return; + + while ((void*)jle < message + res->size) { + do_proc_join(&jle->groupName, jle->pid, source_addr, CONFCHG_CPG_REASON_NODEUP); + jle++; + } +} + +static void message_handler_req_exec_cpg_mcast ( + void *message, + struct totem_ip_address *source_addr) +{ + struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message; + struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast; + int msglen = req_exec_cpg_mcast->msglen; + char buf[sizeof(*res_lib_cpg_mcast) + msglen]; + struct group_info *gi; + struct list_head *iter; + + gi = get_group(&req_exec_cpg_mcast->groupName); /* this will always succeed ! */ + assert(gi); + + res_lib_cpg_mcast = (struct res_lib_cpg_deliver_callback *)buf; + res_lib_cpg_mcast->header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK; + res_lib_cpg_mcast->header.size = sizeof(*res_lib_cpg_mcast) + msglen; + res_lib_cpg_mcast->msglen = msglen; + res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid; + res_lib_cpg_mcast->nodeid = source_addr->nodeid; + memcpy(&res_lib_cpg_mcast->groupName, &gi->groupName, sizeof(struct cpg_name)); + memcpy(&res_lib_cpg_mcast->message, req_exec_cpg_mcast->message, msglen); + + /* Send to all interested members */ + for (iter = gi->members.next; iter != &gi->members; iter = iter->next) { + struct process_info *pi = list_entry(iter, struct process_info, list); + if (pi->trackerconn) { + openais_conn_send_response(pi->trackerconn, buf, res_lib_cpg_mcast->header.size); + } + } +} + + +static void cpg_exec_send_joinlist() +{ + int count = 0; + char *buf; + int i; + struct list_head *iter; + struct list_head *iter2; + struct group_info *gi; + struct res_header *res; + struct join_list_entry *jle; + struct iovec req_exec_cpg_iovec; + + log_printf(LOG_LEVEL_DEBUG, "sending joinlist to cluster\n"); + + /* Count the number of groups we are a member of */ + for (i=0; inext) { + gi = list_entry(iter, struct group_info, list); + for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { + struct process_info *pi = list_entry(iter2, struct process_info, list); + if (pi->pid) { + count++; + } + } + } + } + + /* Nothing to send */ + if (!count) + return; + + buf = alloca(sizeof(struct res_header) + sizeof(struct join_list_entry) * count); + if (!buf) { + log_printf(LOG_LEVEL_WARNING, "Unable to allocate joinlist buffer"); + return; + } + + jle = (struct join_list_entry *)(buf + sizeof(struct res_header)); + res = (struct res_header *)buf; + + for (i=0; inext) { + + gi = list_entry(iter, struct group_info, list); + for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) { + + struct process_info *pi = list_entry(iter2, struct process_info, list); + if (pi->pid) { + memcpy(&jle->groupName, &gi->groupName, sizeof(struct cpg_name)); + jle->pid = pi->pid; + jle++; + } + } + } + } + + res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST); + res->size = sizeof(struct res_header)+sizeof(struct join_list_entry) * count; + + req_exec_cpg_iovec.iov_base = buf; + req_exec_cpg_iovec.iov_len = res->size; + + totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED); +} + +static int cpg_lib_init_fn (void *conn) +{ + struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); + pi->conn = conn; + + log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi); + return (0); +} + +/* Join message from the library */ +static void message_handler_req_lib_cpg_join (void *conn, void *message) +{ + struct req_lib_cpg_join *req_lib_cpg_join = (struct req_lib_cpg_join *)message; + struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); + struct res_lib_cpg_join res_lib_cpg_join; + struct group_info *gi; + SaAisErrorT error = SA_AIS_OK; + struct cpg_groupinfo notify_info; + + log_printf(LOG_LEVEL_DEBUG, "got join request on %p, pi=%p, pi->pid=%d\n", conn, pi, pi->pid); + + /* Already joined on this conn */ + if (pi->pid) { + error = SA_AIS_ERR_INVALID_PARAM; + goto join_err; + } + + gi = get_group(&req_lib_cpg_join->groupName); + if (!gi) { + error = SA_AIS_ERR_NO_SPACE; + goto join_err; + } + + /* Add a node entry for us */ + totemip_copy(&pi->node, this_ip); + pi->pid = req_lib_cpg_join->pid; + pi->group = gi; + list_add(&pi->list, &gi->members); + + /* Tell the rest of the cluster */ + cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN); + + /* Tell this node */ + notify_info.pid = pi->pid; + notify_info.nodeId = this_ip->nodeid; + notify_info.reason = CONFCHG_CPG_REASON_JOIN; + notify_lib_joinlist(gi, NULL, + 1, ¬ify_info, + 0, NULL, + MESSAGE_RES_CPG_CONFCHG_CALLBACK); + +join_err: + res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join); + res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN; + res_lib_cpg_join.header.error = error; + openais_conn_send_response(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join)); +} + +/* Leave message from the library */ +static void message_handler_req_lib_cpg_leave (void *conn, void *message) +{ + struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); + struct res_lib_cpg_leave res_lib_cpg_leave; + struct group_info *gi; + SaAisErrorT error = SA_AIS_OK; + + log_printf(LOG_LEVEL_DEBUG, "got leave request on %p\n", conn); + + if (!pi || !pi->pid || !pi->group) { + error = SA_AIS_ERR_INVALID_PARAM; + goto leave_ret; + } + gi = pi->group; + + /* Tell other nodes we are leaving. + When we get this message back we will leave too */ + cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE); + pi->group = NULL; + +leave_ret: + /* send return */ + res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave); + res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE; + res_lib_cpg_leave.header.error = error; + openais_conn_send_response(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave)); +} + +/* Mcast message from the library */ +static void message_handler_req_lib_cpg_mcast (void *conn, void *message) +{ + struct req_lib_cpg_mcast *req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)message; + struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); + struct group_info *gi = pi->group; + struct iovec req_exec_cpg_iovec[2]; + struct req_exec_cpg_mcast req_exec_cpg_mcast; + struct res_header res; + int msglen = req_lib_cpg_mcast->msglen; + int result; + + log_printf(LOG_LEVEL_DEBUG, "got mcast request on %p\n", conn); + + /* Can't send if we're not joined */ + if (!gi) { + res.size = sizeof(res); + res.id = MESSAGE_RES_CPG_MCAST; + res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */ + openais_conn_send_response(conn, &res, sizeof(res)); + return; + } + + req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen; + req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_MCAST); + req_exec_cpg_mcast.pid = pi->pid; + req_exec_cpg_mcast.msglen = msglen; + memcpy(&req_exec_cpg_mcast.groupName, &gi->groupName, sizeof(struct cpg_name)); + + req_exec_cpg_iovec[0].iov_base = &req_exec_cpg_mcast; + req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast); + req_exec_cpg_iovec[1].iov_base = &req_lib_cpg_mcast->message; + req_exec_cpg_iovec[1].iov_len = msglen; + + // TODO: guarantee type... + result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED); + + res.size = sizeof(res); + res.id = MESSAGE_RES_CPG_MCAST; + res.error = SA_AIS_OK; + openais_conn_send_response(conn, &res, sizeof(res)); +} + +static void message_handler_req_lib_cpg_membership (void *conn, void *message) +{ + struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn); + + log_printf(LOG_LEVEL_DEBUG, "got membership request on %p\n", conn); + if (!pi->group) { + struct res_header res; + res.size = sizeof(res); + res.id = MESSAGE_RES_CPG_MEMBERSHIP; + res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code */ + openais_conn_send_response(conn, &res, sizeof(res)); + return; + } + + notify_lib_joinlist(pi->group, conn, 0, NULL, 0, NULL, MESSAGE_RES_CPG_MEMBERSHIP); +} + + +static void message_handler_req_lib_cpg_trackstart (void *conn, void *message) +{ + struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = (struct req_lib_cpg_trackstart *)message; + struct res_lib_cpg_trackstart res_lib_cpg_trackstart; + struct group_info *gi; + struct process_info *otherpi; + void *otherconn; + SaAisErrorT error = SA_AIS_OK; + + log_printf(LOG_LEVEL_DEBUG, "got trackstart request on %p\n", conn); + + gi = get_group(&req_lib_cpg_trackstart->groupName); + if (!gi) { + error = SA_AIS_ERR_NO_SPACE; + goto tstart_ret; + } + + /* Find the partner connection and add us to it's process_info struct */ + otherconn = openais_conn_partner_get (conn); + otherpi = (struct process_info *)openais_conn_private_data_get (conn); + otherpi->trackerconn = conn; + +tstart_ret: + res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart); + res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART; + res_lib_cpg_trackstart.header.error = SA_AIS_OK; + openais_conn_send_response(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart)); +} + +static void message_handler_req_lib_cpg_trackstop (void *conn, void *message) +{ + struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = (struct req_lib_cpg_trackstop *)message; + struct res_lib_cpg_trackstop res_lib_cpg_trackstop; + struct process_info *otherpi; + void *otherconn; + struct group_info *gi; + SaAisErrorT error = SA_AIS_OK; + + log_printf(LOG_LEVEL_DEBUG, "got trackstop request on %p\n", conn); + + gi = get_group(&req_lib_cpg_trackstop->groupName); + if (!gi) { + error = SA_AIS_ERR_NO_SPACE; + goto tstop_ret; + } + + /* Find the partner connection and add us to it's process_info struct */ + otherconn = openais_conn_partner_get (conn); + otherpi = (struct process_info *)openais_conn_private_data_get (conn); + otherpi->trackerconn = NULL; + +tstop_ret: + res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop); + res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP; + res_lib_cpg_trackstop.header.error = SA_AIS_OK; + openais_conn_send_response(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop)); +} diff --git a/exec/jhash.h b/exec/jhash.h new file mode 100644 index 00000000..cf768762 --- /dev/null +++ b/exec/jhash.h @@ -0,0 +1,146 @@ +#ifndef _LINUX_JHASH_H +#define _LINUX_JHASH_H + +/* jhash.h: Jenkins hash support. + * + * Copyright (C) 1996 Bob Jenkins (bob_jenkins@burtleburtle.net) + * + * http://burtleburtle.net/bob/hash/ + * + * These are the credits from Bob's sources: + * + * lookup2.c, by Bob Jenkins, December 1996, Public Domain. + * hash(), hash2(), hash3, and mix() are externally useful functions. + * Routines to test the hash are included if SELF_TEST is defined. + * You can use this free for any purpose. It has no warranty. + * + * Copyright (C) 2003 David S. Miller (davem@redhat.com) + * + * I've modified Bob's hash to be useful in the Linux kernel, and + * any bugs present are surely my fault. -DaveM + */ + +typedef uint32_t u32; +typedef uint8_t u8; + +/* NOTE: Arguments are modified. */ +#define __jhash_mix(a, b, c) \ +{ \ + a -= b; a -= c; a ^= (c>>13); \ + b -= c; b -= a; b ^= (a<<8); \ + c -= a; c -= b; c ^= (b>>13); \ + a -= b; a -= c; a ^= (c>>12); \ + b -= c; b -= a; b ^= (a<<16); \ + c -= a; c -= b; c ^= (b>>5); \ + a -= b; a -= c; a ^= (c>>3); \ + b -= c; b -= a; b ^= (a<<10); \ + c -= a; c -= b; c ^= (b>>15); \ +} + +/* The golden ration: an arbitrary value */ +#define JHASH_GOLDEN_RATIO 0x9e3779b9 + +/* The most generic version, hashes an arbitrary sequence + * of bytes. No alignment or length assumptions are made about + * the input key. + */ +static inline u32 jhash(const void *key, u32 length, u32 initval) +{ + u32 a, b, c, len; + const u8 *k = key; + + len = length; + a = b = JHASH_GOLDEN_RATIO; + c = initval; + + while (len >= 12) { + a += (k[0] +((u32)k[1]<<8) +((u32)k[2]<<16) +((u32)k[3]<<24)); + b += (k[4] +((u32)k[5]<<8) +((u32)k[6]<<16) +((u32)k[7]<<24)); + c += (k[8] +((u32)k[9]<<8) +((u32)k[10]<<16)+((u32)k[11]<<24)); + + __jhash_mix(a,b,c); + + k += 12; + len -= 12; + } + + c += length; + switch (len) { + case 11: c += ((u32)k[10]<<24); + case 10: c += ((u32)k[9]<<16); + case 9 : c += ((u32)k[8]<<8); + case 8 : b += ((u32)k[7]<<24); + case 7 : b += ((u32)k[6]<<16); + case 6 : b += ((u32)k[5]<<8); + case 5 : b += k[4]; + case 4 : a += ((u32)k[3]<<24); + case 3 : a += ((u32)k[2]<<16); + case 2 : a += ((u32)k[1]<<8); + case 1 : a += k[0]; + }; + + __jhash_mix(a,b,c); + + return c; +} + +/* A special optimized version that handles 1 or more of u32s. + * The length parameter here is the number of u32s in the key. + */ +static inline u32 jhash2(u32 *k, u32 length, u32 initval) +{ + u32 a, b, c, len; + + a = b = JHASH_GOLDEN_RATIO; + c = initval; + len = length; + + while (len >= 3) { + a += k[0]; + b += k[1]; + c += k[2]; + __jhash_mix(a, b, c); + k += 3; len -= 3; + } + + c += length * 4; + + switch (len) { + case 2 : b += k[1]; + case 1 : a += k[0]; + }; + + __jhash_mix(a,b,c); + + return c; +} + + +/* A special ultra-optimized versions that knows they are hashing exactly + * 3, 2 or 1 word(s). + * + * NOTE: In partilar the "c += length; __jhash_mix(a,b,c);" normally + * done at the end is not done here. + */ +static inline u32 jhash_3words(u32 a, u32 b, u32 c, u32 initval) +{ + a += JHASH_GOLDEN_RATIO; + b += JHASH_GOLDEN_RATIO; + c += initval; + + __jhash_mix(a, b, c); + + return c; +} + +static inline u32 jhash_2words(u32 a, u32 b, u32 initval) +{ + return jhash_3words(a, b, 0, initval); +} + +static inline u32 jhash_1word(u32 a, u32 initval) +{ + return jhash_3words(a, 0, 0, initval); +} + +#endif /* _LINUX_JHASH_H */ diff --git a/exec/print.c b/exec/print.c index fe0b4dfd..e45cc123 100644 --- a/exec/print.c +++ b/exec/print.c @@ -73,7 +73,8 @@ static char *log_services[] = { "[MSG ]", "[EVS ]", "[SYNC ]", - "[YKD ]" + "[YKD ]", + "[CPG ]" }; #define LOG_MODE_DEBUG 1 diff --git a/exec/print.h b/exec/print.h index 442ea4c9..9247bae1 100644 --- a/exec/print.h +++ b/exec/print.h @@ -66,6 +66,7 @@ #define LOG_SERVICE_EVS 9 #define LOG_SERVICE_SYNC 10 #define LOG_SERVICE_YKD 11 +#define LOG_SERVICE_CPG 12 extern void internal_log_printf (int logclass, char *format, ...); diff --git a/include/cpg.h b/include/cpg.h new file mode 100644 index 00000000..3a958b83 --- /dev/null +++ b/include/cpg.h @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Patrick Caulfield (pcaulfi@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef OPENAIS_CPG_H_DEFINED +#define OPENAIS_CPG_H_DEFINED + +#include + +typedef uint64_t cpg_handle_t; + +typedef enum { + CPG_DISPATCH_ONE, + CPG_DISPATCH_ALL, + CPG_DISPATCH_BLOCKING +} cpg_dispatch_t; + +typedef enum { + CPG_TYPE_UNORDERED, /* not implemented */ + CPG_TYPE_FIFO, /* same as agreed */ + CPG_TYPE_AGREED, + CPG_TYPE_SAFE /* not implemented */ +} cpg_guarantee_t; + +typedef enum { + CPG_OK = 1, + CPG_ERR_LIBRARY = 2, + CPG_ERR_TIMEOUT = 5, + CPG_ERR_TRY_AGAIN = 6, + CPG_ERR_INVALID_PARAM = 7, + CPG_ERR_NO_MEMORY = 8, + CPG_ERR_BAD_HANDLE = 9, + CPG_ERR_ACCESS = 11, + CPG_ERR_NOT_EXIST = 12, + CPG_ERR_EXIST = 14, + CPG_ERR_NOT_SUPPORTED = 20, + CPG_ERR_SECURITY = 29, + CPG_ERR_TOO_MANY_GROUPS=30 +} cpg_error_t; + +typedef enum { + CPG_REASON_JOIN = 1, + CPG_REASON_LEAVE = 2, + CPG_REASON_NODEDOWN = 3, + CPG_REASON_NODEUP = 4, + CPG_REASON_PROCDOWN = 5 +} cpg_reason_t; + +struct cpg_address { + uint32_t nodeId; + uint32_t pid; + uint32_t reason; +}; + +#ifndef CPG_MAX_NAME_LENGTH +#define CPG_MAX_NAME_LENGTH 128 +struct cpg_name { + uint32_t length; + char value[CPG_MAX_NAME_LENGTH]; +}; +#endif + +typedef void (*cpg_deliver_fn_t) ( + cpg_handle_t handle, + struct cpg_name *group_name, + uint32_t nodeid, + uint32_t pid, + void *msg, + int msg_len); + + +typedef void (*cpg_confchg_fn_t) ( + cpg_handle_t handle, + struct cpg_name *group_name, + struct cpg_address *member_list, int member_list_entries, + struct cpg_address *left_list, int left_list_entries, + struct cpg_address *joined_list, int joined_list_entries); + +typedef struct { + cpg_deliver_fn_t cpg_deliver_fn; + cpg_confchg_fn_t cpg_confchg_fn; +} cpg_callbacks_t; + +/* + * Create a new cpg connection + */ +cpg_error_t cpg_initialize ( + cpg_handle_t *handle, + cpg_callbacks_t *callbacks); + +/* + * Close the cpg handle + */ +cpg_error_t cpg_finalize ( + cpg_handle_t handle); + +/* + * Get a file descriptor on which to poll. cpg_handle_t is NOT a + * file descriptor and may not be used directly. + */ +cpg_error_t cpg_fd_get ( + cpg_handle_t handle, + int *fd); + +/* + * Dispatch messages and configuration changes + */ +cpg_error_t cpg_dispatch ( + cpg_handle_t handle, + cpg_dispatch_t dispatch_types); + +/* + * Join one or more groups. + * messages multicasted with cpg_mcast_joined will be sent to every + * group that has been joined on handle handle. Any message multicasted + * to a group that has been previously joined will be delivered in cpg_dispatch + */ +cpg_error_t cpg_join ( + cpg_handle_t handle, + struct cpg_name *group); + +/* + * Leave one or more groups + */ +cpg_error_t cpg_leave ( + cpg_handle_t handle, + struct cpg_name *group); + +/* + * Multicast to groups joined with cpg_join. + * The iovec described by iovec will be multicasted to all groups joined with + * the cpg_join interface for handle. + */ +cpg_error_t cpg_mcast_joined ( + cpg_handle_t handle, + cpg_guarantee_t guarantee, + struct iovec *iovec, + int iov_len); + + +/* + * Get membership information from cpg + */ +cpg_error_t cpg_membership_get ( + cpg_handle_t handle, + struct cpg_name *groupName, + struct cpg_address *member_list, + int *member_list_entries); + +#endif /* OPENAIS_CPG_H_DEFINED */ diff --git a/include/ipc_cpg.h b/include/ipc_cpg.h new file mode 100644 index 00000000..273d0c40 --- /dev/null +++ b/include/ipc_cpg.h @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Patrick Caulfield (pcaulfie@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef IPC_CPG_H_DEFINED +#define IPC_CPG_H_DEFINED + +#include +#include "saAis.h" +#include "ipc_gen.h" + +#define CPG_SERVICE 9 + +enum req_cpg_types { + MESSAGE_REQ_CPG_JOIN = 0, + MESSAGE_REQ_CPG_LEAVE = 1, + MESSAGE_REQ_CPG_MCAST = 2, + MESSAGE_REQ_CPG_MEMBERSHIP = 3, + MESSAGE_REQ_CPG_TRACKSTART = 4, + MESSAGE_REQ_CPG_TRACKSTOP = 5 +}; + +enum res_cpg_types { + MESSAGE_RES_CPG_JOIN = 0, + MESSAGE_RES_CPG_LEAVE = 1, + MESSAGE_RES_CPG_MCAST = 2, + MESSAGE_RES_CPG_MEMBERSHIP = 3, + MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4, + MESSAGE_RES_CPG_DELIVER_CALLBACK = 5, + MESSAGE_RES_CPG_TRACKSTART = 6, + MESSAGE_RES_CPG_TRACKSTOP = 7 +}; + +enum lib_cpg_confchg_reason { + CONFCHG_CPG_REASON_JOIN = 1, + CONFCHG_CPG_REASON_LEAVE = 2, + CONFCHG_CPG_REASON_NODEDOWN = 3, + CONFCHG_CPG_REASON_NODEUP = 4, + CONFCHG_CPG_REASON_PROCDOWN = 5 +}; + +#ifndef CPG_MAX_NAME_LENGTH +#define CPG_MAX_NAME_LENGTH 128 +struct cpg_name { + uint32_t length; + char value[CPG_MAX_NAME_LENGTH]; +}; +#endif + +struct req_lib_cpg_join { + struct req_header header; + struct cpg_name groupName; + pid_t pid; +}; + +struct res_lib_cpg_join { + struct res_header header; +}; + +struct req_lib_cpg_trackstart { + struct req_header header; + struct cpg_name groupName; + pid_t pid; +}; + +struct res_lib_cpg_trackstart { + struct res_header header; +}; + +struct req_lib_cpg_trackstop { + struct req_header header; + struct cpg_name groupName; + pid_t pid; +}; + +struct res_lib_cpg_trackstop { + struct res_header header; +}; + +struct req_lib_cpg_mcast { + struct res_header header; + uint32_t guarantee; + uint32_t msglen; + char message[]; +}; + +/* Message from another node */ +struct res_lib_cpg_deliver_callback { + struct res_header header; + struct cpg_name groupName; + uint32_t msglen; + uint32_t nodeid; + uint32_t pid; + char message[]; +}; + +/* Notifications & join return a list of these */ +struct cpg_groupinfo { + uint32_t nodeId; + uint32_t pid; + uint32_t reason; /* How joined or left */ +}; + + +struct req_lib_cpg_membership { + struct req_header header; + struct cpg_name groupName; +}; + +struct res_lib_cpg_confchg_callback { + struct res_header header; + struct cpg_name groupName; + uint32_t member_list_entries; + uint32_t joined_list_entries; + uint32_t left_list_entries; + struct cpg_groupinfo member_list[]; +// struct cpg_groupinfo left_list[]; +// struct cpg_groupinfo joined_list[]; +}; + +struct req_lib_cpg_leave { + struct req_header header; + struct cpg_name groupName; + pid_t pid; +}; + +struct res_lib_cpg_leave { + struct res_header header; +}; + + +#endif /* IPC_CPG_H_DEFINED */ diff --git a/lib/Makefile b/lib/Makefile index a099d26b..284f44a6 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -37,6 +37,7 @@ LDFLAGS += -L./ all:libSaClm.a libSaClm.so.1.0 libSaAmf.a libSaAmf.so.1.0 libSaCkpt.a \ libSaCkpt.so.1.0 libSaEvt.a libSaEvt.so.1.0 libSaLck.a libSaLck.so.1.0 libSaMsg.a libSaMsg.so.1.0 \ + libcpg.a libcpg.so.1.0 \ libais.a libais.so.1.0 libevs.a libevs.so.1.0 LIBAIS_SRC = util.c amf.c clm.c ckpt.c evt.c @@ -96,6 +97,12 @@ libevs.so.1.0: util.o evs.o ln -s libevs.so.1.0 libevs.so.1 ln -s libevs.so.1.0 libevs.so +libcpg.so.1.0: util.o cpg.o + $(CC) -bundle -bind_at_load util.o cpg.o -o $@ + rm -f libcpg.so.1 libcpg.so + ln -s libcpg.so.1.0 libcpg.so.1 + ln -s libcpg.so.1.0 libcpg.so + else libSaClm.so.1.0: util.o clm.o @@ -146,6 +153,12 @@ libevs.so.1.0: util.o evs.o ln -s libevs.so.1.0 libevs.so.1 ln -s libevs.so.1.0 libevs.so +libcpg.so.1.0: util.o cpg.o + $(CC) -shared -Wl,-soname,libcpg.so.1,-version-script=libcpg.versions util.o cpg.o -o $@ + rm -f libcpg.so.1 libcpg.so + ln -s libcpg.so.1.0 libcpg.so.1 + ln -s libcpg.so.1.0 libcpg.so + endif libSaAmf.a: util.o amf.o @@ -165,16 +178,18 @@ libSaMsg.a: util.o msg.o libais.a: util.o amf.o clm.o ckpt.o evt.o msg.o $(AR) -rc libais.a util.o amf.o clm.o ckpt.o evt.o msg.o - + libevs.a: util.o evs.o $(AR) -rc libevs.a util.o evs.o +libcpg.a: util.o cpg.o + $(AR) -rc libcpg.a util.o cpg.o clean: rm -f *.o libais.so* libais.a libSaClm.so* libSaClm.a* libSaAmf.so* libSaAmf.a \ libSaCkpt.so* libSaCkpt.a* libSaEvt.so* libSaEvt.a libSaLck.so* libSaLck.a \ libSaMsg.so* libSaMsg.a libOpenaisCfg.so* libOpenaisCfg.a \ - libevs.so* libevs.a *.da *.bb *.bbg + libevs.so* libevs.a libcpg.so* libcpg.a *.da *.bb *.bbg # -fPIC rules required for all libraries %.o: %.c @@ -184,18 +199,19 @@ depend: makedepend -Y -- $(CFLAGS) $(LIBAIS_SRC) > /dev/null 2>&1 # DO NOT DELETE -util.o: ../include/saAis.h ../include/ipc_evs.h ../include/saAis.h -util.o: ../include/evs.h ../include/saClm.h ../include/ipc_gen.h util.h -amf.o: ../include/saAis.h ../include/ais_amf.h ../include/saAis.h -amf.o: ../include/ipc_evs.h ../include/evs.h ../include/saClm.h -amf.o: ../include/ipc_gen.h util.h +util.o: ../include/saAis.h ../include/ipc_gen.h ../exec/totemip.h util.h +amf.o: ../include/saAis.h ../include/saAmf.h ../include/saAis.h +amf.o: ../include/ipc_gen.h ../exec/totemip.h ../include/ipc_amf.h +amf.o: ../include/ipc_gen.h ../include/ais_amf.h util.h clm.o: ../include/saAis.h ../include/saClm.h ../include/saAis.h -clm.o: ../include/ipc_evs.h ../include/evs.h ../include/saClm.h -clm.o: ../include/ipc_gen.h ../include/ipc_clm.h util.h -ckpt.o: ../include/list.h ../include/saAis.h ../include/saCkpt.h -ckpt.o: ../include/saAis.h ../include/ipc_evs.h ../include/evs.h -ckpt.o: ../include/saClm.h ../include/ipc_gen.h util.h +clm.o: ../include/ipc_gen.h ../exec/totemip.h ../include/ipc_clm.h +clm.o: ../include/saClm.h ../include/ipc_gen.h util.h +ckpt.o: ../include/saAis.h ../include/list.h ../include/saCkpt.h +ckpt.o: ../include/ipc_gen.h ../exec/totemip.h ../include/ipc_ckpt.h +ckpt.o: ../include/saAis.h ../include/saCkpt.h ../include/ipc_gen.h util.h evt.o: ../include/ipc_evt.h ../include/saAis.h ../include/saEvt.h -evt.o: ../include/saClm.h ../include/ipc_gen.h util.h ../include/ipc_evs.h -evt.o: ../include/evs.h ../exec/totempg.h ../exec/aispoll.h -evt.o: ../exec/totemsrp.h +evt.o: ../include/saClm.h ../include/ipc_gen.h util.h ../include/ipc_gen.h +evt.o: ../exec/totemip.h ../exec/totem.h ../exec/totemip.h ../include/list.h +cpg.o: ../include/saAis.h ../include/ipc_gen.h ../exec/totemip.h +cpg.o: ../include/ipc_cpg.h ../include/saAis.h ../include/saClm.h +cpg.o: ../include/ipc_gen.h util.h diff --git a/lib/cpg.c b/lib/cpg.c new file mode 100644 index 00000000..6d3bc974 --- /dev/null +++ b/lib/cpg.c @@ -0,0 +1,552 @@ +/* + * vi: set autoindent tabstop=4 shiftwidth=4 : + * + * Copyright (c) 2006 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Patrick Caulfield (pcaulfie@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +/* + * Provides a closed process group API using the openais executive + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "../include/saAis.h" +#include "../include/ipc_cpg.h" +#include "../include/cpg.h" +#include "util.h" + +struct cpg_inst { + int response_fd; + int dispatch_fd; + int finalize; + cpg_callbacks_t callbacks; + pthread_mutex_t response_mutex; + pthread_mutex_t dispatch_mutex; +}; + +static void cpg_instance_destructor (void *instance); + +static struct saHandleDatabase cpg_handle_t_db = { + .handleCount = 0, + .handles = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER, + .handleInstanceDestructor = cpg_instance_destructor +}; + +/* + * Clean up function for a cpg instance (cpg_nitialize) handle + */ +static void cpg_instance_destructor (void *instance) +{ +} + + +cpg_error_t cpg_initialize ( + cpg_handle_t *handle, + cpg_callbacks_t *callbacks) +{ + SaAisErrorT error; + struct cpg_inst *cpg_inst; + + error = saHandleCreate (&cpg_handle_t_db, sizeof (struct cpg_inst), handle); + if (error != SA_AIS_OK) { + goto error_no_destroy; + } + + error = saHandleInstanceGet (&cpg_handle_t_db, *handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + goto error_destroy; + } + + error = saServiceConnectTwo (&cpg_inst->dispatch_fd, + &cpg_inst->response_fd, + CPG_SERVICE); + if (error != SA_AIS_OK) { + goto error_put_destroy; + } + + memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t)); + + pthread_mutex_init (&cpg_inst->response_mutex, NULL); + + pthread_mutex_init (&cpg_inst->dispatch_mutex, NULL); + + saHandleInstancePut (&cpg_handle_t_db, *handle); + + return (SA_AIS_OK); + +error_put_destroy: + saHandleInstancePut (&cpg_handle_t_db, *handle); +error_destroy: + saHandleDestroy (&cpg_handle_t_db, *handle); +error_no_destroy: + return (error); +} + +cpg_error_t cpg_finalize ( + cpg_handle_t handle) +{ + struct cpg_inst *cpg_inst; + SaAisErrorT error; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + pthread_mutex_lock (&cpg_inst->response_mutex); + + /* + * Another thread has already started finalizing + */ + if (cpg_inst->finalize) { + pthread_mutex_unlock (&cpg_inst->response_mutex); + saHandleInstancePut (&cpg_handle_t_db, handle); + return (CPG_ERR_BAD_HANDLE); + } + + cpg_inst->finalize = 1; + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + saHandleDestroy (&cpg_handle_t_db, handle); + + /* + * Disconnect from the server + */ + if (cpg_inst->response_fd != -1) { + shutdown(cpg_inst->response_fd, 0); + close(cpg_inst->response_fd); + } + if (cpg_inst->dispatch_fd != -1) { + shutdown(cpg_inst->dispatch_fd, 0); + close(cpg_inst->dispatch_fd); + } + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (CPG_OK); +} + +cpg_error_t cpg_fd_get ( + cpg_handle_t handle, + int *fd) +{ + SaAisErrorT error; + struct cpg_inst *cpg_inst; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + *fd = cpg_inst->dispatch_fd; + + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (SA_AIS_OK); +} + +struct res_overlay { + struct res_header header; + char data[512000]; +}; + +cpg_error_t cpg_dispatch ( + cpg_handle_t handle, + cpg_dispatch_t dispatch_types) +{ + struct pollfd ufds; + int timeout = -1; + SaAisErrorT error; + int cont = 1; /* always continue do loop except when set to 0 */ + int dispatch_avail; + struct cpg_inst *cpg_inst; + struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback; + struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback; + cpg_callbacks_t callbacks; + struct res_overlay dispatch_data; + int ignore_dispatch = 0; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + /* + * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and + * wait indefinately for SA_DISPATCH_BLOCKING + */ + if (dispatch_types == CPG_DISPATCH_ALL) { + timeout = 0; + } + + do { + ufds.fd = cpg_inst->dispatch_fd; + ufds.events = POLLIN; + ufds.revents = 0; + + error = saPollRetry (&ufds, 1, timeout); + if (error != SA_AIS_OK) { + goto error_nounlock; + } + + pthread_mutex_lock (&cpg_inst->dispatch_mutex); + + /* + * Regather poll data in case ufds has changed since taking lock + */ + error = saPollRetry (&ufds, 1, timeout); + if (error != SA_AIS_OK) { + goto error_nounlock; + } + + /* + * Handle has been finalized in another thread + */ + if (cpg_inst->finalize == 1) { + error = CPG_OK; + pthread_mutex_unlock (&cpg_inst->dispatch_mutex); + goto error_unlock; + } + + dispatch_avail = ufds.revents & POLLIN; + if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) { + pthread_mutex_unlock (&cpg_inst->dispatch_mutex); + break; /* exit do while cont is 1 loop */ + } else + if (dispatch_avail == 0) { + pthread_mutex_unlock (&cpg_inst->dispatch_mutex); + continue; /* next poll */ + } + + if (ufds.revents & POLLIN) { + /* + * Queue empty, read response from socket + */ + error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.header, + sizeof (struct res_header)); + if (error != SA_AIS_OK) { + goto error_unlock; + } + if (dispatch_data.header.size > sizeof (struct res_header)) { + error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.data, + dispatch_data.header.size - sizeof (struct res_header)); + + if (error != SA_AIS_OK) { + goto error_unlock; + } + } + } else { + pthread_mutex_unlock (&cpg_inst->dispatch_mutex); + continue; + } + + /* + * Make copy of callbacks, message data, unlock instance, and call callback + * A risk of this dispatch method is that the callback routines may + * operate at the same time that cpgFinalize has been called. + */ + memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t)); + + pthread_mutex_unlock (&cpg_inst->dispatch_mutex); + /* + * Dispatch incoming message + */ + switch (dispatch_data.header.id) { + case MESSAGE_RES_CPG_DELIVER_CALLBACK: + res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data; + callbacks.cpg_deliver_fn (handle, + &res_cpg_deliver_callback->groupName, + res_cpg_deliver_callback->nodeid, + res_cpg_deliver_callback->pid, + &res_cpg_deliver_callback->message, + res_cpg_deliver_callback->msglen); + break; + + case MESSAGE_RES_CPG_CONFCHG_CALLBACK: + res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)&dispatch_data; + callbacks.cpg_confchg_fn (handle, + &res_cpg_confchg_callback->groupName, + (struct cpg_address *)res_cpg_confchg_callback->member_list, + res_cpg_confchg_callback->member_list_entries, + (struct cpg_address *)res_cpg_confchg_callback->member_list + res_cpg_confchg_callback->member_list_entries, + res_cpg_confchg_callback->left_list_entries, + (struct cpg_address *)res_cpg_confchg_callback->member_list + res_cpg_confchg_callback->member_list_entries + res_cpg_confchg_callback->left_list_entries, + res_cpg_confchg_callback->joined_list_entries); + break; + + + default: + error = SA_AIS_ERR_LIBRARY; + goto error_nounlock; + break; + } + + /* + * Determine if more messages should be processed + * */ + switch (dispatch_types) { + case CPG_DISPATCH_ONE: + if (ignore_dispatch) { + ignore_dispatch = 0; + } else { + cont = 0; + } + break; + case CPG_DISPATCH_ALL: + if (ignore_dispatch) { + ignore_dispatch = 0; + } + break; + case CPG_DISPATCH_BLOCKING: + break; + } + } while (cont); + +error_unlock: + saHandleInstancePut (&cpg_handle_t_db, handle); +error_nounlock: + return (error); +} + +cpg_error_t cpg_join ( + cpg_handle_t handle, + struct cpg_name *group) +{ + cpg_error_t error; + struct cpg_inst *cpg_inst; + struct iovec iov[2]; + struct req_lib_cpg_join req_lib_cpg_join; + struct res_lib_cpg_join res_lib_cpg_join; + struct req_lib_cpg_trackstart req_lib_cpg_trackstart; + struct res_lib_cpg_trackstart res_lib_cpg_trackstart; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + pthread_mutex_lock (&cpg_inst->response_mutex); + + /* Automatically add a tracker */ + req_lib_cpg_trackstart.header.size = sizeof (struct req_lib_cpg_trackstart); + req_lib_cpg_trackstart.header.id = MESSAGE_REQ_CPG_TRACKSTART; + memcpy(&req_lib_cpg_trackstart.groupName, group, sizeof(struct cpg_name)); + + iov[0].iov_base = &req_lib_cpg_trackstart; + iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart); + + error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1, + &res_lib_cpg_trackstart, sizeof (struct res_lib_cpg_trackstart)); + + if (error != SA_AIS_OK) { + pthread_mutex_unlock (&cpg_inst->response_mutex); + goto error_exit; + } + + /* Now join */ + req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join); + req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN; + req_lib_cpg_join.pid = getpid(); + memcpy(&req_lib_cpg_join.groupName, group, sizeof(struct cpg_name)); + + iov[0].iov_base = &req_lib_cpg_join; + iov[0].iov_len = sizeof (struct req_lib_cpg_join); + + error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1, + &res_lib_cpg_join, sizeof (struct res_lib_cpg_join)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + if (error != SA_AIS_OK) { + goto error_exit; + } + + error = res_lib_cpg_join.header.error; + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} + +cpg_error_t cpg_leave ( + cpg_handle_t handle, + struct cpg_name *group) +{ + cpg_error_t error; + struct cpg_inst *cpg_inst; + struct iovec iov[2]; + struct req_lib_cpg_leave req_lib_cpg_leave; + struct res_lib_cpg_leave res_lib_cpg_leave; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave); + req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE; + req_lib_cpg_leave.pid = getpid(); + memcpy(&req_lib_cpg_leave.groupName, group, sizeof(struct cpg_name)); + + iov[0].iov_base = &req_lib_cpg_leave; + iov[0].iov_len = sizeof (struct req_lib_cpg_leave); + + pthread_mutex_lock (&cpg_inst->response_mutex); + + error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1, + &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + if (error != SA_AIS_OK) { + goto error_exit; + } + + error = res_lib_cpg_leave.header.error; + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} + +cpg_error_t cpg_mcast_joined ( + cpg_handle_t handle, + cpg_guarantee_t guarantee, + struct iovec *iovec, + int iov_len) +{ + int i; + cpg_error_t error; + struct cpg_inst *cpg_inst; + struct iovec iov[64]; + struct req_lib_cpg_mcast req_lib_cpg_mcast; + struct res_header res_lib_cpg_mcast; + int msg_len = 0; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + for (i = 0; i < iov_len; i++ ) { + msg_len += iovec[i].iov_len; + } + + req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) + + msg_len; + + req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST; + req_lib_cpg_mcast.guarantee = guarantee; + req_lib_cpg_mcast.msglen = msg_len; + + iov[0].iov_base = &req_lib_cpg_mcast; + iov[0].iov_len = sizeof (struct req_lib_cpg_mcast); + memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec)); + + pthread_mutex_lock (&cpg_inst->response_mutex); + + error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1, + &res_lib_cpg_mcast, sizeof (struct res_header)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + if (error != SA_AIS_OK) { + goto error_exit; + } + + error = res_lib_cpg_mcast.error; + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} + +cpg_error_t cpg_membership_get ( + cpg_handle_t handle, + struct cpg_name *groupName, + struct cpg_address *member_list, + int *member_list_entries) +{ + cpg_error_t error; + struct cpg_inst *cpg_inst; + struct iovec iov; + struct req_lib_cpg_membership req_lib_cpg_membership_get; + struct res_lib_cpg_confchg_callback res_lib_cpg_membership_get; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != SA_AIS_OK) { + return (error); + } + + req_lib_cpg_membership_get.header.size = sizeof (struct req_header); + req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP; + memcpy(&req_lib_cpg_membership_get.groupName, groupName, sizeof(struct cpg_name)); + + iov.iov_base = &req_lib_cpg_membership_get; + iov.iov_len = sizeof (struct req_header); + + pthread_mutex_lock (&cpg_inst->response_mutex); + + error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1, + &res_lib_cpg_membership_get, sizeof (struct res_header)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + if (error != SA_AIS_OK) { + goto error_exit; + } + + error = res_lib_cpg_membership_get.header.error; + + /* + * Copy results to caller + */ + *member_list_entries = res_lib_cpg_membership_get.member_list_entries; + if (member_list) { + memcpy (member_list, &res_lib_cpg_membership_get.member_list, + *member_list_entries * sizeof (struct cpg_address)); + } + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} diff --git a/lib/libcpg.versions b/lib/libcpg.versions new file mode 100644 index 00000000..d6b3f6ae --- /dev/null +++ b/lib/libcpg.versions @@ -0,0 +1,29 @@ +# Version and symbol export for libcpg.so + +OPENAIS_CPG_1.0 { + global: + cpg_initialize; + cpg_finalize; + cpg_fd_get; + cpg_dispatch; + cpg_join; + cpg_leave; + cpg_mcast_joined; + cpg_membership_get; + + local: + saHandleCreate; + saHandleDestroy; + saHandleInstanceGet; + saHandleInstancePut; + saRecvRetry; + saSelectRetry; + saSendMsgReceiveReply; + saSendMsgRetry; + saSendReceiveReply; + saSendRetry; + saServiceConnect; + saServiceConnectTwo; + saVersionVerify; + clustTimeNow; +}; diff --git a/test/Makefile b/test/Makefile index e1df95ff..9506a85a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -32,7 +32,7 @@ # include ../Makefile.inc -LIBRARIES= ../lib/libSaClm.a ../lib/libSaAmf.a ../lib/libSaCkpt.a ../lib/libSaEvt.a ../lib/libSaLck.a ../lib/libSaMsg.a ../lib/libevs.a +LIBRARIES= ../lib/libSaClm.a ../lib/libSaAmf.a ../lib/libSaCkpt.a ../lib/libSaEvt.a ../lib/libSaLck.a ../lib/libSaMsg.a ../lib/libevs.a ../lib/libcpg.a LIBS = $(LIBRARIES) CFLAGS += -I../include @@ -49,7 +49,7 @@ TEST_SRC = testclm.c testamf1.c \ all: testclm testamf1 \ testckpt ckptstress ckptbench \ ckptbenchth ckpt-rd ckpt-wr testevt testevs \ - evsbench subscription publish evtbench unlink testclm2 testlck testmsg + evsbench subscription publish evtbench unlink testclm2 testlck testmsg testcpg testtimer: testtimer.o $(LIBRARIES) $(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o @@ -135,11 +135,14 @@ testlck: testlck.o $(LIBRARIES) testmsg: testmsg.o $(LIBRARIES) $(CC) $(LDFLAGS) -o testmsg testmsg.o $(LIBS) +testcpg: testcpg.o $(LIBRARIES) + $(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS) + clean: rm -f *.o testclm testamf testamf1 testamf2 testamf3 testamf4 \ testamf5 testamf6 testamfth testckpt ckptstress testtimer \ ckptbench ckptbenchth testevt testevs ckpt-wr ckpt-rd \ - evsbench subscription publish evtbench unlink testmsg + evsbench subscription publish evtbench unlink testmsg testcpg %.o: %.c $(CC) $(CFLAGS) $(EXTRA_CFLAGS) -c -o $@ $< diff --git a/test/testcpg.c b/test/testcpg.c new file mode 100644 index 00000000..b994126f --- /dev/null +++ b/test/testcpg.c @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2006 Red Hat Inc + * + * All rights reserved. + * + * Author: Patrick Caulfield + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "saAis.h" +#include "cpg.h" + +void print_cpgname (struct cpg_name *name) +{ + int i; + + for (i = 0; i < name->length; i++) { + printf ("%c", name->value[i]); + } +} + +void DeliverCallback ( + cpg_handle_t handle, + struct cpg_name *groupName, + uint32_t nodeid, + uint32_t pid, + void *msg, + int msg_len) +{ + printf("DeliverCallback: message (len=%d)from node/pid %d/%d: '%s'\n", msg_len, nodeid, pid, (char *)msg); +} + +void ConfchgCallback ( + cpg_handle_t handle, + struct cpg_name *groupName, + uint64_t sequence, + struct cpg_address *member_list, int member_list_entries, + struct cpg_address *left_list, int left_list_entries, + struct cpg_address *joined_list, int joined_list_entries) +{ + int i; + + + printf("\nConfchgCallback: group '"); print_cpgname(groupName); printf("' seq=%lld\n", sequence); + for (i=0; i 1) { + strcpy(group_name.value, argv[1]); + group_name.length = strlen(argv[1])+1; + } + else { + strcpy(group_name.value, "GROUP"); + group_name.length = 6; + } + + result = cpg_initialize (&handle, &callbacks); + if (result != SA_AIS_OK) { + printf ("Could not initialize Cluster Process Group API instance error %d\n", result); + exit (1); + } + result = cpg_join(handle, &group_name); + if (result != SA_AIS_OK) { + printf ("Could not join process group, error %d\n", result); + exit (1); + } + + FD_ZERO (&read_fds); + cpg_fd_get(handle, &select_fd); + printf ("Type EXIT to finish\n"); + do { + FD_SET (select_fd, &read_fds); + FD_SET (STDIN_FILENO, &read_fds); + result = select (select_fd + 1, &read_fds, 0, 0, 0); + if (result == -1) { + perror ("select\n"); + } + if (FD_ISSET (STDIN_FILENO, &read_fds)) { + char inbuf[132]; + struct iovec iov; + + gets(inbuf); + if (strncmp(inbuf, "EXIT", 4) == 0) { + cpg_leave(handle, &group_name); + } + else { + iov.iov_base = inbuf; + iov.iov_len = strlen(inbuf)+1; + cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1); + } + } + if (FD_ISSET (select_fd, &read_fds)) { + if (cpg_dispatch (handle, CPG_DISPATCH_ALL) != SA_AIS_OK) + exit(1); + } + } while (result); + + + result = cpg_finalize (handle); + printf ("Finalize result is %d (should be 1)\n", result); + return (0); +}