mirror of
				https://git.proxmox.com/git/mirror_corosync
				synced 2025-10-26 04:06:05 +00:00 
			
		
		
		
	 aa62c2c028
			
		
	
	
		aa62c2c028
		
	
	
	
	
		
			
			It turns out that there are some legitimate cases where fragmented messages might be interrupted during sending (e.g. CS_ERR_TRY_AGAIN or as in my case: CS_ERR_INTERRUPT). This creates a situation where LIBCPG_PARTIAL_FIRST is sent multiple times before receiving LIBCPG_PARTIAL_LAST. Solution is to drop incomplete message and start assembly of new message as libcpg should have reported error during sending of that incomplete message. Signed-off-by: Rytis Karpuška <rytisk@neurotechnology.com> Reviewed-by: Christine Caulfield <ccaulfie@redhat.com> Reviewed-by: Jan Friesse <jfriesse@redhat.com>
		
			
				
	
	
		
			1427 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			1427 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * vi: set autoindent tabstop=4 shiftwidth=4 :
 | |
|  *
 | |
|  * Copyright (c) 2006-2015 Red Hat, Inc.
 | |
|  *
 | |
|  * All rights reserved.
 | |
|  *
 | |
|  * Author: Christine Caulfield (ccaulfi@redhat.com)
 | |
|  * Author: Jan Friesse (jfriesse@redhat.com)
 | |
|  *
 | |
|  * This software licensed under BSD license, the text of which follows:
 | |
|  *
 | |
|  * Redistribution and use in source and binary forms, with or without
 | |
|  * modification, are permitted provided that the following conditions are met:
 | |
|  *
 | |
|  * - Redistributions of source code must retain the above copyright notice,
 | |
|  *   this list of conditions and the following disclaimer.
 | |
|  * - Redistributions in binary form must reproduce the above copyright notice,
 | |
|  *   this list of conditions and the following disclaimer in the documentation
 | |
|  *   and/or other materials provided with the distribution.
 | |
|  * - Neither the name of the MontaVista Software, Inc. nor the names of its
 | |
|  *   contributors may be used to endorse or promote products derived from this
 | |
|  *   software without specific prior written permission.
 | |
|  *
 | |
|  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 | |
|  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 | |
|  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 | |
|  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 | |
|  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 | |
|  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 | |
|  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 | |
|  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 | |
|  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 | |
|  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 | |
|  * THE POSSIBILITY OF SUCH DAMAGE.
 | |
|  */
 | |
| /*
 | |
|  * Provides a closed process group API using the coroipcc executive
 | |
|  */
 | |
| 
 | |
| #include <config.h>
 | |
| 
 | |
| #include <stdlib.h>
 | |
| #include <stdio.h>
 | |
| #include <string.h>
 | |
| #include <unistd.h>
 | |
| #include <sys/types.h>
 | |
| #include <sys/socket.h>
 | |
| #include <sys/mman.h>
 | |
| #include <sys/uio.h>
 | |
| #include <sys/stat.h>
 | |
| #include <errno.h>
 | |
| #include <limits.h>
 | |
| 
 | |
| #include <qb/qblist.h>
 | |
| #include <qb/qbdefs.h>
 | |
| #include <qb/qbipcc.h>
 | |
| #include <qb/qblog.h>
 | |
| 
 | |
| #include <corosync/hdb.h>
 | |
| #include <corosync/corotypes.h>
 | |
| #include <corosync/corodefs.h>
 | |
| #include <corosync/cpg.h>
 | |
| #include <corosync/ipc_cpg.h>
 | |
| 
 | |
| #include "util.h"
 | |
| 
 | |
| #ifndef MAP_ANONYMOUS
 | |
| #define MAP_ANONYMOUS MAP_ANON
 | |
| #endif
 | |
| 
 | |
| /*
 | |
|  * Maximum number of times to retry a send when transmitting
 | |
|  * a large message fragment
 | |
|  */
 | |
| #define MAX_RETRIES 100
 | |
| 
 | |
| /*
 | |
|  * ZCB files have following umask (umask is same as used in libqb)
 | |
|  */
 | |
| #define CPG_MEMORY_MAP_UMASK		077
 | |
| 
 | |
| struct cpg_assembly_data
 | |
| {
 | |
| 	struct qb_list_head list;
 | |
| 	uint32_t nodeid;
 | |
| 	uint32_t pid;
 | |
| 	char *assembly_buf;
 | |
| 	uint32_t assembly_buf_ptr;
 | |
| };
 | |
| 
 | |
| struct cpg_inst {
 | |
| 	qb_ipcc_connection_t *c;
 | |
| 	int finalize;
 | |
| 	void *context;
 | |
| 	union {
 | |
| 		cpg_model_data_t model_data;
 | |
| 		cpg_model_v1_data_t model_v1_data;
 | |
| 	};
 | |
| 	struct qb_list_head iteration_list_head;
 | |
| 	uint32_t max_msg_size;
 | |
| 	struct qb_list_head assembly_list_head;
 | |
| };
 | |
| static void cpg_inst_free (void *inst);
 | |
| 
 | |
| DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
 | |
| 
 | |
| struct cpg_iteration_instance_t {
 | |
| 	cpg_iteration_handle_t cpg_iteration_handle;
 | |
| 	qb_ipcc_connection_t *conn;
 | |
| 	hdb_handle_t executive_iteration_handle;
 | |
| 	struct qb_list_head list;
 | |
| };
 | |
