diff --git a/exec/evs.c b/exec/evs.c index e69de29b..24e26188 100644 --- a/exec/evs.c +++ b/exec/evs.c @@ -0,0 +1,447 @@ +/* + * Copyright (c) 2004 MontaVista Software, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../include/ais_types.h" +#include "../include/ais_msg.h" +#include "../include/list.h" +#include "../include/queue.h" +#include "aispoll.h" +#include "gmi.h" +#include "parse.h" +#include "main.h" +#include "print.h" +#include "mempool.h" +#include "handlers.h" + +static DECLARE_LIST_INIT (confchg_notify); + +/* + * Service Interfaces required by service_message_handler struct + */ +static int evs_executive_initialize (void); + +static int evs_confchg_fn ( + struct sockaddr_in *member_list, int member_list_entries, + struct sockaddr_in *left_list, int left_list_entries, + struct sockaddr_in *joined_list, int joined_list_entries); + +static int message_handler_req_exec_mcast (void *message, struct in_addr source_addr); + +static int message_handler_req_evs_init (struct conn_info *conn_info, + void *message); + +static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, + void *message); + +static int message_handler_req_evs_join (struct conn_info *conn_info, void *message); +static int message_handler_req_evs_leave (struct conn_info *conn_info, void *message); +static int message_handler_req_evs_mcast_joined (struct conn_info *conn_info, void *message); +static int message_handler_req_evs_mcast_groups (struct conn_info *conn_info, void *message); + +static int evs_exit_fn (struct conn_info *conn_info); + +struct libais_handler evs_libais_handlers[] = +{ + { /* 0 */ + .libais_handler_fn = message_handler_req_lib_activatepoll, + .response_size = sizeof (struct res_lib_activatepoll), + .response_id = MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE + .gmi_prio = GMI_PRIO_RECOVERY + }, + { /* 1 */ + .libais_handler_fn = message_handler_req_evs_join, + .response_size = sizeof (struct res_lib_evs_join), + .response_id = MESSAGE_RES_EVS_JOIN, + .gmi_prio = GMI_PRIO_RECOVERY + }, + { /* 2 */ + .libais_handler_fn = message_handler_req_evs_leave, + .response_size = sizeof (struct res_lib_evs_leave), + .response_id = MESSAGE_RES_EVS_LEAVE, + .gmi_prio = GMI_PRIO_RECOVERY + }, + { /* 3 */ + .libais_handler_fn = message_handler_req_evs_mcast_joined, + .response_size = sizeof (struct res_lib_evs_mcast_joined), + .response_id = MESSAGE_RES_EVS_MCAST_JOINED, + .gmi_prio = GMI_PRIO_LOW + }, + { /* 4 */ + .libais_handler_fn = message_handler_req_evs_mcast_groups, + .response_size = sizeof (struct res_lib_evs_mcast_groups), + .response_id = MESSAGE_RES_EVS_MCAST_GROUPS, + .gmi_prio = GMI_PRIO_LOW + } +}; + +static int (*evs_aisexec_handler_fns[]) (void *, struct in_addr source_addr) = { + message_handler_req_exec_mcast +}; + +struct service_handler evs_service_handler = { + .libais_handlers = evs_libais_handlers, + .libais_handlers_count = sizeof (evs_libais_handlers) / sizeof (struct libais_handler), + .aisexec_handler_fns = evs_aisexec_handler_fns, + .aisexec_handler_fns_count = sizeof (evs_aisexec_handler_fns) / sizeof (int (*)), + .confchg_fn = evs_confchg_fn, + .libais_init_fn = message_handler_req_evs_init, + .libais_exit_fn = evs_exit_fn, + .aisexec_init_fn = evs_executive_initialize +}; + +static int evs_executive_initialize (void) +{ + return (0); +} + +static int evs_exit_fn (struct conn_info *conn_info) +{ + list_del (&conn_info->conn_list); + return (0); +} + +static int evs_confchg_fn ( + struct sockaddr_in *member_list, int member_list_entries, + struct sockaddr_in *left_list, int left_list_entries, + struct sockaddr_in *joined_list, int joined_list_entries) { + + int i; + struct list_head *list; + struct res_evs_confchg_callback res_evs_confchg_callback; + struct conn_info *conn_info; + + /* + * Build configuration change message + */ + res_evs_confchg_callback.header.size = sizeof (struct res_evs_confchg_callback); + res_evs_confchg_callback.header.id = MESSAGE_RES_EVS_CONFCHG_CALLBACK; + res_evs_confchg_callback.header.error = SA_OK; + + for (i = 0; i < member_list_entries; i++) { + res_evs_confchg_callback.member_list[i].s_addr = member_list[i].sin_addr.s_addr; + } + res_evs_confchg_callback.member_list_entries = member_list_entries; + for (i = 0; i < left_list_entries; i++) { + res_evs_confchg_callback.left_list[i].s_addr = left_list[i].sin_addr.s_addr; + } + res_evs_confchg_callback.left_list_entries = left_list_entries; + for (i = 0; i < joined_list_entries; i++) { + res_evs_confchg_callback.joined_list[i].s_addr = joined_list[i].sin_addr.s_addr; + } + res_evs_confchg_callback.joined_list_entries = joined_list_entries; + + /* + * Send configuration change message to every EVS library user + */ + for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { + conn_info = list_entry (list, struct conn_info, conn_list); + libais_send_response (conn_info, &res_evs_confchg_callback, + sizeof (res_evs_confchg_callback)); + } + + return (0); +} + +static int message_handler_req_evs_init (struct conn_info *conn_info, void *message) +{ + SaErrorT error = SA_ERR_SECURITY; + struct res_lib_init res_lib_init; + + log_printf (LOG_LEVEL_DEBUG, "Got request to initalize evs service.\n"); + if (conn_info->authenticated) { + conn_info->service = SOCKET_SERVICE_EVS; + error = SA_OK; + } + + res_lib_init.header.size = sizeof (struct res_lib_init); + res_lib_init.header.id = MESSAGE_RES_INIT; + res_lib_init.header.error = error; + + libais_send_response (conn_info, &res_lib_init, sizeof (res_lib_init)); + + + list_add (&conn_info->conn_list, &confchg_notify); + + if (conn_info->authenticated) { + return (0); + } + + return (-1); +} + +static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message) +{ + struct res_lib_activatepoll res_lib_activatepoll; + + res_lib_activatepoll.header.size = sizeof (struct res_lib_activatepoll); + res_lib_activatepoll.header.id = MESSAGE_RES_LIB_ACTIVATEPOLL; + res_lib_activatepoll.header.error = SA_OK; + libais_send_response (conn_info, &res_lib_activatepoll, + sizeof (struct res_lib_activatepoll)); + + return (0); +} + +static int message_handler_req_evs_join (struct conn_info *conn_info, void *message) +{ + evs_error_t error = EVS_OK; + struct req_lib_evs_join *req_lib_evs_join = (struct req_lib_evs_join *)message; + struct res_lib_evs_join res_lib_evs_join; + void *addr; + + if (req_lib_evs_join->group_entries > 50) { + error = EVS_ERR_TOO_MANY_GROUPS; + goto exit_error; + } + +#ifdef DEBUG + for (i = 0; i < req_lib_evs_join->group_entries; i++) { + printf ("Joining group %s\n", req_lib_evs_join->groups[i].key); + } +#endif + addr = realloc (conn_info->ais_ci.u.libevs_ci.groups, + sizeof (struct evs_group) * + (conn_info->ais_ci.u.libevs_ci.group_entries + req_lib_evs_join->group_entries)); + if (addr == 0) { + error = SA_ERR_NO_MEMORY; + goto exit_error; + } + conn_info->ais_ci.u.libevs_ci.groups = addr; + + memcpy (&conn_info->ais_ci.u.libevs_ci.groups[conn_info->ais_ci.u.libevs_ci.group_entries], + req_lib_evs_join->groups, + sizeof (struct evs_group) * req_lib_evs_join->group_entries); + + conn_info->ais_ci.u.libevs_ci.group_entries += req_lib_evs_join->group_entries; + +exit_error: + res_lib_evs_join.header.size = sizeof (struct res_lib_evs_join); + res_lib_evs_join.header.id = MESSAGE_RES_EVS_JOIN; + res_lib_evs_join.header.error = error; + + libais_send_response (conn_info, &res_lib_evs_join, + sizeof (struct res_lib_evs_join)); + + return (0); +} + +static int message_handler_req_evs_leave (struct conn_info *conn_info, void *message) +{ + struct req_lib_evs_leave *req_lib_evs_leave = (struct req_lib_evs_leave *)message; + struct res_lib_evs_leave res_lib_evs_leave; + evs_error_t error = EVS_OK; + int error_index; + int i, j; + int found; + + for (i = 0; i < req_lib_evs_leave->group_entries; i++) { + found = 0; + for (j = 0; j < conn_info->ais_ci.u.libevs_ci.group_entries;) { + if (memcmp (&req_lib_evs_leave->groups[i], + &conn_info->ais_ci.u.libevs_ci.groups[j], + sizeof (struct evs_group)) == 0) { + + /* + * Delete entry + */ + memmove (&conn_info->ais_ci.u.libevs_ci.groups[j], + &conn_info->ais_ci.u.libevs_ci.groups[j + 1], + (conn_info->ais_ci.u.libevs_ci.group_entries - j - 1) * + sizeof (struct evs_group)); + + conn_info->ais_ci.u.libevs_ci.group_entries -= 1; + + found = 1; + break; + } else { + j++; + } + } + if (found == 0) { + error = EVS_ERR_NOT_EXIST; + error_index = i; + break; + } + } + +#ifdef DEBUG + for (i = 0; i < conn_info->ais_ci.u.libevs_ci.group_entries; i++) { + printf ("Groups Left %s\n", + &conn_info->ais_ci.u.libevs_ci.groups[i].key); + } +#endif + res_lib_evs_leave.header.size = sizeof (struct res_lib_evs_leave); + res_lib_evs_leave.header.id = MESSAGE_RES_EVS_LEAVE; + res_lib_evs_leave.header.error = error; + + libais_send_response (conn_info, &res_lib_evs_leave, + sizeof (struct res_lib_evs_leave)); + + return (0); +} + +static int message_handler_req_evs_mcast_joined (struct conn_info *conn_info, void *message) +{ + evs_error_t error = EVS_OK; + struct req_lib_evs_mcast_joined *req_lib_evs_mcast_joined = (struct req_lib_evs_mcast_joined *)message; + struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined; + struct iovec req_exec_evs_mcast_iovec[3]; + struct req_exec_evs_mcast req_exec_evs_mcast; + + req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast); + req_exec_evs_mcast.header.id = MESSAGE_REQ_EXEC_EVS_MCAST; + req_exec_evs_mcast.msg_len = req_lib_evs_mcast_joined->msg_len; + req_exec_evs_mcast.group_entries = conn_info->ais_ci.u.libevs_ci.group_entries; + + req_exec_evs_mcast_iovec[0].iov_base = &req_exec_evs_mcast; + req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); + req_exec_evs_mcast_iovec[1].iov_base = conn_info->ais_ci.u.libevs_ci.groups; + req_exec_evs_mcast_iovec[1].iov_len = conn_info->ais_ci.u.libevs_ci.group_entries * sizeof (struct evs_group); + req_exec_evs_mcast_iovec[2].iov_base = &req_lib_evs_mcast_joined->msg; + req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len; + + gmi_mcast (&aisexec_groupname, req_exec_evs_mcast_iovec, 3, + req_lib_evs_mcast_joined->priority); + + res_lib_evs_mcast_joined.header.size = sizeof (struct res_lib_evs_mcast_joined); + res_lib_evs_mcast_joined.header.id = MESSAGE_RES_EVS_MCAST_JOINED; + res_lib_evs_mcast_joined.header.error = error; + + libais_send_response (conn_info, &res_lib_evs_mcast_joined, + sizeof (struct res_lib_evs_mcast_joined)); + + return (0); +} + +static int message_handler_req_evs_mcast_groups (struct conn_info *conn_info, void *message) +{ + evs_error_t error = EVS_OK; + struct req_lib_evs_mcast_groups *req_lib_evs_mcast_groups = (struct req_lib_evs_mcast_groups *)message; + struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups; + struct iovec req_exec_evs_mcast_iovec[3]; + struct req_exec_evs_mcast req_exec_evs_mcast; + char *msg_addr; + + req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast); + req_exec_evs_mcast.header.id = MESSAGE_REQ_EXEC_EVS_MCAST; + req_exec_evs_mcast.msg_len = req_lib_evs_mcast_groups->msg_len; + req_exec_evs_mcast.group_entries = req_lib_evs_mcast_groups->group_entries; + + msg_addr = (char *)req_lib_evs_mcast_groups + + sizeof (struct req_lib_evs_mcast_groups) + + (sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries); + + req_exec_evs_mcast_iovec[0].iov_base = &req_exec_evs_mcast; + req_exec_evs_mcast_iovec[0].iov_len = sizeof (req_exec_evs_mcast); + req_exec_evs_mcast_iovec[1].iov_base = &req_lib_evs_mcast_groups->groups; + req_exec_evs_mcast_iovec[1].iov_len = sizeof (struct evs_group) * req_lib_evs_mcast_groups->group_entries; + req_exec_evs_mcast_iovec[2].iov_base = msg_addr; + req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len; + + gmi_mcast (&aisexec_groupname, req_exec_evs_mcast_iovec, 3, + req_lib_evs_mcast_groups->priority); + + res_lib_evs_mcast_groups.header.size = sizeof (struct res_lib_evs_mcast_groups); + res_lib_evs_mcast_groups.header.id = MESSAGE_RES_EVS_MCAST_GROUPS; + res_lib_evs_mcast_groups.header.error = error; + + libais_send_response (conn_info, &res_lib_evs_mcast_groups, + sizeof (struct res_lib_evs_mcast_groups)); + + return (0); +} +static int message_handler_req_exec_mcast (void *message, struct in_addr source_addr) +{ + struct req_exec_evs_mcast *req_exec_evs_mcast = (struct req_exec_evs_mcast *)message; + struct res_evs_deliver_callback res_evs_deliver_callback; + char *msg_addr; + struct conn_info *conn_info; + struct list_head *list; + int found = 0; + int i, j; + + res_evs_deliver_callback.header.size = sizeof (struct res_evs_deliver_callback) + + req_exec_evs_mcast->msg_len; + res_evs_deliver_callback.header.id = MESSAGE_RES_EVS_DELIVER_CALLBACK; + res_evs_deliver_callback.header.error = SA_OK; + res_evs_deliver_callback.msglen = req_exec_evs_mcast->msg_len; + + msg_addr = (char *)req_exec_evs_mcast + sizeof (struct req_exec_evs_mcast) + + (sizeof (struct evs_group) * req_exec_evs_mcast->group_entries); + + for (list = confchg_notify.next; list != &confchg_notify; list = list->next) { + found = 0; + conn_info = list_entry (list, struct conn_info, conn_list); + + for (i = 0; i < conn_info->ais_ci.u.libevs_ci.group_entries; i++) { + for (j = 0; j < req_exec_evs_mcast->group_entries; j++) { + if (memcmp (&conn_info->ais_ci.u.libevs_ci.groups[i], + &req_exec_evs_mcast->groups[j], + sizeof (struct evs_group)) == 0) { + + found = 1; + break; + } + } + if (found) { + break; + } + } + + if (found) { + libais_send_response (conn_info, &res_evs_deliver_callback, + sizeof (struct res_evs_deliver_callback)); + libais_send_response (conn_info, msg_addr, + req_exec_evs_mcast->msg_len); + } + } + + return (0); +} diff --git a/exec/evs.h b/exec/evs.h index e69de29b..f2da8f72 100644 --- a/exec/evs.h +++ b/exec/evs.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2004 MontaVista Software, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "../include/ais_types.h" + +#ifndef EVS_H_DEFINED +#define EVS_H_DEFINED + +#include + +struct libevs_ci { + struct evs_group *groups; + int group_entries; +}; + +extern struct service_handler evs_service_handler; + +#endif /* EVS_H_DEFINED */ diff --git a/include/evs.h b/include/evs.h index e69de29b..d188e3ef 100644 --- a/include/evs.h +++ b/include/evs.h @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2004 MontaVista Software, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef OPENAIS_EVS_H_DEFINED +#define OPENAIS_EVS_H_DEFINED + +#include + +typedef unsigned int evs_handle_t; + +typedef enum { + EVS_DISPATCH_ONE, + EVS_DISPATCH_ALL, + EVS_DISPATCH_BLOCKING +} evs_dispatch_t; + +typedef enum { + EVS_PRIO_RECOVERY, + EVS_PRIO_HIGH, + EVS_PRIO_MED, + EVS_PRIO_LOW +} evs_priority_t; + +typedef enum { + EVS_TYPE_UNORDERED, /* not implemented */ + EVS_TYPE_FIFO, /* same as agreed */ + EVS_TYPE_AGREED, + EVS_TYPE_SAFE /* not implemented */ +} evs_guarantee_t; + +typedef enum { + EVS_OK = 1, + EVS_ERR_LIBRARY = 2, + EVS_ERR_TIMEOUT = 5, + EVS_ERR_TRY_AGAIN = 6, + EVS_ERR_INVALID_PARAM = 7, + EVS_ERR_NO_MEMORY = 8, + EVS_ERR_BAD_HANDLE = 9, + EVS_ERR_ACCESS = 11, + EVS_ERR_NOT_EXIST = 12, + EVS_ERR_EXIST = 14, + EVS_ERR_NOT_SUPPORTED = 20, + EVS_ERR_SECURITY = 29, + EVS_ERR_TOO_MANY_GROUPS=30 +} evs_error_t; + +struct evs_group { + char key[32]; +}; + +typedef void (*evs_deliver_fn_t) ( + struct in_addr source_addr, + void *msg, + int msg_len); + +typedef void (*evs_confchg_fn_t) ( + struct in_addr *member_list, int member_list_entries, + struct in_addr *left_list, int left_list_entries, + struct in_addr *joined_list, int joined_list_entries); + +typedef struct { + evs_deliver_fn_t evs_deliver_fn; + evs_confchg_fn_t evs_confchg_fn; +} evs_callbacks_t; + +/* + * Create a new evs connection + */ +evs_error_t evs_initialize ( + evs_handle_t *handle, + evs_callbacks_t *callbacks); + +/* + * Close the evs handle + */ +evs_error_t evs_finalize ( + evs_handle_t *handle); + +/* + * Get a file descriptor on which to poll. evs_handle_t is NOT a + * file descriptor and may not be used directly. + */ +evs_error_t evs_fd_get ( + evs_handle_t *handle, + int *fd); + +/* + * Dispatch messages and configuration changes + */ +evs_error_t evs_dispatch ( + evs_handle_t *handle, + evs_dispatch_t dispatch_types); + +/* + * Join one or more groups. + * messages multicasted with evs_mcast_joined will be sent to every + * group that has been joined on handle handle. Any message multicasted + * to a group that has been previously joined will be delivered in evs_dispatch + */ +evs_error_t evs_join ( + evs_handle_t *handle, + struct evs_group *groups, + int group_cnt); + +/* + * Leave one or more groups + */ +evs_error_t evs_leave ( + evs_handle_t *handle, + struct evs_group *groups, + int group_cnt); + +/* + * Multicast to groups joined with evs_join. + * The iovec described by iovec will be multicasted to all groups joined with + * the evs_join interface for handle. + */ +evs_error_t evs_mcast_joined ( + evs_handle_t *handle, + evs_guarantee_t guarantee, + evs_priority_t priority, + struct iovec *iovec, + int iov_len); + +/* + * Multicast to specified groups. + * Messages will be multicast to groups specified in the api call and not those + * that have been joined (unless they are in the groups parameter). + */ +evs_error_t evs_mcast_groups ( + evs_handle_t *handle, + evs_guarantee_t guarantee, + evs_priority_t priority, + struct evs_group *groups, + int group_cnt, + struct iovec *iovec, + int iov_len); + +#endif /* OPENAIS_EVS_H_DEFINED */ diff --git a/lib/evs.c b/lib/evs.c index e69de29b..028cac2f 100644 --- a/lib/evs.c +++ b/lib/evs.c @@ -0,0 +1,543 @@ +/* + * Copyright (c) 2004 MontaVista Software, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ +/* + * Provides an extended virtual synchrony API using the openais executive + */ + +#include +#include +#include +#include +#include +#include "util.h" + +#include "../include/ais_msg.h" +#include "../include/ais_types.h" +#include "../include/evs.h" + +struct evs_inst { + int fd; + int finalize; + evs_callbacks_t callbacks; + struct queue inq; + char dispatch_buffer[512000]; + pthread_mutex_t mutex; +}; + +static struct saHandleDatabase evs_handle_t_db = { + .handleCount = 0, + .handles = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER, + .handleInstanceDestructor = 0 +}; + +evs_error_t evs_initialize ( + evs_handle_t *handle, + evs_callbacks_t *callbacks) +{ + SaErrorT error; + struct evs_inst *evs_inst; + + error = saHandleCreate (&evs_handle_t_db, sizeof (struct evs_inst), handle); + if (error != SA_OK) { + goto error_no_destroy; + } + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + goto error_destroy; + } + + /* + * An inq is needed to store async messages while waiting for a + * sync response + */ + error = saQueueInit (&evs_inst->inq, 128, sizeof (void *)); + if (error != SA_OK) { + goto error_put_destroy; + } + + error = saServiceConnect (&evs_inst->fd, MESSAGE_REQ_EVS_INIT); + if (error != SA_OK) { + goto error_put_destroy_free; + } + + memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t)); + + pthread_mutex_init (&evs_inst->mutex, NULL); + + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (SA_OK); + +error_put_destroy_free: + free (evs_inst->inq.items); +error_put_destroy: + saHandleInstancePut (&evs_handle_t_db, *handle); +error_destroy: + saHandleDestroy (&evs_handle_t_db, *handle); +error_no_destroy: + return (error); +} + +evs_error_t evs_finalize ( + evs_handle_t *handle) +{ + struct evs_inst *evs_inst; + SaErrorT error; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + /* + * Another thread has already started finalizing + */ + if (evs_inst->finalize) { + pthread_mutex_unlock (&evs_inst->mutex); + saHandleInstancePut (&evs_handle_t_db, *handle); + return (EVS_ERR_BAD_HANDLE); + } + + evs_inst->finalize = 1; + + saActivatePoll (evs_inst->fd); + + pthread_mutex_unlock (&evs_inst->mutex); + + saHandleInstancePut (&evs_handle_t_db, *handle); + + saHandleDestroy (&evs_handle_t_db, *handle); + + return (EVS_OK); +} + +evs_error_t evs_fd_get ( + evs_handle_t *handle, + int *fd) +{ + SaErrorT error; + struct evs_inst *evs_inst; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + + *fd = evs_inst->fd; + + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (SA_OK); +} + +struct message_overlay { + struct res_header header; + char data[4096]; +}; + +evs_error_t evs_dispatch ( + evs_handle_t *handle, + evs_dispatch_t dispatch_types) +{ + struct pollfd ufds; + int timeout = -1; + SaErrorT error; + int cont = 1; /* always continue do loop except when set to 0 */ + int dispatch_avail; + int poll_fd; + struct evs_inst *evs_inst; + struct res_evs_confchg_callback *res_evs_confchg_callback; + struct res_evs_deliver_callback *res_evs_deliver_callback; + evs_callbacks_t callbacks; + struct message_overlay *dispatch_data; + int empty; + struct res_header **queue_msg; + struct res_header *msg; + int ignore_dispatch = 0; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + + /* + * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and + * wait indefinately for SA_DISPATCH_BLOCKING + */ + if (dispatch_types == EVS_DISPATCH_ALL) { + timeout = 0; + } + + do { + poll_fd = evs_inst->fd; + + ufds.fd = poll_fd; + ufds.events = POLLIN; + ufds.revents = 0; + + pthread_mutex_lock (&evs_inst->mutex); + saQueueIsEmpty (&evs_inst->inq, &empty); + if (empty == 1) { + pthread_mutex_unlock (&evs_inst->mutex); + + error = saPollRetry (&ufds, 1, timeout); + if (error != SA_OK) { + goto error_nounlock; + } + + pthread_mutex_lock (&evs_inst->mutex); + } + + /* + * Handle has been finalized in another thread + */ + if (evs_inst->finalize == 1) { + error = SA_OK; + pthread_mutex_unlock (&evs_inst->mutex); + goto error_unlock; + } + + dispatch_avail = (ufds.revents & POLLIN) | (empty == 0); + if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) { + pthread_mutex_unlock (&evs_inst->mutex); + break; /* exit do while cont is 1 loop */ + } else + if (dispatch_avail == 0) { + pthread_mutex_unlock (&evs_inst->mutex); + continue; /* next poll */ + } + + saQueueIsEmpty (&evs_inst->inq, &empty); + if (empty == 0) { + /* + * Queue is not empty, read data from queue + */ + saQueueItemGet (&evs_inst->inq, (void *)&queue_msg); + msg = *queue_msg; + dispatch_data = (struct message_overlay *)msg; + res_evs_deliver_callback = (struct res_evs_deliver_callback *)msg; + res_evs_confchg_callback = (struct res_evs_confchg_callback *)msg; + + saQueueItemRemove (&evs_inst->inq); + } else { + dispatch_data = (struct message_overlay *)evs_inst->dispatch_buffer; + res_evs_deliver_callback = (struct res_evs_deliver_callback *)dispatch_data; + res_evs_confchg_callback = (struct res_evs_confchg_callback *)dispatch_data; + /* + * Queue empty, read response from socket + */ + error = saRecvRetry (evs_inst->fd, &dispatch_data->header, + sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL); + if (error != SA_OK) { + goto error_unlock; + } + if (dispatch_data->header.size > sizeof (struct res_header)) { + error = saRecvRetry (evs_inst->fd, &dispatch_data->data, + dispatch_data->header.size - sizeof (struct res_header), + MSG_WAITALL | MSG_NOSIGNAL); + if (error != SA_OK) { + goto error_unlock; + } + } + } + + /* + * Make copy of callbacks, message data, unlock instance, and call callback + * A risk of this dispatch method is that the callback routines may + * operate at the same time that evsFinalize has been called. + */ + memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t)); + + pthread_mutex_unlock (&evs_inst->mutex); + /* + * Dispatch incoming message + */ + switch (dispatch_data->header.id) { + case MESSAGE_RES_LIB_ACTIVATEPOLL: + ignore_dispatch = 1; + break; + + case MESSAGE_RES_EVS_DELIVER_CALLBACK: + callbacks.evs_deliver_fn ( + res_evs_deliver_callback->source_addr, + &res_evs_deliver_callback->msg, + res_evs_deliver_callback->msglen); + break; + + case MESSAGE_RES_EVS_CONFCHG_CALLBACK: + callbacks.evs_confchg_fn ( + res_evs_confchg_callback->member_list, + res_evs_confchg_callback->member_list_entries, + res_evs_confchg_callback->left_list, + res_evs_confchg_callback->left_list_entries, + res_evs_confchg_callback->joined_list, + res_evs_confchg_callback->joined_list_entries); + break; + + default: + error = SA_ERR_LIBRARY; + goto error_nounlock; + break; + } + if (empty == 0) { + free (msg); + } + + /* + * Determine if more messages should be processed + * */ + switch (dispatch_types) { + case EVS_DISPATCH_ONE: + if (ignore_dispatch) { + ignore_dispatch = 0; + } else { + cont = 0; + } + break; + case EVS_DISPATCH_ALL: + if (ignore_dispatch) { + ignore_dispatch = 0; + } + break; + case EVS_DISPATCH_BLOCKING: + break; + } + } while (cont); + +error_unlock: + saHandleInstancePut (&evs_handle_t_db, *handle); +error_nounlock: + return (error); +} + +evs_error_t evs_join ( + evs_handle_t *handle, + struct evs_group *groups, + int group_entries) +{ + evs_error_t error; + struct evs_inst *evs_inst; + struct iovec iov[2]; + struct req_lib_evs_join req_lib_evs_join; + struct res_lib_evs_join res_lib_evs_join; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + req_lib_evs_join.header.size = sizeof (struct req_lib_evs_join) + + (group_entries * sizeof (struct evs_group)); + req_lib_evs_join.header.id = MESSAGE_REQ_EVS_JOIN; + req_lib_evs_join.group_entries = group_entries; + + iov[0].iov_base = &req_lib_evs_join; + iov[0].iov_len = sizeof (struct req_lib_evs_join); + iov[1].iov_base = groups; + iov[1].iov_len = (group_entries * sizeof (struct evs_group)); + + error = saSendMsgRetry (evs_inst->fd, iov, 2); + if (error != SA_OK) { + goto error_exit; + } + + error = saRecvRetry (evs_inst->fd, &res_lib_evs_join, + sizeof (struct res_lib_evs_join), MSG_WAITALL | MSG_NOSIGNAL); + if (error != SA_OK) { + goto error_exit; + } + + error = res_lib_evs_join.header.error; + +error_exit: + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (error); +} + +evs_error_t evs_leave ( + evs_handle_t *handle, + struct evs_group *groups, + int group_entries) +{ + evs_error_t error; + struct evs_inst *evs_inst; + struct iovec iov[2]; + struct req_lib_evs_leave req_lib_evs_leave; + struct res_lib_evs_leave res_lib_evs_leave; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + req_lib_evs_leave.header.size = sizeof (struct req_lib_evs_leave) + + (group_entries * sizeof (struct evs_group)); + req_lib_evs_leave.header.id = MESSAGE_REQ_EVS_LEAVE; + req_lib_evs_leave.group_entries = group_entries; + + iov[0].iov_base = &req_lib_evs_leave; + iov[0].iov_len = sizeof (struct req_lib_evs_leave); + iov[1].iov_base = groups; + iov[1].iov_len = (group_entries * sizeof (struct evs_group)); + + error = saSendMsgRetry (evs_inst->fd, iov, 2); + if (error != SA_OK) { + goto error_exit; + } + + error = saRecvRetry (evs_inst->fd, &res_lib_evs_leave, + sizeof (struct res_lib_evs_leave), MSG_WAITALL | MSG_NOSIGNAL); + if (error != SA_OK) { + goto error_exit; + } + + error = res_lib_evs_leave.header.error; + +error_exit: + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (error); +} + +evs_error_t evs_mcast_joined ( + evs_handle_t *handle, + evs_guarantee_t guarantee, + evs_priority_t priority, + struct iovec *iovec, + int iov_len) +{ + int i; + evs_error_t error; + struct evs_inst *evs_inst; + struct iovec iov[64]; + struct req_lib_evs_mcast_joined req_lib_evs_mcast_joined; + struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined; + int msg_len = 0; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + + for (i = 0; i < iov_len; i++ ) { + msg_len += iovec[i].iov_len; + } + + req_lib_evs_mcast_joined.header.size = sizeof (struct req_lib_evs_mcast_joined) + + msg_len; + + req_lib_evs_mcast_joined.header.id = MESSAGE_REQ_EVS_MCAST_JOINED; + req_lib_evs_mcast_joined.priority = priority; + req_lib_evs_mcast_joined.guarantee = guarantee; + req_lib_evs_mcast_joined.msg_len = msg_len; + + iov[0].iov_base = &req_lib_evs_mcast_joined; + iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined); + memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec)); + + error = saSendMsgRetry (evs_inst->fd, iov, 1 + iov_len); + if (error != SA_OK) { + goto error_exit; + } + + error = saRecvQueue (evs_inst->fd, &res_lib_evs_mcast_joined, &evs_inst->inq, + MESSAGE_RES_EVS_MCAST_JOINED); + if (error != SA_OK) { + goto error_exit; + } + + error = res_lib_evs_mcast_joined.header.error; + +error_exit: + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (error); +} + +evs_error_t evs_mcast_groups ( + evs_handle_t *handle, + evs_guarantee_t guarantee, + evs_priority_t priority, + struct evs_group *groups, + int group_entries, + struct iovec *iovec, + int iov_len) +{ + int i; + evs_error_t error; + struct evs_inst *evs_inst; + struct iovec iov[64]; + struct req_lib_evs_mcast_groups req_lib_evs_mcast_groups; + struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups; + int msg_len = 0; + + error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst); + if (error != SA_OK) { + return (error); + } + for (i = 0; i < iov_len; i++) { + msg_len += iovec[i].iov_len; + } + req_lib_evs_mcast_groups.header.size = sizeof (struct req_lib_evs_mcast_groups) + + (group_entries * sizeof (struct evs_group)) + msg_len; + req_lib_evs_mcast_groups.header.id = MESSAGE_REQ_EVS_MCAST_GROUPS; + req_lib_evs_mcast_groups.priority = priority; + req_lib_evs_mcast_groups.guarantee = guarantee; + req_lib_evs_mcast_groups.msg_len = msg_len; + req_lib_evs_mcast_groups.group_entries = group_entries; + + iov[0].iov_base = &req_lib_evs_mcast_groups; + iov[0].iov_len = sizeof (struct req_lib_evs_mcast_groups); + iov[1].iov_base = groups; + iov[1].iov_len = (group_entries * sizeof (struct evs_group)); + memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec)); + + error = saSendMsgRetry (evs_inst->fd, iov, 2 + iov_len); + if (error != SA_OK) { + goto error_exit; + } + + error = saRecvQueue (evs_inst->fd, &res_lib_evs_mcast_groups, &evs_inst->inq, + MESSAGE_RES_EVS_MCAST_GROUPS); + if (error != SA_OK) { + goto error_exit; + } + + error = res_lib_evs_mcast_groups.header.error; + +error_exit: + saHandleInstancePut (&evs_handle_t_db, *handle); + + return (error); +} diff --git a/test/evsbench.c b/test/evsbench.c index e69de29b..d37fcc93 100644 --- a/test/evsbench.c +++ b/test/evsbench.c @@ -0,0 +1,159 @@ +#define _BSD_SOURCE +/* + * Copyright (c) 2004 MontaVista Software, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@mvista.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ais_types.h" +#include "evs.h" + +void evs_deliver_fn (struct in_addr source_addr, void *msg, int msg_len) +{ +// printf ("Delivering message %s\n", buf); +} + +void evs_confchg_fn ( + struct in_addr *member_list, int member_list_entries, + struct in_addr *left_list, int left_list_entries, + struct in_addr *joined_list, int joined_list_entries) +{ + int i; + + printf ("CONFIGURATION CHANGE\n"); + printf ("--------------------\n"); + printf ("New configuration\n"); + for (i = 0; i < member_list_entries; i++) { + printf ("%s\n", inet_ntoa (member_list[i])); + } + printf ("Members Left:\n"); + for (i = 0; i < left_list_entries; i++) { + printf ("%s\n", inet_ntoa (left_list[i])); + } + printf ("Members Joined:\n"); + for (i = 0; i < joined_list_entries; i++) { + printf ("%s\n", inet_ntoa (joined_list[i])); + } +} + +evs_callbacks_t callbacks = { + evs_deliver_fn, + evs_confchg_fn +}; + +struct evs_group groups[3] = { + { "key1" }, + { "key2" }, + { "key3" } +}; + +char buffer[200000]; + +struct iovec iov = { + .iov_base = buffer, + .iov_len = sizeof (buffer) +}; + +void ckpt_benchmark (evs_handle_t handle, + int write_count, int write_size) +{ + struct timeval tv1, tv2, tv_elapsed; + evs_error_t result; + int i = 0; + + gettimeofday (&tv1, NULL); + + iov.iov_len = write_size; + for (i = 0; i < write_count; i++) { + sprintf (buffer, "This is message %d\n", i); +try_again: + result = evs_mcast_joined (&handle, EVS_TYPE_AGREED, EVS_PRIO_LOW, &iov, 1); + if (result == EVS_ERR_TRY_AGAIN) { + goto try_again; + } + + result = evs_dispatch (&handle, EVS_DISPATCH_ALL); + } + gettimeofday (&tv2, NULL); + timersub (&tv2, &tv1, &tv_elapsed); + + printf ("%5d Writes ", write_count); + printf ("%5d bytes per write ", write_size); + printf ("%7.3f Seconds runtime ", + (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%9.3f TP/s ", + ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%7.3f MB/s.\n", + ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); +} + +int main (void) { + int size; + int count; + int i; + evs_error_t result; + evs_handle_t handle; + + result = evs_initialize (&handle, &callbacks); + printf ("Init result %d\n", result); + result = evs_join (&handle, groups, 3); + printf ("Join result %d\n", result); + result = evs_leave (&handle, &groups[0], 1); + printf ("Leave result %d\n", result); + + count = 100000; + size = 1300; + + for (i = 0; i < 35; i++) { /* number of repetitions */ + ckpt_benchmark (handle, count, size); + /* + * Adjust count to 95% of previous count + * Adjust bytes to write per checkpoint up by 1500 + */ + count = (((float)count) * 0.80); + size += 100; + } + return (0); +} diff --git a/test/testevs.c b/test/testevs.c index e69de29b..bec734bb 100644 --- a/test/testevs.c +++ b/test/testevs.c @@ -0,0 +1,105 @@ +#include +#include +#include +#include +#include +#include "../include/evs.h" + +char *delivery_string; + +void evs_deliver_fn (struct in_addr source_addr, void *msg, int msg_len) +{ + char *buf = msg; + +// buf += 100000; +// printf ("Delivery callback\n"); + printf ("API '%s' msg '%s'\n", delivery_string, buf); +} + +void evs_confchg_fn ( + struct in_addr *member_list, int member_list_entries, + struct in_addr *left_list, int left_list_entries, + struct in_addr *joined_list, int joined_list_entries) +{ + int i; + + printf ("CONFIGURATION CHANGE\n"); + printf ("--------------------\n"); + printf ("New configuration\n"); + for (i = 0; i < member_list_entries; i++) { + printf ("%s\n", inet_ntoa (member_list[i])); + } + printf ("Members Left:\n"); + for (i = 0; i < left_list_entries; i++) { + printf ("%s\n", inet_ntoa (left_list[i])); + } + printf ("Members Joined:\n"); + for (i = 0; i < joined_list_entries; i++) { + printf ("%s\n", inet_ntoa (joined_list[i])); + } +} + +evs_callbacks_t callbacks = { + evs_deliver_fn, + evs_confchg_fn +}; + +struct evs_group groups[3] = { + { "key1" }, + { "key2" }, + { "key3" } +}; + +char buffer[1000]; +struct iovec iov = { + .iov_base = buffer, + .iov_len = sizeof (buffer) +}; + +int main (void) +{ + unsigned int handle; + evs_error_t result; + int i = 0; + + result = evs_initialize (&handle, &callbacks); + printf ("Init result %d\n", result); + result = evs_join (&handle, groups, 3); + printf ("Join result %d\n", result); + result = evs_leave (&handle, &groups[0], 1); + printf ("Leave result %d\n", result); + delivery_string = "evs_mcast_joined"; + + /* + * Demonstrate evs_mcast_joined + */ + for (i = 0; i < 500; i++) { + sprintf (buffer, "evs_mcast_joined: This is message %d", i); +try_again_one: + result = evs_mcast_joined (&handle, EVS_TYPE_AGREED, EVS_PRIO_LOW, + &iov, 1); + if (result == EVS_ERR_TRY_AGAIN) { + goto try_again_one; + } + result = evs_dispatch (&handle, EVS_DISPATCH_ALL); + } + +// result = evs_leave (&handle, &groups[1], 2); This causes an assertion + + /* + * Demonstrate evs_mcast_joined + */ + delivery_string = "evs_mcast_groups"; + for (i = 0; i < 500; i++) { + sprintf (buffer, "evs_mcast_groups: This is message %d", i); +try_again_two: + result = evs_mcast_groups (&handle, EVS_TYPE_AGREED, EVS_PRIO_LOW, + &groups[1], 1, &iov, 1); + if (result == EVS_ERR_TRY_AGAIN) { + goto try_again_two; + } + + result = evs_dispatch (&handle, EVS_DISPATCH_ALL); + } + return (0); +}