| 
 | |
| DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
 | |
| 
 | |
| 
 | |
| /*
 | |
|  * Internal (not visible by API) functions
 | |
|  */
 | |
| 
 | |
| static cs_error_t
 | |
| coroipcc_msg_send_reply_receive (
 | |
| 	qb_ipcc_connection_t *c,
 | |
| 	const struct iovec *iov,
 | |
| 	unsigned int iov_len,
 | |
| 	void *res_msg,
 | |
| 	size_t res_len)
 | |
| {
 | |
| 	return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
 | |
| 				CS_IPC_TIMEOUT_MS));
 | |
| }
 | |
| 
 | |
| static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
 | |
| {
 | |
| 	qb_list_del (&cpg_iteration_instance->list);
 | |
| 	hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
 | |
| }
 | |
| 
 | |
| static void cpg_inst_free (void *inst)
 | |
| {
 | |
| 	struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
 | |
| 	qb_ipcc_disconnect(cpg_inst->c);
 | |
| }
 | |
| 
 | |
| static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
 | |
| {
 | |
| 	struct qb_list_head *iter, *tmp_iter;
 | |
| 	struct cpg_iteration_instance_t *cpg_iteration_instance;
 | |
| 
 | |
| 	/*
 | |
| 	 * Traverse thru iteration instances and delete them
 | |
| 	 */
 | |
| 	qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
 | |
| 		cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
 | |
| 
 | |
| 		cpg_iteration_instance_finalize (cpg_iteration_instance);
 | |
| 	}
 | |
| 	hdb_handle_destroy (&cpg_handle_t_db, handle);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @defgroup cpg_coroipcc The closed process group API
 | |
|  * @ingroup coroipcc
 | |
|  *
 | |
|  * @{
 | |
|  */
 | |
| 
 | |
| cs_error_t cpg_initialize (
 | |
| 	cpg_handle_t *handle,
 | |
| 	cpg_callbacks_t *callbacks)
 | |
| {
 | |
| 	cpg_model_v1_data_t model_v1_data;
 | |
| 
 | |
| 	memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
 | |
| 
 | |
| 	if (callbacks) {
 | |
| 		model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
 | |
| 		model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
 | |
| 	}
 | |
| 
 | |
| 	return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_model_initialize (
 | |
| 	cpg_handle_t *handle,
 | |
| 	cpg_model_t model,
 | |
| 	cpg_model_data_t *model_data,
 | |
| 	void *context)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	if (model != CPG_MODEL_V1) {
 | |
| 		error = CS_ERR_INVALID_PARAM;
 | |
| 		goto error_no_destroy;
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_no_destroy;
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_destroy;
 | |
| 	}
 | |
| 
 | |
| 	cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
 | |
| 	if (cpg_inst->c == NULL) {
 | |
| 		error = qb_to_cs_error(-errno);
 | |
| 		goto error_put_destroy;
 | |
| 	}
 | |
| 
 | |
| 	if (model_data != NULL) {
 | |
| 		switch (model) {
 | |
| 		case CPG_MODEL_V1:
 | |
| 			memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
 | |
| 			if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
 | |
| 				error = CS_ERR_INVALID_PARAM;
 | |
| 
 | |
| 				goto error_destroy;
 | |
| 			}
 | |
| 			break;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	/* Allow space for corosync internal headers */
 | |
| 	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
 | |
| 	cpg_inst->model_data.model = model;
 | |
| 	cpg_inst->context = context;
 | |
| 
 | |
| 	qb_list_init(&cpg_inst->iteration_list_head);
 | |
| 
 | |
| 	qb_list_init(&cpg_inst->assembly_list_head);
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, *handle);
 | |
| 
 | |
| 	return (CS_OK);
 | |
| 
 | |
| error_put_destroy:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, *handle);
 | |
| error_destroy:
 | |
| 	hdb_handle_destroy (&cpg_handle_t_db, *handle);
 | |
| error_no_destroy:
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_finalize (
 | |
| 	cpg_handle_t handle)
 | |
| {
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov;
 | |
| 	struct req_lib_cpg_finalize req_lib_cpg_finalize;
 | |
| 	struct res_lib_cpg_finalize res_lib_cpg_finalize;
 | |
| 	cs_error_t error;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	/*
 | |
| 	 * Another thread has already started finalizing
 | |
| 	 */
 | |
| 	if (cpg_inst->finalize) {
 | |
| 		hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 		return (CS_ERR_BAD_HANDLE);
 | |
| 	}
 | |
| 
 | |
| 	cpg_inst->finalize = 1;
 | |
| 
 | |
| 	/*
 | |
| 	 * Send service request
 | |
| 	 */
 | |
| 	req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
 | |
| 	req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
 | |
| 
 | |
| 	iov.iov_base = (void *)&req_lib_cpg_finalize;
 | |
| 	iov.iov_len = sizeof (struct req_lib_cpg_finalize);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (cpg_inst->c,
 | |
| 		&iov,
 | |
| 		1,
 | |
| 		&res_lib_cpg_finalize,
 | |
| 		sizeof (struct res_lib_cpg_finalize));
 | |
| 
 | |
| 	cpg_inst_finalize (cpg_inst, handle);
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_fd_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	int *fd)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_max_atomic_msgsize_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	uint32_t *size)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	*size = cpg_inst->max_msg_size;
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_context_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	void **context)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	*context = cpg_inst->context;
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (CS_OK);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_context_set (
 | |
| 	cpg_handle_t handle,
 | |
| 	void *context)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	cpg_inst->context = context;
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (CS_OK);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_dispatch (
 | |
| 	cpg_handle_t handle,
 | |
| 	cs_dispatch_flags_t dispatch_types)
 | |
| {
 | |
| 	int timeout = -1;
 | |
| 	cs_error_t error;
 | |
| 	int cont = 1; /* always continue do loop except when set to 0 */
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 | |
| 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
 | |
| 	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
 | |
| 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 | |
| 	struct cpg_inst cpg_inst_copy;
 | |
| 	struct qb_ipc_response_header *dispatch_data;
 | |
| 	struct cpg_address member_list[CPG_MEMBERS_MAX];
 | |
| 	struct cpg_address left_list[CPG_MEMBERS_MAX];
 | |
| 	struct cpg_address joined_list[CPG_MEMBERS_MAX];
 | |
| 	struct cpg_name group_name;
 | |
| 	struct cpg_assembly_data *assembly_data;
 | |
| 	struct qb_list_head *iter, *tmp_iter;
 | |
| 	mar_cpg_address_t *left_list_start;
 | |
| 	mar_cpg_address_t *joined_list_start;
 | |
| 	unsigned int i;
 | |
| 	struct cpg_ring_id ring_id;
 | |
| 	uint32_t totem_member_list[CPG_MEMBERS_MAX];
 | |
| 	int32_t errno_res;
 | |
| 	char dispatch_buf[IPC_DISPATCH_SIZE];
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	/*
 | |
| 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
 | |
| 	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
 | |
| 	 */
 | |
| 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 | |
| 		timeout = 0;
 | |
| 	}
 | |
| 
 | |
| 	dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
 | |
| 	do {
 | |
| 		errno_res = qb_ipcc_event_recv (
 | |
| 			cpg_inst->c,
 | |
| 			dispatch_buf,
 | |
| 			IPC_DISPATCH_SIZE,
 | |
| 			timeout);
 | |
| 		error = qb_to_cs_error (errno_res);
 | |
| 		if (error == CS_ERR_BAD_HANDLE) {
 | |
| 			error = CS_OK;
 | |
| 			goto error_put;
 | |
| 		}
 | |
| 		if (error == CS_ERR_TRY_AGAIN) {
 | |
| 			if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 | |
| 				/*
 | |
| 				 * Don't mask error
 | |
| 				 */
 | |
| 				goto error_put;
 | |
| 			}
 | |
| 			error = CS_OK;
 | |
| 			if (dispatch_types == CS_DISPATCH_ALL) {
 | |
| 				break; /* exit do while cont is 1 loop */
 | |
| 			} else {
 | |
| 				continue; /* next poll */
 | |
| 			}
 | |
| 		}
 | |
| 		if (error != CS_OK) {
 | |
| 			goto error_put;
 | |
| 		}
 | |
| 
 | |
| 		/*
 | |
| 		 * Make copy of callbacks, message data, unlock instance, and call callback
 | |
| 		 * A risk of this dispatch method is that the callback routines may
 | |
| 		 * operate at the same time that cpgFinalize has been called.
 | |
| 		 */
 | |
| 		memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
 | |
| 		switch (cpg_inst_copy.model_data.model) {
 | |
| 		case CPG_MODEL_V1:
 | |
| 			/*
 | |
| 			 * Dispatch incoming message
 | |
| 			 */
 | |
| 			switch (dispatch_data->id) {
 | |
| 			case MESSAGE_RES_CPG_DELIVER_CALLBACK:
 | |
| 				if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
 | |
| 					break;
 | |
| 				}
 | |
| 
 | |
| 				res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
 | |
| 
 | |
| 				marshall_from_mar_cpg_name_t (
 | |
| 					&group_name,
 | |
| 					&res_cpg_deliver_callback->group_name);
 | |
| 
 | |
| 				cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
 | |
| 					&group_name,
 | |
| 					res_cpg_deliver_callback->nodeid,
 | |
| 					res_cpg_deliver_callback->pid,
 | |
| 					&res_cpg_deliver_callback->message,
 | |
| 					res_cpg_deliver_callback->msglen);
 | |
| 				break;
 | |
| 
 | |
| 			case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
 | |
| 				res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
 | |
| 
 | |
| 				marshall_from_mar_cpg_name_t (
 | |
| 					&group_name,
 | |
| 					&res_cpg_partial_deliver_callback->group_name);
 | |
| 
 | |
| 				/*
 | |
| 				 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
 | |
| 				 */
 | |
| 				assembly_data = NULL;
 | |
| 				qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
 | |
| 					struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
 | |
| 					if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
 | |
| 						assembly_data = current_assembly_data;
 | |
| 						break;
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
 | |
| 
 | |
| 					/*
 | |
| 					 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
 | |
| 					 * Otherwise the sending of packet must have been interrupted and error should have
 | |
| 					 * been reported to sending client. Therefore here last assembly will be dropped.
 | |
| 					 */
 | |
| 					if (assembly_data) {
 | |
| 						qb_list_del (&assembly_data->list);
 | |
| 						free(assembly_data->assembly_buf);
 | |
| 						free(assembly_data);
 | |
| 						assembly_data = NULL;
 | |
| 					}
 | |
| 
 | |
| 					assembly_data = malloc(sizeof(struct cpg_assembly_data));
 | |
| 					if (!assembly_data) {
 | |
| 						error = CS_ERR_NO_MEMORY;
 | |
| 						goto error_put;
 | |
| 					}
 | |
| 
 | |
| 					assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
 | |
| 					assembly_data->pid = res_cpg_partial_deliver_callback->pid;
 | |
| 					assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
 | |
| 					if (!assembly_data->assembly_buf) {
 | |
| 						free(assembly_data);
 | |
| 						error = CS_ERR_NO_MEMORY;
 | |
| 						goto error_put;
 | |
| 					}
 | |
| 					assembly_data->assembly_buf_ptr = 0;
 | |
| 					qb_list_init (&assembly_data->list);
 | |
| 
 | |
| 					qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
 | |
| 				}
 | |
| 				if (assembly_data) {
 | |
| 					memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
 | |
| 						res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
 | |
| 					assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
 | |
| 
 | |
| 					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
 | |
| 						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
 | |
| 							&group_name,
 | |
| 							res_cpg_partial_deliver_callback->nodeid,
 | |
| 							res_cpg_partial_deliver_callback->pid,
 | |
| 							assembly_data->assembly_buf,
 | |
| 							res_cpg_partial_deliver_callback->msglen);
 | |
| 
 | |
| 						qb_list_del (&assembly_data->list);
 | |
| 						free(assembly_data->assembly_buf);
 | |
| 						free(assembly_data);
 | |
| 					}
 | |
| 				}
 | |
| 				break;
 | |
| 
 | |
| 			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 | |
| 				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 | |
| 					break;
 | |
| 				}
 | |
| 
 | |
| 				res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
 | |
| 
 | |
| 				for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
 | |
| 					marshall_from_mar_cpg_address_t (&member_list[i],
 | |
| 						&res_cpg_confchg_callback->member_list[i]);
 | |
| 				}
 | |
| 				left_list_start = res_cpg_confchg_callback->member_list +
 | |
| 					res_cpg_confchg_callback->member_list_entries;
 | |
| 				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
 | |
| 					marshall_from_mar_cpg_address_t (&left_list[i],
 | |
| 						&left_list_start[i]);
 | |
| 				}
 | |
| 				joined_list_start = res_cpg_confchg_callback->member_list +
 | |
| 					res_cpg_confchg_callback->member_list_entries +
 | |
| 					res_cpg_confchg_callback->left_list_entries;
 | |
| 				for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
 | |
| 					marshall_from_mar_cpg_address_t (&joined_list[i],
 | |
| 						&joined_list_start[i]);
 | |
| 				}
 | |
| 				marshall_from_mar_cpg_name_t (
 | |
| 					&group_name,
 | |
| 					&res_cpg_confchg_callback->group_name);
 | |
| 
 | |
| 				cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
 | |
| 					&group_name,
 | |
| 					member_list,
 | |
| 					res_cpg_confchg_callback->member_list_entries,
 | |
| 					left_list,
 | |
| 					res_cpg_confchg_callback->left_list_entries,
 | |
| 					joined_list,
 | |
| 					res_cpg_confchg_callback->joined_list_entries);
 | |
| 
 | |
| 				/*
 | |
| 				 * If member left while his partial packet was being assembled, assembly data must be removed from list
 | |
| 				 */
 | |
| 				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
 | |
| 					qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
 | |
| 						struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
 | |
| 						if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
 | |
| 							continue;
 | |
| 
 | |
| 						qb_list_del (¤t_assembly_data->list);
 | |
| 						free(current_assembly_data->assembly_buf);
 | |
| 						free(current_assembly_data);
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				break;
 | |
| 			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
 | |
| 				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
 | |
| 					break;
 | |
| 				}
 | |
| 
 | |
| 				res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
 | |
| 
 | |
| 				marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
 | |
| 				for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
 | |
| 					totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
 | |
| 				}
 | |
| 
 | |
| 				cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
 | |
| 					ring_id,
 | |
| 					res_cpg_totem_confchg_callback->member_list_entries,
 | |
| 					totem_member_list);
 | |
| 				break;
 | |
| 			default:
 | |
| 				error = CS_ERR_LIBRARY;
 | |
| 				goto error_put;
 | |
| 				break;
 | |
| 			} /* - switch (dispatch_data->id) */
 | |
| 			break; /* case CPG_MODEL_V1 */
 | |
| 		} /* - switch (cpg_inst_copy.model_data.model) */
 | |
| 
 | |
| 		if (cpg_inst_copy.finalize || cpg_inst->finalize) {
 | |
| 			/*
 | |
| 			 * If the finalize has been called then get out of the dispatch.
 | |
| 			 */
 | |
| 			cpg_inst->finalize = 1;
 | |
| 			error = CS_ERR_BAD_HANDLE;
 | |
| 			goto error_put;
 | |
| 		}
 | |
| 
 | |
| 		/*
 | |
| 		 * Determine if more messages should be processed
 | |
| 		 */
 | |
| 		if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 | |
| 			cont = 0;
 | |
| 		}
 | |
| 	} while (cont);
 | |
| 
 | |
| error_put:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_join (
 | |
|     cpg_handle_t handle,
 | |
|     const struct cpg_name *group)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov[2];
 | |
| 	struct req_lib_cpg_join req_lib_cpg_join;
 | |
| 	struct res_lib_cpg_join response;
 | |
| 
 | |
| 	if (group->length > CPG_MAX_NAME_LENGTH) {
 | |
| 		return (CS_ERR_NAME_TOO_LONG);
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	/* Now join */
 | |
| 	req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
 | |
| 	req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
 | |
| 	req_lib_cpg_join.pid = getpid();
 | |
| 	req_lib_cpg_join.flags = 0;
 | |
| 
 | |
| 	switch (cpg_inst->model_data.model) {
 | |
| 	case CPG_MODEL_V1:
 | |
| 		req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
 | |
| 		break;
 | |
| 	}
 | |
| 
 | |
| 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
 | |
| 		group);
 | |
| 
 | |
| 	iov[0].iov_base = (void *)&req_lib_cpg_join;
 | |
| 	iov[0].iov_len = sizeof (struct req_lib_cpg_join);
 | |
| 
 | |
| 	do {
 | |
| 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
 | |
| 			&response, sizeof (struct res_lib_cpg_join));
 | |
| 
 | |
| 		if (error != CS_OK) {
 | |
| 			goto error_exit;
 | |
| 		}
 | |
| 	} while (response.header.error == CS_ERR_BUSY);
 | |
| 
 | |
| 	error = response.header.error;
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_leave (
 | |
|     cpg_handle_t handle,
 | |
|     const struct cpg_name *group)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov[2];
 | |
| 	struct req_lib_cpg_leave req_lib_cpg_leave;
 | |
| 	struct res_lib_cpg_leave res_lib_cpg_leave;
 | |
| 
 | |
|         if (group->length > CPG_MAX_NAME_LENGTH) {
 | |
| 		return (CS_ERR_NAME_TOO_LONG);
 | |
|         }
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
 | |
| 	req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
 | |
| 	req_lib_cpg_leave.pid = getpid();
 | |
| 	marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
 | |
| 		group);
 | |
| 
 | |
| 	iov[0].iov_base = (void *)&req_lib_cpg_leave;
 | |
| 	iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
 | |
| 
 | |
| 	do {
 | |
| 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
 | |
| 			&res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
 | |
| 
 | |
| 		if (error != CS_OK) {
 | |
| 			goto error_exit;
 | |
| 		}
 | |
| 	} while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
 | |
| 
 | |
| 	error = res_lib_cpg_leave.header.error;
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_membership_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	struct cpg_name *group_name,
 | |
| 	struct cpg_address *member_list,
 | |
| 	int *member_list_entries)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov;
 | |
| 	struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
 | |
| 	struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
 | |
| 	unsigned int i;
 | |
| 
 | |
| 	if (group_name->length > CPG_MAX_NAME_LENGTH) {
 | |
| 		return (CS_ERR_NAME_TOO_LONG);
 | |
| 	}
 | |
| 	if (member_list == NULL) {
 | |
| 		return (CS_ERR_INVALID_PARAM);
 | |
| 	}
 | |
| 	if (member_list_entries == NULL) {
 | |
| 		return (CS_ERR_INVALID_PARAM);
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
 | |
| 	req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
 | |
| 
 | |
| 	marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
 | |
| 		group_name);
 | |
| 
 | |
| 	iov.iov_base = (void *)&req_lib_cpg_membership_get;
 | |
| 	iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
 | |
| 			&res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	error = res_lib_cpg_membership_get.header.error;
 | |
| 
 | |
| 	/*
 | |
| 	 * Copy results to caller
 | |
| 	 */
 | |
| 	*member_list_entries = res_lib_cpg_membership_get.member_count;
 | |
| 	if (member_list) {
 | |
| 		for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
 | |
| 			marshall_from_mar_cpg_address_t (&member_list[i],
 | |
| 				&res_lib_cpg_membership_get.member_list[i]);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_local_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	unsigned int *local_nodeid)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov;
 | |
| 	struct req_lib_cpg_local_get req_lib_cpg_local_get;
 | |
| 	struct res_lib_cpg_local_get res_lib_cpg_local_get;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
 | |
| 	req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
 | |
| 
 | |
| 	iov.iov_base = (void *)&req_lib_cpg_local_get;
 | |
| 	iov.iov_len = sizeof (struct req_lib_cpg_local_get);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
 | |
| 		&res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	error = res_lib_cpg_local_get.header.error;
 | |
| 
 | |
| 	*local_nodeid = res_lib_cpg_local_get.local_nodeid;
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_flow_control_state_get (
 | |
| 	cpg_handle_t handle,
 | |
| 	cpg_flow_control_state_t *flow_control_state)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 	*flow_control_state = CPG_FLOW_CONTROL_DISABLED;
 | |
| 	error = CS_OK;
 | |
| 
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| static int
 | |
| memory_map (char *path, const char *file, void **buf, size_t bytes)
 | |
| {
 | |
| 	int32_t fd;
 | |
| 	void *addr;
 | |
| 	int32_t res;
 | |
| 	char *buffer;
 | |
| 	int32_t i;
 | |
| 	size_t written;
 | |
| 	size_t page_size; 
 | |
| 	long int sysconf_page_size;
 | |
| 	mode_t old_umask;
 | |
| 
 | |
| 	snprintf (path, PATH_MAX, "/dev/shm/%s", file);
 | |
| 
 | |
| 	old_umask = umask(CPG_MEMORY_MAP_UMASK);
 | |
| 	fd = mkstemp (path);
 | |
| 	(void)umask(old_umask);
 | |
| 	if (fd == -1) {
 | |
| 		snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
 | |
| 		old_umask = umask(CPG_MEMORY_MAP_UMASK);
 | |
| 		fd = mkstemp (path);
 | |
| 		(void)umask(old_umask);
 | |
| 		if (fd == -1) {
 | |
| 			return (-1);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	res = ftruncate (fd, bytes);
 | |
| 	if (res == -1) {
 | |
| 		goto error_close_unlink;
 | |
| 	}
 | |
| 	sysconf_page_size = sysconf(_SC_PAGESIZE);
 | |
| 	if (sysconf_page_size <= 0) {
 | |
| 		goto error_close_unlink;
 | |
| 	}
 | |
| 	page_size = sysconf_page_size;
 | |
| 	buffer = malloc (page_size);
 | |
| 	if (buffer == NULL) {
 | |
| 		goto error_close_unlink;
 | |
| 	}
 | |
| 	memset (buffer, 0, page_size);
 | |
| 	for (i = 0; i < (bytes / page_size); i++) {
 | |
| retry_write:
 | |
| 		written = write (fd, buffer, page_size);
 | |
| 		if (written == -1 && errno == EINTR) {
 | |
| 			goto retry_write;
 | |
| 		}
 | |
| 		if (written != page_size) {
 | |
| 			free (buffer);
 | |
| 			goto error_close_unlink;
 | |
| 		}
 | |
| 	}
 | |
| 	free (buffer);
 | |
| 
 | |
| 	addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
 | |
| 		MAP_SHARED, fd, 0);
 | |
| 
 | |
| 	if (addr == MAP_FAILED) {
 | |
| 		goto error_close_unlink;
 | |
| 	}
 | |
| #ifdef MADV_NOSYNC
 | |
| 	madvise(addr, bytes, MADV_NOSYNC);
 | |
| #endif
 | |
| 
 | |
| 	res = close (fd);
 | |
| 	if (res) {
 | |
| 		munmap(addr, bytes);
 | |
| 
 | |
| 		return (-1);
 | |
| 	}
 | |
| 	*buf = addr;
 | |
| 
 | |
| 	return 0;
 | |
| 
 | |
| error_close_unlink:
 | |
| 	close (fd);
 | |
| 	unlink(path);
 | |
| 	return -1;
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_zcb_alloc (
 | |
| 	cpg_handle_t handle,
 | |
| 	size_t size,
 | |
| 	void **buffer)
 | |
| {
 | |
| 	void *buf = NULL;
 | |
| 	char path[PATH_MAX];
 | |
| 	mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
 | |
| 	struct qb_ipc_response_header res_coroipcs_zc_alloc;
 | |
| 	size_t map_size;
 | |
| 	struct iovec iovec;
 | |
| 	struct coroipcs_zc_header *hdr;
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
 | |
| 	assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
 | |
| 
 | |
| 	if (strlen(path) >= CPG_ZC_PATH_LEN) {
 | |
| 		unlink(path);
 | |
| 		munmap (buf, map_size);
 | |
| 		return (CS_ERR_NAME_TOO_LONG);
 | |
| 	}
 | |
| 
 | |
| 	req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
 | |
| 	req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
 | |
| 	req_coroipcc_zc_alloc.map_size = map_size;
 | |
| 	strcpy (req_coroipcc_zc_alloc.path_to_file, path);
 | |
| 
 | |
| 	iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
 | |
| 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (
 | |
| 		cpg_inst->c,
 | |
| 		&iovec,
 | |
| 		1,
 | |
| 		&res_coroipcs_zc_alloc,
 | |
| 		sizeof (struct qb_ipc_response_header));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	hdr = (struct coroipcs_zc_header *)buf;
 | |
| 	hdr->map_size = map_size;
 | |
| 	*buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_zcb_free (
 | |
| 	cpg_handle_t handle,
 | |
| 	void *buffer)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	unsigned int res;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
 | |
| 	struct qb_ipc_response_header res_coroipcs_zc_free;
 | |
| 	struct iovec iovec;
 | |
| 	struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
 | |
| 	req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
 | |
| 	req_coroipcc_zc_free.map_size = header->map_size;
 | |
| 	req_coroipcc_zc_free.server_address = header->server_address;
 | |
| 
 | |
| 	iovec.iov_base = (void *)&req_coroipcc_zc_free;
 | |
| 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (
 | |
| 		cpg_inst->c,
 | |
| 		&iovec,
 | |
| 		1,
 | |
| 		&res_coroipcs_zc_free,
 | |
| 		sizeof (struct qb_ipc_response_header));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	res = munmap ((void *)header, header->map_size);
 | |
| 	if (res == -1) {
 | |
| 		error = qb_to_cs_error(-errno);
 | |
| 
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_zcb_mcast_joined (
 | |
| 	cpg_handle_t handle,
 | |
| 	cpg_guarantee_t guarantee,
 | |
| 	void *msg,
 | |
| 	size_t msg_len)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct req_lib_cpg_mcast *req_lib_cpg_mcast;
 | |
| 	struct res_lib_cpg_mcast res_lib_cpg_mcast;
 | |
| 	mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
 | |
| 	struct coroipcs_zc_header *hdr;
 | |
| 	struct iovec iovec;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	if (msg_len > IPC_REQUEST_SIZE) {
 | |
| 		error = CS_ERR_TOO_BIG;
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 | |
| 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 | |
| 		msg_len;
 | |
| 
 | |
| 	req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
 | |
| 	req_lib_cpg_mcast->guarantee = guarantee;
 | |
| 	req_lib_cpg_mcast->msglen = msg_len;
 | |
| 
 | |
| 	hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
 | |
| 
 | |
| 	req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
 | |
| 	req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
 | |
| 	req_coroipcc_zc_execute.server_address = hdr->server_address;
 | |
| 
 | |
| 	iovec.iov_base = (void *)&req_coroipcc_zc_execute;
 | |
| 	iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (
 | |
| 		cpg_inst->c,
 | |
| 		&iovec,
 | |
| 		1,
 | |
| 		&res_lib_cpg_mcast,
 | |
| 		sizeof(res_lib_cpg_mcast));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	error = res_lib_cpg_mcast.header.error;
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| static cs_error_t send_fragments (
 | |
| 	struct cpg_inst *cpg_inst,
 | |
| 	cpg_guarantee_t guarantee,
 | |
| 	size_t msg_len,
 | |
| 	const struct iovec *iovec,
 | |
| 	unsigned int iov_len)
 | |
| {
 | |
| 	int i;
 | |
| 	cs_error_t error = CS_OK;
 | |
| 	struct iovec iov[2];
 | |
| 	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
 | |
| 	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
 | |
| 	size_t sent = 0;
 | |
| 	size_t iov_sent = 0;
 | |
| 	int retry_count;
 | |
| 
 | |
| 	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
 | |
| 	req_lib_cpg_mcast.guarantee = guarantee;
 | |
| 	req_lib_cpg_mcast.msglen = msg_len;
 | |
| 
 | |
| 	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
 | |
| 	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
 | |
| 
 | |
| 	i=0;
 | |
| 	iov_sent = 0 ;
 | |
| 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
 | |
| 
 | |
| 	while (error == CS_OK && sent < msg_len) {
 | |
| 
 | |
| 		retry_count = 0;
 | |
| 		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
 | |
| 			iov[1].iov_len = cpg_inst->max_msg_size;
 | |
| 		}
 | |
| 		else {
 | |
| 			iov[1].iov_len = iovec[i].iov_len - iov_sent;
 | |
| 		}
 | |
| 
 | |
| 		if (sent == 0) {
 | |
| 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
 | |
| 		}
 | |
| 		else if ((sent + iov[1].iov_len) == msg_len) {
 | |
| 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
 | |
| 		}
 | |
| 		else {
 | |
| 			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
 | |
| 		}
 | |
| 
 | |
| 		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
 | |
| 		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
 | |
| 		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
 | |
| 
 | |
| 	resend:
 | |
| 		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
 | |
| 							 &res_lib_cpg_partial_send,
 | |
| 							 sizeof (res_lib_cpg_partial_send));
 | |
| 
 | |
| 		if (error == CS_ERR_TRY_AGAIN) {
 | |
| 			fprintf(stderr, "sleep. counter=%d\n", retry_count);
 | |
| 			if (++retry_count > MAX_RETRIES) {
 | |
| 				goto error_exit;
 | |
| 			}
 | |
| 			usleep(10000);
 | |
| 			goto resend;
 | |
| 		}
 | |
| 
 | |
| 		iov_sent += iov[1].iov_len;
 | |
| 		sent += iov[1].iov_len;
 | |
| 
 | |
| 		/* Next iovec */
 | |
| 		if (iov_sent >= iovec[i].iov_len) {
 | |
| 			i++;
 | |
| 			iov_sent = 0;
 | |
| 		}
 | |
| 		error = res_lib_cpg_partial_send.header.error;
 | |
| 	}
 | |
| error_exit:
 | |
| 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 | |
| 
 | |
| 	return error;
 | |
| }
 | |
| 
 | |
| 
 | |
| cs_error_t cpg_mcast_joined (
 | |
| 	cpg_handle_t handle,
 | |
| 	cpg_guarantee_t guarantee,
 | |
| 	const struct iovec *iovec,
 | |
| 	unsigned int iov_len)
 | |
| {
 | |
| 	int i;
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct iovec iov[64];
 | |
| 	struct req_lib_cpg_mcast req_lib_cpg_mcast;
 | |
| 	size_t msg_len = 0;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	for (i = 0; i < iov_len; i++ ) {
 | |
| 		msg_len += iovec[i].iov_len;
 | |
| 	}
 | |
| 
 | |
| 	if (msg_len > cpg_inst->max_msg_size) {
 | |
| 		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 | |
| 		msg_len;
 | |
| 
 | |
| 	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
 | |
| 	req_lib_cpg_mcast.guarantee = guarantee;
 | |
| 	req_lib_cpg_mcast.msglen = msg_len;
 | |
| 
 | |
| 	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
 | |
| 	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
 | |
| 	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 | |
| 
 | |
| 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
 | |
| 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 | |
| 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 | |
| 
 | |
| error_exit:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_iteration_initialize(
 | |
| 	cpg_handle_t handle,
 | |
| 	cpg_iteration_type_t iteration_type,
 | |
| 	const struct cpg_name *group,
 | |
| 	cpg_iteration_handle_t *cpg_iteration_handle)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct iovec iov;
 | |
| 	struct cpg_inst *cpg_inst;
 | |
| 	struct cpg_iteration_instance_t *cpg_iteration_instance;
 | |
| 	struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
 | |
| 	struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
 | |
| 
 | |
| 	if (group && group->length > CPG_MAX_NAME_LENGTH) {
 | |
| 		return (CS_ERR_NAME_TOO_LONG);
 | |
| 	}
 | |
| 	if (cpg_iteration_handle == NULL) {
 | |
| 		return (CS_ERR_INVALID_PARAM);
 | |
| 	}
 | |
| 
 | |
| 	if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
 | |
| 		(iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
 | |
| 		return (CS_ERR_INVALID_PARAM);
 | |
| 	}
 | |
| 
 | |
| 	if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
 | |
| 	    iteration_type != CPG_ITERATION_ALL) {
 | |
| 
 | |
| 		return (CS_ERR_INVALID_PARAM);
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 | |
| 	if (error != CS_OK) {
 | |
| 		return (error);
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
 | |
| 		sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_put_cpg_db;
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
 | |
| 		(void *)&cpg_iteration_instance));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_destroy;
 | |
| 	}
 | |
| 
 | |
| 	cpg_iteration_instance->conn = cpg_inst->c;
 | |
| 
 | |
| 	qb_list_init (&cpg_iteration_instance->list);
 | |
| 
 | |
| 	req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
 | |
| 	req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
 | |
| 	req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
 | |
| 	if (group) {
 | |
| 		marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
 | |
| 	}
 | |
| 
 | |
| 	iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
 | |
| 	iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (cpg_inst->c,
 | |
| 		&iov,
 | |
| 		1,
 | |
| 		&res_lib_cpg_iterationinitialize,
 | |
| 		sizeof (struct res_lib_cpg_iterationinitialize));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_put_destroy;
 | |
| 	}
 | |
| 
 | |
| 	cpg_iteration_instance->executive_iteration_handle =
 | |
| 		res_lib_cpg_iterationinitialize.iteration_handle;
 | |
| 	cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
 | |
| 
 | |
| 	qb_list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
 | |
| 
 | |
| 	hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (res_lib_cpg_iterationinitialize.header.error);
 | |
| 
 | |
| error_put_destroy:
 | |
| 	hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
 | |
| error_destroy:
 | |
| 	hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
 | |
| error_put_cpg_db:
 | |
| 	hdb_handle_put (&cpg_handle_t_db, handle);
 | |
| 
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_iteration_next(
 | |
| 	cpg_iteration_handle_t handle,
 | |
| 	struct cpg_iteration_description_t *description)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct cpg_iteration_instance_t *cpg_iteration_instance;
 | |
| 	struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
 | |
| 	struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
 | |
| 
 | |
| 	if (description == NULL) {
 | |
| 		return CS_ERR_INVALID_PARAM;
 | |
| 	}
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
 | |
| 		(void *)&cpg_iteration_instance));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
 | |
| 	req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
 | |
| 	req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
 | |
| 
 | |
| 	error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
 | |
| 				&req_lib_cpg_iterationnext,
 | |
| 				req_lib_cpg_iterationnext.header.size));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_put;
 | |
| 	}
 | |
| 
 | |
| 	error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
 | |
| 				&res_lib_cpg_iterationnext,
 | |
| 				sizeof(struct res_lib_cpg_iterationnext), -1));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_put;
 | |
| 	}
 | |
| 
 | |
| 	marshall_from_mar_cpg_iteration_description_t(
 | |
| 			description,
 | |
| 			&res_lib_cpg_iterationnext.description);
 | |
| 
 | |
| 	error = res_lib_cpg_iterationnext.header.error;
 | |
| 
 | |
| error_put:
 | |
| 	hdb_handle_put (&cpg_iteration_handle_t_db, handle);
 | |
| 
 | |
| error_exit:
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| cs_error_t cpg_iteration_finalize (
 | |
| 	cpg_iteration_handle_t handle)
 | |
| {
 | |
| 	cs_error_t error;
 | |
| 	struct iovec iov;
 | |
| 	struct cpg_iteration_instance_t *cpg_iteration_instance;
 | |
| 	struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
 | |
| 	struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
 | |
| 
 | |
| 	error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
 | |
| 		(void *)&cpg_iteration_instance));
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_exit;
 | |
| 	}
 | |
| 
 | |
| 	req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
 | |
| 	req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
 | |
| 	req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
 | |
| 
 | |
| 	iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
 | |
| 	iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
 | |
| 
 | |
| 	error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
 | |
| 		&iov,
 | |
| 		1,
 | |
| 		&res_lib_cpg_iterationfinalize,
 | |
| 		sizeof (struct req_lib_cpg_iterationfinalize));
 | |
| 
 | |
| 	if (error != CS_OK) {
 | |
| 		goto error_put;
 | |
| 	}
 | |
| 
 | |
| 	cpg_iteration_instance_finalize (cpg_iteration_instance);
 | |
| 	hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
 | |
| 
 | |
| 	return (res_lib_cpg_iterationfinalize.header.error);
 | |
| 
 | |
| error_put:
 | |
| 	hdb_handle_put (&cpg_iteration_handle_t_db, handle);
 | |
| error_exit:
 | |
| 	return (error);
 | |
| }
 | |
| 
 | |
| /** @} */
 |