diff --git a/exec/coroipcs.c b/exec/coroipcs.c index d510d986..7ae50767 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -91,6 +91,12 @@ struct outq_item { struct list_head list; }; +struct zcb_mapped { + struct list_head list; + void *addr; + size_t size; +}; + #if defined(_SEM_SEMUN_UNDEFINED) union semun { int val; @@ -130,6 +136,7 @@ struct conn_info { struct list_head list; char setup_msg[sizeof (mar_req_setup_t)]; unsigned int setup_bytes_read; + struct list_head zcb_mapped_list_head; char *sending_allowed_private_data[64]; }; @@ -144,8 +151,84 @@ static void ipc_disconnect (struct conn_info *conn_info); static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, int locked); +static int +memory_map (const char *path, void **buf, size_t bytes) +{ + int fd; + void *addr_orig; + void *addr; + int res; + + fd = open (path, O_RDWR, 0600); + + unlink (path); + + res = ftruncate (fd, bytes); + + addr_orig = mmap (NULL, bytes, PROT_NONE, + MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); + + if (addr_orig == MAP_FAILED) { + return (-1); + } + + addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE, + MAP_FIXED | MAP_SHARED, fd, 0); + + if (addr != addr_orig) { + return (-1); + } + + res = close (fd); + if (res) { + return (-1); + } + *buf = addr_orig; + return (0); +} + +static int +circular_memory_map (const char *path, void **buf, size_t bytes) +{ + int fd; + void *addr_orig; + void *addr; + int res; + + fd = open (path, O_RDWR, 0600); + + unlink (path); + + res = ftruncate (fd, bytes); + + addr_orig = mmap (NULL, bytes << 1, PROT_NONE, + MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); + + if (addr_orig == MAP_FAILED) { + return (-1); + } + + addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE, + MAP_FIXED | MAP_SHARED, fd, 0); + + if (addr != addr_orig) { + return (-1); + } + + addr = mmap (((char *)addr_orig) + bytes, + bytes, PROT_READ | PROT_WRITE, + MAP_FIXED | MAP_SHARED, fd, 0); + + res = close (fd); + if (res) { + return (-1); + } + *buf = addr_orig; 
+ return (0); +} + static inline int -coroipcs_circular_memory_unmap (void *buf, size_t bytes) +circular_memory_unmap (void *buf, size_t bytes) { int res; @@ -154,6 +237,83 @@ coroipcs_circular_memory_unmap (void *buf, size_t bytes) return (res); } +static inline int zcb_free (struct zcb_mapped *zcb_mapped) +{ + unsigned int res; + + res = munmap (zcb_mapped->addr, zcb_mapped->size); + list_del (&zcb_mapped->list); + free (zcb_mapped); + return (res); +} + +static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr) +{ + struct list_head *list; + struct zcb_mapped *zcb_mapped; + unsigned int res = 0; + + for (list = conn_info->zcb_mapped_list_head.next; + list != &conn_info->zcb_mapped_list_head; list = list->next) { + + zcb_mapped = list_entry (list, struct zcb_mapped, list); + + if (zcb_mapped->addr == addr) { + res = zcb_free (zcb_mapped); + break; + } + + } + return (res); +} + +static inline int zcb_all_free ( + struct conn_info *conn_info) +{ + struct list_head *list; + struct zcb_mapped *zcb_mapped; + + for (list = conn_info->zcb_mapped_list_head.next; + list != &conn_info->zcb_mapped_list_head;) { + + zcb_mapped = list_entry (list, struct zcb_mapped, list); + + list = list->next; + + zcb_free (zcb_mapped); + } + return (0); +} + +static inline int zcb_alloc ( + struct conn_info *conn_info, + const char *path_to_file, + size_t size, + void **addr) +{ + struct zcb_mapped *zcb_mapped; + unsigned int res; + + zcb_mapped = malloc (sizeof (struct zcb_mapped)); + if (zcb_mapped == NULL) { + return (-1); + } + + res = memory_map ( + path_to_file, + addr, + size); + if (res == -1) { + return (-1); + } + + list_init (&zcb_mapped->list); + zcb_mapped->addr = *addr; + zcb_mapped->size = size; + list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head); + return (0); +} + static int ipc_thread_active (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; @@ -250,7 +410,8 @@ static inline int conn_info_destroy (struct conn_info 
*conn_info) api->free (conn_info->private_data); } close (conn_info->fd); - res = coroipcs_circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE); + res = circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE); + zcb_all_free (conn_info); api->free (conn_info); api->serialize_unlock (); return (-1); @@ -261,6 +422,83 @@ struct res_overlay { char buf[4096]; }; +union u { + uint64_t server_addr; + void *server_ptr; +}; + +static uint64_t void2serveraddr (void *server_ptr) +{ + union u u; + + u.server_ptr = server_ptr; + return (u.server_addr); +} + +static void *serveraddr2void (uint64_t server_addr) +{ + union u u; + + u.server_addr = server_addr; + return (u.server_ptr); +}; + +static inline void zerocopy_operations_process ( + struct conn_info *conn_info, + mar_req_header_t **header_out, + unsigned int *new_message) +{ + mar_req_header_t *header; + + header = (mar_req_header_t *)conn_info->mem->req_buffer; + if (header->id == ZC_ALLOC_HEADER) { + mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header; + mar_res_header_t res_header; + void *addr = NULL; + struct coroipcs_zc_header *zc_header; + unsigned int res; + + res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size, + &addr); + + zc_header = (struct coroipcs_zc_header *)addr; + zc_header->server_address = void2serveraddr(addr); + + res_header.size = sizeof (mar_res_header_t); + res_header.id = 0; + coroipcs_response_send ( + conn_info, &res_header, + res_header.size); + *new_message = 0; + return; + } else + if (header->id == ZC_FREE_HEADER) { + mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header; + mar_res_header_t res_header; + void *addr = NULL; + + addr = serveraddr2void (hdr->server_address); + + zcb_by_addr_free (conn_info, addr); + + res_header.size = sizeof (mar_res_header_t); + res_header.id = 0; + coroipcs_response_send ( + conn_info, &res_header, + res_header.size); + + *new_message = 0; + return; + } else + if (header->id == 
ZC_EXECUTE_HEADER) { + mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header; + + header = (mar_req_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header))); + } + *header_out = header; + *new_message = 1; +} + static void *pthread_ipc_consumer (void *conn) { struct conn_info *conn_info = (struct conn_info *)conn; @@ -269,6 +507,7 @@ static void *pthread_ipc_consumer (void *conn) mar_req_header_t *header; struct res_overlay res_overlay; int send_ok; + unsigned int new_message; if (api->sched_priority != 0) { struct sched_param sched_param; @@ -295,9 +534,15 @@ retry_semop: pthread_exit (0); } - coroipcs_refcount_inc (conn_info); + zerocopy_operations_process (conn_info, &header, &new_message); + /* + * There is no new message to process, continue for loop + */ + if (new_message == 0) { + continue; + } - header = (mar_req_header_t *)conn_info->mem->req_buffer; + coroipcs_refcount_inc (conn); send_ok = api->sending_allowed (conn_info->service, header->id, @@ -497,6 +742,7 @@ static int conn_info_create (int fd) conn_info->state = CONN_STATE_THREAD_INACTIVE; list_init (&conn_info->outq_head); list_init (&conn_info->list); + list_init (&conn_info->zcb_mapped_list_head); list_add (&conn_info->list, &conn_info_list_head); api->poll_dispatch_add (fd, conn_info); @@ -933,46 +1179,6 @@ retry_accept: return (0); } -static int -coroipcs_memory_map (char *path, void **buf, size_t bytes) -{ - int fd; - void *addr_orig; - void *addr; - int res; - - fd = open (path, O_RDWR, 0600); - - unlink (path); - - res = ftruncate (fd, bytes); - - addr_orig = mmap (NULL, bytes << 1, PROT_NONE, - MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); - - if (addr_orig == MAP_FAILED) { - return (-1); - } - - addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE, - MAP_FIXED | MAP_SHARED, fd, 0); - - if (addr != addr_orig) { - return (-1); - } - - addr = mmap (((char *)addr_orig) + bytes, - bytes, PROT_READ | PROT_WRITE, - MAP_FIXED | MAP_SHARED, fd, 
0); - - res = close (fd); - if (res) { - return (-1); - } - *buf = addr_orig; - return (0); -} - int coroipcs_handler_dispatch ( int fd, int revent, @@ -1026,7 +1232,7 @@ int coroipcs_handler_dispatch ( conn_info->shmkey = req_setup->shmkey; conn_info->semkey = req_setup->semkey; - res = coroipcs_memory_map ( + res = circular_memory_map ( req_setup->dispatch_file, (void *)&conn_info->dispatch_buffer, DISPATCH_SIZE); diff --git a/include/corosync/coroipcc.h b/include/corosync/coroipcc.h index fcf4e979..d57d3f66 100644 --- a/include/corosync/coroipcc.h +++ b/include/corosync/coroipcc.h @@ -87,8 +87,27 @@ coroipcc_msg_send_reply_receive_in_buf ( unsigned int iov_len, void **res_msg); +extern cs_error_t +coroipcc_zcb_alloc ( + void *ipc_context, + void **buffer, + size_t size, + size_t header_size); + +extern cs_error_t +coroipcc_zcb_free ( + void *ipc_context, + void *buffer); + +extern cs_error_t +coroipcc_zcb_msg_send_reply_receive ( + void *ipc_context, + void *msg, + void *res_msg, + size_t res_len); + /* - * This needs to be removed + * TODO This needs to be removed */ struct saHandleDatabase { unsigned int handleCount; @@ -115,7 +134,6 @@ static void database_name##_init(void) \ saHandleDatabaseLock_init (&(database_name)); \ } - extern cs_error_t saHandleCreate ( struct saHandleDatabase *handleDatabase, diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h index 8093dfba..f4fdcee4 100644 --- a/include/corosync/cpg.h +++ b/include/corosync/cpg.h @@ -198,4 +198,19 @@ cs_error_t cpg_flow_control_state_get ( cpg_handle_t handle, cpg_flow_control_state_t *flow_control_enabled); +cs_error_t cpg_zcb_alloc ( + cpg_handle_t handle, + size_t size, + void **buffer); + +cs_error_t cpg_zcb_free ( + cpg_handle_t handle, + void *buffer); + +cs_error_t cpg_zcb_mcast_joined ( + cpg_handle_t handle, + cpg_guarantee_t guarantee, + void *msg, + size_t msg_len); + #endif /* COROSYNC_CPG_H_DEFINED */ diff --git a/include/corosync/ipc_gen.h b/include/corosync/ipc_gen.h index 
37615919..01fee2c6 100644 --- a/include/corosync/ipc_gen.h +++ b/include/corosync/ipc_gen.h @@ -126,6 +126,27 @@ typedef struct { void *conn __attribute__((aligned(8))); } mar_message_source_t __attribute__((aligned(8))); +typedef struct { + mar_req_header_t header __attribute__((aligned(8))); + size_t map_size __attribute__((aligned(8))); + char path_to_file[128] __attribute__((aligned(8))); +} mar_req_coroipcc_zc_alloc_t __attribute__((aligned(8))); + +typedef struct { + mar_req_header_t header __attribute__((aligned(8))); + size_t map_size __attribute__((aligned(8))); + uint64_t server_address __attribute__((aligned(8))); +} mar_req_coroipcc_zc_free_t __attribute__((aligned(8))); + +typedef struct { + mar_req_header_t header __attribute__((aligned(8))); + uint64_t server_address __attribute__((aligned(8))); +} mar_req_coroipcc_zc_execute_t __attribute__((aligned(8))); + +struct coroipcs_zc_header { + int map_size; + uint64_t server_address; +}; static inline void swab_mar_message_source_t (mar_message_source_t *to_swab) { swab_mar_uint32_t (&to_swab->nodeid); @@ -137,4 +158,8 @@ static inline void swab_mar_message_source_t (mar_message_source_t *to_swab) to_swab->conn = NULL; } +#define ZC_ALLOC_HEADER 0xFFFFFFFF +#define ZC_FREE_HEADER 0xFFFFFFFE +#define ZC_EXECUTE_HEADER 0xFFFFFFFD + #endif /* IPC_GEN_H_DEFINED */ diff --git a/lib/coroipcc.c b/lib/coroipcc.c index dc1429f1..679072b8 100644 --- a/lib/coroipcc.c +++ b/lib/coroipcc.c @@ -278,7 +278,7 @@ union semun { #endif static int -coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes) +circular_memory_map (char *path, const char *file, void **buf, size_t bytes) { int fd; void *addr_orig; @@ -325,13 +325,56 @@ coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes) } static void -coroipcc_memory_unmap (void *addr, size_t bytes) +memory_unmap (void *addr, size_t bytes) { int res; res = munmap (addr, bytes); } +static int +memory_map (char *path, const char *file, 
void **buf, size_t bytes) +{ + int fd; + void *addr_orig; + void *addr; + int res; + + sprintf (path, "/dev/shm/%s", file); + + fd = mkstemp (path); + if (fd == -1) { + sprintf (path, "/var/run/%s", file); + fd = mkstemp (path); + if (fd == -1) { + return (-1); + } + } + + res = ftruncate (fd, bytes); + + addr_orig = mmap (NULL, bytes, PROT_NONE, + MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); + + if (addr_orig == MAP_FAILED) { + return (-1); + } + + addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE, + MAP_FIXED | MAP_SHARED, fd, 0); + + if (addr != addr_orig) { + return (-1); + } + + res = close (fd); + if (res) { + return (-1); + } + *buf = addr_orig; + return (0); +} + cs_error_t coroipcc_service_connect ( const char *socket_name, @@ -431,7 +474,7 @@ coroipcc_service_connect ( goto error_exit; } - res = coroipcc_memory_map (dispatch_map_path, + res = circular_memory_map (dispatch_map_path, "dispatch_bufer-XXXXXX", &ipc_segment->dispatch_buffer, DISPATCH_SIZE); strcpy (req_setup.dispatch_file, dispatch_map_path); @@ -481,7 +524,10 @@ coroipcc_service_disconnect ( shutdown (ipc_segment->fd, SHUT_RDWR); close (ipc_segment->fd); shmdt (ipc_segment->shared_memory); - coroipcc_memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1); + /* + * << 1 (or multiplied by 2) because this is a wrapped memory buffer + */ + memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1); free (ipc_segment); return (CS_OK); } @@ -796,6 +842,111 @@ void saHandleDatabaseLock_init (struct saHandleDatabase *hdb) #endif +cs_error_t +coroipcc_zcb_alloc ( + void *ipc_context, + void **buffer, + size_t size, + size_t header_size) +{ + void *buf = NULL; + char path[128]; + unsigned int res; + mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc; + mar_res_header_t res_coroipcs_zc_alloc; + size_t map_size; + struct iovec iovec; + struct coroipcs_zc_header *hdr; + + map_size = size + header_size + sizeof (struct coroipcs_zc_header); + res = memory_map (path, "cpg_zc-XXXXXX", &buf, 
size); + assert (res != -1); + + req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t); + req_coroipcc_zc_alloc.header.id = ZC_ALLOC_HEADER; + req_coroipcc_zc_alloc.map_size = map_size; + strcpy (req_coroipcc_zc_alloc.path_to_file, path); + + + iovec.iov_base = &req_coroipcc_zc_alloc; + iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t); + + res = coroipcc_msg_send_reply_receive ( + ipc_context, + &iovec, + 1, + &res_coroipcs_zc_alloc, + sizeof (mar_res_header_t)); + + hdr = (struct coroipcs_zc_header *)buf; + hdr->map_size = map_size; + *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header); + return (CS_OK); +} + +cs_error_t +coroipcc_zcb_free ( + void *ipc_context, + void *buffer) +{ + mar_req_coroipcc_zc_free_t req_coroipcc_zc_free; + mar_res_header_t res_coroipcs_zc_free; + struct iovec iovec; + unsigned int res; + + struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header)); + + req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t); + req_coroipcc_zc_free.header.id = ZC_FREE_HEADER; + req_coroipcc_zc_free.map_size = header->map_size; + req_coroipcc_zc_free.server_address = header->server_address; + + iovec.iov_base = &req_coroipcc_zc_free; + iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t); + + res = coroipcc_msg_send_reply_receive ( + ipc_context, + &iovec, + 1, + &res_coroipcs_zc_free, + sizeof (mar_res_header_t)); + + munmap (header, header->map_size); + + return (CS_OK); +} + +cs_error_t +coroipcc_zcb_msg_send_reply_receive ( + void *ipc_context, + void *msg, + void *res_msg, + size_t res_len) +{ + mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute; + struct coroipcs_zc_header *hdr; + struct iovec iovec; + cs_error_t res; + + hdr = (struct coroipcs_zc_header *)(((char *)msg) - sizeof (struct coroipcs_zc_header)); + + req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t); + req_coroipcc_zc_execute.header.id = 
ZC_EXECUTE_HEADER; + req_coroipcc_zc_execute.server_address = hdr->server_address; + + iovec.iov_base = &req_coroipcc_zc_execute; + iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t); + + res = coroipcc_msg_send_reply_receive ( + ipc_context, + &iovec, + 1, + res_msg, + res_len); + + return (res); +} + cs_error_t saHandleCreate ( struct saHandleDatabase *handleDatabase, diff --git a/lib/cpg.c b/lib/cpg.c index 10238c54..e11175da 100644 --- a/lib/cpg.c +++ b/lib/cpg.c @@ -477,59 +477,6 @@ error_exit: return (error); } -cs_error_t cpg_mcast_joined ( - cpg_handle_t handle, - cpg_guarantee_t guarantee, - const struct iovec *iovec, - unsigned int iov_len) -{ - int i; - cs_error_t error; - struct cpg_inst *cpg_inst; - struct iovec iov[64]; - struct req_lib_cpg_mcast req_lib_cpg_mcast; - struct res_lib_cpg_mcast res_lib_cpg_mcast; - size_t msg_len = 0; - - error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); - if (error != CS_OK) { - return (error); - } - - for (i = 0; i < iov_len; i++ ) { - msg_len += iovec[i].iov_len; - } - - req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) + - msg_len; - - req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST; - req_lib_cpg_mcast.guarantee = guarantee; - req_lib_cpg_mcast.msglen = msg_len; - - iov[0].iov_base = &req_lib_cpg_mcast; - iov[0].iov_len = sizeof (struct req_lib_cpg_mcast); - memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec)); - - pthread_mutex_lock (&cpg_inst->response_mutex); - - error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, - iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast)); - - pthread_mutex_unlock (&cpg_inst->response_mutex); - - if (error != CS_OK) { - goto error_exit; - } - - error = res_lib_cpg_mcast.header.error; - -error_exit: - saHandleInstancePut (&cpg_handle_t_db, handle); - - return (error); -} - cs_error_t cpg_membership_get ( cpg_handle_t handle, struct cpg_name *group_name, @@ -644,4 +591,146 @@ cs_error_t cpg_flow_control_state_get 
( return (error); } + +cs_error_t cpg_zcb_alloc ( + cpg_handle_t handle, + size_t size, + void **buffer) +{ + cs_error_t error; + struct cpg_inst *cpg_inst; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != CS_OK) { + return (error); + } + + error = coroipcc_zcb_alloc (cpg_inst->ipc_ctx, + buffer, + size, + sizeof (struct req_lib_cpg_mcast)); + + saHandleInstancePut (&cpg_handle_t_db, handle); + *buffer = ((char *)*buffer) + sizeof (struct req_lib_cpg_mcast); + + return (error); +} + +cs_error_t cpg_zcb_free ( + cpg_handle_t handle, + void *buffer) +{ + cs_error_t error; + struct cpg_inst *cpg_inst; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != CS_OK) { + return (error); + } + + coroipcc_zcb_free (cpg_inst->ipc_ctx, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast)); + + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} + +cs_error_t cpg_zcb_mcast_joined ( + cpg_handle_t handle, + cpg_guarantee_t guarantee, + void *msg, + size_t msg_len) +{ + cs_error_t error; + struct cpg_inst *cpg_inst; + struct req_lib_cpg_mcast *req_lib_cpg_mcast; + struct res_lib_cpg_mcast res_lib_cpg_mcast; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != CS_OK) { + return (error); + } + + req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast)); + req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) + + msg_len; + + req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST; + req_lib_cpg_mcast->guarantee = guarantee; + req_lib_cpg_mcast->msglen = msg_len; + + pthread_mutex_lock (&cpg_inst->response_mutex); + + error = coroipcc_zcb_msg_send_reply_receive ( + cpg_inst->ipc_ctx, + req_lib_cpg_mcast, + &res_lib_cpg_mcast, + sizeof (res_lib_cpg_mcast)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + if (error != CS_OK) { + goto error_exit; + } + + error = 
res_lib_cpg_mcast.header.error; + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} + +cs_error_t cpg_mcast_joined ( + cpg_handle_t handle, + cpg_guarantee_t guarantee, + const struct iovec *iovec, + unsigned int iov_len) +{ + int i; + cs_error_t error; + struct cpg_inst *cpg_inst; + struct iovec iov[64]; + struct req_lib_cpg_mcast req_lib_cpg_mcast; + struct res_lib_cpg_mcast res_lib_cpg_mcast; + size_t msg_len = 0; + + error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst); + if (error != CS_OK) { + return (error); + } + + for (i = 0; i < iov_len; i++ ) { + msg_len += iovec[i].iov_len; + } + + req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) + + msg_len; + + req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST; + req_lib_cpg_mcast.guarantee = guarantee; + req_lib_cpg_mcast.msglen = msg_len; + + iov[0].iov_base = &req_lib_cpg_mcast; + iov[0].iov_len = sizeof (struct req_lib_cpg_mcast); + memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec)); + + pthread_mutex_lock (&cpg_inst->response_mutex); + + error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, + iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast)); + + pthread_mutex_unlock (&cpg_inst->response_mutex); + + if (error != CS_OK) { + goto error_exit; + } + + error = res_lib_cpg_mcast.header.error; + +error_exit: + saHandleInstancePut (&cpg_handle_t_db, handle); + + return (error); +} /** @} */ diff --git a/lib/libcfg.versions b/lib/libcfg.versions index 13ba406a..1549d767 100644 --- a/lib/libcfg.versions +++ b/lib/libcfg.versions @@ -24,6 +24,9 @@ COROSYNC_CFG_0.82 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libconfdb.versions b/lib/libconfdb.versions index dab4e8fd..91217ebc 100644 --- a/lib/libconfdb.versions +++ 
b/lib/libconfdb.versions @@ -30,6 +30,9 @@ COROSYNC_CONFDB_1.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libcoroipcc.versions b/lib/libcoroipcc.versions index 7720599a..9fc42a05 100644 --- a/lib/libcoroipcc.versions +++ b/lib/libcoroipcc.versions @@ -1,6 +1,6 @@ -# Version and symbol export for libipcutil.so +# Version and symbol export for libcoroipcc.so -COROSYNC_IPCUTIL_2.0 { +COROSYNC_COROIPCC_3.0 { global: coroipcc_service_connect; coroipcc_service_disconnect; @@ -9,6 +9,9 @@ COROSYNC_IPCUTIL_2.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libcpg.versions b/lib/libcpg.versions index 784a8487..c9e5da4a 100644 --- a/lib/libcpg.versions +++ b/lib/libcpg.versions @@ -12,6 +12,8 @@ COROSYNC_CPG_1.0 { cpg_membership_get; cpg_context_get; cpg_context_set; + cpg_zcb_alloc; + cpg_zcb_free; local: coroipcc_service_connect; @@ -21,6 +23,9 @@ COROSYNC_CPG_1.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libevs.versions b/lib/libevs.versions index cddc9a7f..f0827a5b 100644 --- a/lib/libevs.versions +++ b/lib/libevs.versions @@ -20,6 +20,9 @@ COROSYNC_EVS_2.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libpload.versions b/lib/libpload.versions index 
f5b4aa6e..63cc17e9 100644 --- a/lib/libpload.versions +++ b/lib/libpload.versions @@ -12,6 +12,9 @@ COROSYNC_PLOAD_1.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libquorum.versions b/lib/libquorum.versions index 73b24346..76e22406 100644 --- a/lib/libquorum.versions +++ b/lib/libquorum.versions @@ -17,6 +17,9 @@ COROSYNC_QUORUM_1.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/lib/libvotequorum.versions b/lib/libvotequorum.versions index ca89cbef..101a039c 100644 --- a/lib/libvotequorum.versions +++ b/lib/libvotequorum.versions @@ -26,6 +26,9 @@ COROSYNC_VOTEQUORUM_1.0 { coroipcc_dispatch_recv; coroipcc_msg_send_reply_receive; coroipcc_msg_send_reply_receive_in_buf; + coroipcc_zcb_alloc; + coroipcc_zcb_free; + coroipcc_zcb_msg_send_reply_receive; saHandleCreate; saHandleDestroy; saHandleInstanceGet; diff --git a/man/Makefile.am b/man/Makefile.am index 33b1625f..c1c9ce3d 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -69,6 +69,9 @@ dist_man_MANS = \ cpg_leave.3 \ cpg_local_get.3 \ cpg_mcast_joined.3 \ + cpg_zcb_mcast_joined.3 \ + cpg_zcb_alloc.3 \ + cpg_zcb_free.3 \ cpg_membership_get.3 \ evs_dispatch.3 \ evs_fd_get.3 \ diff --git a/man/cpg_context_get.3 b/man/cpg_context_get.3 index c9e46532..a843a925 100644 --- a/man/cpg_context_get.3 +++ b/man/cpg_context_get.3 @@ -56,6 +56,9 @@ The errors are undocumented. 
.BR cpg_join (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) -.BR cpg_context_set (3) +.BR cpg_membership_get (3), +.BR cpg_context_set (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) .PP diff --git a/man/cpg_context_set.3 b/man/cpg_context_set.3 index cfedce7c..e8752bd0 100644 --- a/man/cpg_context_set.3 +++ b/man/cpg_context_set.3 @@ -60,4 +60,7 @@ The errors are undocumented. .BR cpg_mcast_joined (3), .BR cpg_membership_get (3) .BR cpg_context_get (3) +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) .PP diff --git a/man/cpg_fd_get.3 b/man/cpg_fd_get.3 index 200dd922..a5940173 100644 --- a/man/cpg_fd_get.3 +++ b/man/cpg_fd_get.3 @@ -64,4 +64,7 @@ The errors are undocumented. .BR cpg_leave (3), .BR cpg_mcast_joined (3), .BR cpg_membership_get (3) +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) .PP diff --git a/man/cpg_finalize.3 b/man/cpg_finalize.3 index fc348602..39391b64 100644 --- a/man/cpg_finalize.3 +++ b/man/cpg_finalize.3 @@ -59,5 +59,9 @@ The errors are undocumented. .BR cpg_join (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_initialize.3 b/man/cpg_initialize.3 index e48c154f..3d764c02 100644 --- a/man/cpg_initialize.3 +++ b/man/cpg_initialize.3 @@ -167,6 +167,9 @@ The errors are undocumented. .BR cpg_join (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) -.BR cpg_groups_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_join.3 b/man/cpg_join.3 index bd57015e..bfe67b7a 100644 --- a/man/cpg_join.3 +++ b/man/cpg_join.3 @@ -100,5 +100,9 @@ Not all errors are documented. 
.BR cpg_dispatch (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_leave.3 b/man/cpg_leave.3 index b1cdc97a..47b0de26 100644 --- a/man/cpg_leave.3 +++ b/man/cpg_leave.3 @@ -65,5 +65,9 @@ The errors are undocumented. .BR cpg_dispatch (3), .BR cpg_join (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_local_get.3 b/man/cpg_local_get.3 index 823f6b1e..4ec8ecdf 100644 --- a/man/cpg_local_get.3 +++ b/man/cpg_local_get.3 @@ -60,5 +60,9 @@ The errors are undocumented. .BR cpg_dispatch (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_mcast_joined.3 b/man/cpg_mcast_joined.3 index 3865dda6..42e0c882 100644 --- a/man/cpg_mcast_joined.3 +++ b/man/cpg_mcast_joined.3 @@ -129,5 +129,9 @@ The errors are undocumented. .BR cpg_dispatch (3), .BR cpg_leave (3), .BR cpg_join (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_membership_get.3 b/man/cpg_membership_get.3 index f97078ab..d9aa782a 100644 --- a/man/cpg_membership_get.3 +++ b/man/cpg_membership_get.3 @@ -69,5 +69,9 @@ The errors are undocumented. .BR cpg_dispatch (3), .BR cpg_leave (3), .BR cpg_mcast_joined (3), -.BR cpg_membership_get (3) +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + .PP diff --git a/man/cpg_overview.8 b/man/cpg_overview.8 index 2ad3f147..354d8e88 100644 --- a/man/cpg_overview.8 +++ b/man/cpg_overview.8 @@ -1,5 +1,5 @@ .\"/* -.\" * Copyright (c) 2006 Red Hat, Inc. 
+.\" * Copyright (c) 2006-2009 Red Hat, Inc. .\" * .\" * All rights reserved. .\" * @@ -31,7 +31,7 @@ .\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF .\" * THE POSSIBILITY OF SUCH DAMAGE. .\" */ -.TH CPG_OVERVIEW 8 2006-03-06 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" +.TH CPG_OVERVIEW 8 2009-4-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" .SH OVERVIEW The CPG library is delivered with the corosync project. This library is used to create distributed applications that operate properly during partitions, merges, @@ -45,12 +45,11 @@ The library provides a mechanism to: * Deliver configuration changes .PP .SH SECURITY -The CPG library encrypts all messages sent over the network using the SOBER-128 -stream cipher. The EVS library uses HMAC and SHA1 to authenticate all messages. -The CPG library uses SOBER-128 as a pseudo random number generator. TheCPG -library feeds the PRNG using the /dev/random Linux device. -.SH BUGS -This software is not yet production, so there may still be some bugs. +If encryption is enabled in corosync.conf, the CPG library will encrypt and +authenticate message contents. Applications must run as the ais user to be +validated by corosync on IPC connection, otherwise they will be unable to +access the corosync services. + .SH "SEE ALSO" .BR cpg_initialize (3), .BR cpg_finalize (3), @@ -60,5 +59,8 @@ This software is not yet production, so there may still be some bugs. .BR cpg_leave (3), .BR cpg_mcast_joined (3), .BR cpg_membership_get (3) +.BR cpg_zcb_alloc (3) +.BR cpg_zcb_free (3) +.BR cpg_zcb_mcast_joined (3) .PP diff --git a/man/cpg_zcb_alloc.3 b/man/cpg_zcb_alloc.3 new file mode 100644 index 00000000..9e94768d --- /dev/null +++ b/man/cpg_zcb_alloc.3 @@ -0,0 +1,80 @@ +.\"/* +.\" * Copyright (c) 2009 Red Hat, Inc. +.\" * +.\" * All rights reserved. 
+.\" * +.\" * Author: Steven Dake +.\" * +.\" * This software licensed under BSD license, the text of which follows: +.\" * +.\" * Redistribution and use in source and binary forms, with or without +.\" * modification, are permitted provided that the following conditions are met: +.\" * +.\" * - Redistributions of source code must retain the above copyright notice, +.\" * this list of conditions and the following disclaimer. +.\" * - Redistributions in binary form must reproduce the above copyright notice, +.\" * this list of conditions and the following disclaimer in the documentation +.\" * and/or other materials provided with the distribution. +.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its +.\" * contributors may be used to endorse or promote products derived from this +.\" * software without specific prior written permission. +.\" * +.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +.\" * THE POSSIBILITY OF SUCH DAMAGE. 
+.\" */ +.TH CPG_ZCB_ALLOC 3 2009-04-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" +.SH NAME +cpg_zcb_alloc \- Allocates a zero copy buffer +.B #include <corosync/cpg.h> +.sp +.BI "int cpg_zcb_alloc(cpg_handle_t " handle ", size_t " size ", void **" buffer "); +.SH DESCRIPTION +The +.B cpg_zcb_alloc +function will allocate a zero copy buffer for use with the +.B cpg_zcb_mcast_joined(3) +function. This buffer should not be used in another thread while a +cpg_zcb_mcast_joined operation is taking place on the buffer. The buffer is +allocated via operating system mechanisms to avoid copying in the IPC layer. + +.PP +The argument +.I handle +describes the handle on which the buffer will be allocated. +.PP +The argument +.I size +specifies the size of the buffer to be allocated. +.PP +The +.I buffer +argument is set to the buffer address that is allocated by this operation. + +.SH RETURN VALUE +This call returns the CPG_OK value if successful, otherwise an error is returned. +.PP +.SH ERRORS +The errors are undocumented. +.SH "SEE ALSO" +.BR cpg_overview (8), +.BR cpg_initialize (3), +.BR cpg_finalize (3), +.BR cpg_fd_get (3), +.BR cpg_dispatch (3), +.BR cpg_leave (3), +.BR cpg_join (3), +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + +.PP diff --git a/man/cpg_zcb_free.3 b/man/cpg_zcb_free.3 new file mode 100644 index 00000000..daae971e --- /dev/null +++ b/man/cpg_zcb_free.3 @@ -0,0 +1,72 @@ +.\"/* +.\" * Copyright (c) 2009 Red Hat, Inc. +.\" * +.\" * All rights reserved. +.\" * +.\" * Author: Steven Dake +.\" * +.\" * This software licensed under BSD license, the text of which follows: +.\" * +.\" * Redistribution and use in source and binary forms, with or without +.\" * modification, are permitted provided that the following conditions are met: +.\" * +.\" * - Redistributions of source code must retain the above copyright notice, +.\" * this list of conditions and the following disclaimer.
+.\" * - Redistributions in binary form must reproduce the above copyright notice, +.\" * this list of conditions and the following disclaimer in the documentation +.\" * and/or other materials provided with the distribution. +.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its +.\" * contributors may be used to endorse or promote products derived from this +.\" * software without specific prior written permission. +.\" * +.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +.\" * THE POSSIBILITY OF SUCH DAMAGE. +.\" */ +.TH CPG_ZCB_FREE 3 2009-04-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" +.SH NAME +cpg_zcb_free \- Frees a zero copy buffer +.B #include <corosync/cpg.h> +.sp +.BI "int cpg_zcb_free(cpg_handle_t " handle ", void *" buffer "); +.SH DESCRIPTION +The +.B cpg_zcb_free +function will free a zero copy buffer. + +.PP +The argument +.I handle +describes the handle on which the buffer was allocated. +.PP +The argument +.I buffer +is the zero copy buffer to free. + +.SH RETURN VALUE +This call returns the CPG_OK value if successful, otherwise an error is returned. +.PP +.SH ERRORS +The errors are undocumented.
+.SH "SEE ALSO" +.BR cpg_overview (8), +.BR cpg_initialize (3), +.BR cpg_finalize (3), +.BR cpg_fd_get (3), +.BR cpg_dispatch (3), +.BR cpg_leave (3), +.BR cpg_join (3), +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3), +.BR cpg_zcb_mcast_joined (3) + +.PP diff --git a/man/cpg_zcb_mcast_joined.3 b/man/cpg_zcb_mcast_joined.3 new file mode 100644 index 00000000..ca553023 --- /dev/null +++ b/man/cpg_zcb_mcast_joined.3 @@ -0,0 +1,121 @@ +.\"/* +.\" * Copyright (c) 2009 Red Hat, Inc. +.\" * +.\" * All rights reserved. +.\" * +.\" * Author: Steven Dake +.\" * +.\" * This software licensed under BSD license, the text of which follows: +.\" * +.\" * Redistribution and use in source and binary forms, with or without +.\" * modification, are permitted provided that the following conditions are met: +.\" * +.\" * - Redistributions of source code must retain the above copyright notice, +.\" * this list of conditions and the following disclaimer. +.\" * - Redistributions in binary form must reproduce the above copyright notice, +.\" * this list of conditions and the following disclaimer in the documentation +.\" * and/or other materials provided with the distribution. +.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its +.\" * contributors may be used to endorse or promote products derived from this +.\" * software without specific prior written permission. +.\" * +.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +.\" * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +.\" * THE POSSIBILITY OF SUCH DAMAGE. +.\" */ +.TH CPG_ZCB_MCAST_JOINED 3 2009-04-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual" +.SH NAME +cpg_zcb_mcast_joined \- Multicasts a zero copy buffer to all groups joined to a handle +.SH SYNOPSIS +.B #include +.B #include +.sp +.BI "int cpg_zcb_mcast_joined(cpg_handle_t " handle ", cpg_guarantee_t " guarantee ", const void *" buffer ", int " msg_len "); +.SH DESCRIPTION +The +.B cpg_zcb_mcast_joined +function will multicast a zero copy buffer message to all the processes that +have been joined with the +.B cpg_join(3) +function for the same group name. +Messages that are sent to any of the groups joined to the parameter +.I handle +will be delivered to all subscribed processes in the system. +.PP +The argument +.I guarantee +requests a delivery guarantee for the message to be sent. The cpg_guarantee_t type is +defined by: +.IP +.RS +.ne 18 +.nf +.ta 4n 30n 33n +typedef enum { + CPG_TYPE_UNORDERED, /* not implemented */ + CPG_TYPE_FIFO, /* same as agreed */ + CPG_TYPE_AGREED, /* implemented */ + CPG_TYPE_SAFE /* not implemented */ +} cpg_guarantee_t; +.ta +.fi +.RE +.IP +.PP +.PP +The meanings of the cpg_guarantee_t typedef are: +.TP +.B CPG_TYPE_UNORDERED +Messages are guaranteed to be delivered, but with no particular order. This +mode is unimplemented in the CPG library. +.TP +.B CPG_TYPE_FIFO +Messages are guaranteed to be delivered in first sent first delivery order.
+In fact, this guarantee is equivalent to the CPG_TYPE_AGREED guarantee. +.TP +.B CPG_TYPE_AGREED +All processors must agree on the order of delivery. If a message is sent +from two or more processors at about the same time, the delivery will occur +in the same order to all processors. +.TP +.B CPG_TYPE_SAFE +All processors must agree on the order of delivery. Further, all processors +must have a copy of the message before any delivery takes place. This mode is +unimplemented in the CPG library. +.PP +The +.I buffer +argument describes the zero copy buffer which is used to transmit a message. +This buffer must be allocated by +.B cpg_zcb_alloc(3). + +.PP +The +.I msg_len +argument describes the number of bytes to be transmitted in the zero copy buffer. + +.SH RETURN VALUE +This call returns the CPG_OK value if successful, otherwise an error is returned. +.PP +.SH ERRORS +The errors are undocumented. +.SH "SEE ALSO" +.BR cpg_overview (8), +.BR cpg_initialize (3), +.BR cpg_finalize (3), +.BR cpg_fd_get (3), +.BR cpg_dispatch (3), +.BR cpg_leave (3), +.BR cpg_join (3), +.BR cpg_membership_get (3), +.BR cpg_zcb_alloc (3), +.BR cpg_zcb_free (3) +.PP diff --git a/test/Makefile.am b/test/Makefile.am index ba883e1d..a9e3f29c 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -35,7 +35,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include noinst_PROGRAMS = testevs evsbench evsverify testcpg testcpg2 cpgbench testconfdb \ logsysbench logsysrec testquorum testvotequorum1 testvotequorum2 \ - logsys_s logsys_t1 logsys_t2 + logsys_s logsys_t1 logsys_t2 testcpgzc cpgbenchzc testzcgc testevs_LDADD = -levs testevs_LDFLAGS = -L../lib @@ -43,6 +43,10 @@ testcpg_LDADD = -lcpg testcpg_LDFLAGS = -L../lib testcpg2_LDADD = -lcpg testcpg2_LDFLAGS = -L../lib +testcpgzc_LDADD = -lcpg +testcpgzc_LDFLAGS = -L../lib +testzcgc_LDADD = -lcpg +testzcgc_LDFLAGS = -L../lib testconfdb_LDADD = -lconfdb ../lcr/liblcr.a testconfdb_LDFLAGS = -L../lib testquorum_LDADD = -lquorum @@ -57,6 +61,8
@@ evsbench_LDADD = -levs evsbench_LDFLAGS = -L../lib cpgbench_LDADD = -lcpg cpgbench_LDFLAGS = -L../lib +cpgbenchzc_LDADD = -lcpg +cpgbenchzc_LDFLAGS = -L../lib logsysbench_LDADD = -llogsys logsysbench_LDFLAGS = -L../exec logsysrec_LDADD = -llogsys diff --git a/test/cpgbenchzc.c b/test/cpgbenchzc.c new file mode 100644 index 00000000..2f8c0fb8 --- /dev/null +++ b/test/cpgbenchzc.c @@ -0,0 +1,195 @@ +#include +/* + * Copyright (c) 2006, 2009 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifdef COROSYNC_SOLARIS +#define timersub(a, b, result) \ + do { \ + (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ + (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ + if ((result)->tv_usec < 0) { \ + --(result)->tv_sec; \ + (result)->tv_usec += 1000000; \ + } \ + } while (0) +#endif + +static volatile sig_atomic_t alarm_notice; /* set to 1 by sigalrm_handler to end the current benchmark run */ + +static void cpg_bm_confchg_fn ( /* configuration change callback: intentionally does nothing */ + cpg_handle_t handle, + const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ +} + +static unsigned int write_count; /* messages delivered during the current benchmark run */ + +static void cpg_bm_deliver_fn ( /* delivery callback: counts each delivered message */ + cpg_handle_t handle, + const struct cpg_name *group_name, + uint32_t nodeid, + uint32_t pid, + const void *msg, + size_t msg_len) +{ + write_count++; +} + +static cpg_callbacks_t callbacks = { + .cpg_deliver_fn = cpg_bm_deliver_fn, + .cpg_confchg_fn = cpg_bm_confchg_fn +}; + + +void *data; /* zero copy buffer allocated in main() and multicast by cpg_benchmark() */ + +static void cpg_benchmark ( /* multicasts write_size bytes of data until the 10 second alarm fires, then prints throughput */ + cpg_handle_t handle, + int write_size) +{ + struct timeval tv1, tv2, tv_elapsed; + unsigned int res; + cpg_flow_control_state_t flow_control_state; + + alarm_notice = 0; + + write_count = 0; + alarm (10); + + gettimeofday (&tv1, NULL); 
+ do { + /* + * Multicast the buffer only while flow control is disabled + */ + cpg_flow_control_state_get (handle, &flow_control_state); + if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) { +retry: + res = cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, data, write_size); + if (res == CS_ERR_TRY_AGAIN) { + goto retry; + } + } + res = cpg_dispatch (handle, CS_DISPATCH_ALL); + if (res != CS_OK) { + printf ("cpg dispatch returned error %d\n", res); + exit (1); + } + } while (alarm_notice == 0); + gettimeofday (&tv2, NULL); + timersub (&tv2, &tv1, &tv_elapsed); + + printf ("%5d messages received ", write_count); + printf ("%5d bytes per write ", write_size); + printf ("%7.3f Seconds runtime ", + (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%9.3f TP/s ", + ((float)write_count) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); + printf ("%7.3f MB/s.\n", + ((float)write_count) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0)); +} + +static void sigalrm_handler (int num) /* SIGALRM handler: flags the end of the current run */ +{ + alarm_notice = 1; +} + +static struct cpg_name group_name = { + .value = "cpg_bm", + .length = 6 +}; + +int main (void) { /* runs 50 benchmark passes with write sizes from 1000 to 50000 bytes */ + cpg_handle_t handle; + unsigned int size; + int i; + unsigned int res; + + + + size = 1000; + signal (SIGALRM, sigalrm_handler); + res = cpg_initialize (&handle, &callbacks); + if (res != CS_OK) { + printf ("cpg_initialize failed with result %d\n", res); + exit (1); + } + res = cpg_zcb_alloc (handle, 500000, &data); + if (res != CS_OK) { + printf ("cpg_zcb_alloc couldn't allocate zero copy buffer %d\n", res); + exit (1); + } + + res = cpg_join (handle, &group_name); + if (res != CS_OK) { + printf ("cpg_join failed with result %d\n", res); + exit (1); + } + + for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */ + cpg_benchmark (handle, size); + size += 1000; + } + + res = cpg_finalize (handle); + if (res != CS_OK) { + printf ("cpg_finalize failed with result %d\n", res); + exit (1); + } + return (0); +} diff --git a/test/testcpgzc.c
b/test/testcpgzc.c new file mode 100644 index 00000000..9e139667 --- /dev/null +++ b/test/testcpgzc.c @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2006-2009 Red Hat Inc + * + * All rights reserved. + * + * Author: Christine Caulfield + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +static int quit = 0; +static int show_ip = 0; + +static void print_cpgname (const struct cpg_name *name) +{ + int i; + + for (i = 0; i < name->length; i++) { + printf ("%c", name->value[i]); + } +} + +static void DeliverCallback ( + cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, + uint32_t pid, + const void *msg, + size_t msg_len) +{ + if (show_ip) { + struct in_addr saddr; + saddr.s_addr = nodeid; + printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n", + (unsigned long int) msg_len, + inet_ntoa(saddr), pid, (const char *)msg); + } + else { + printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n", + (unsigned long int) msg_len, nodeid, pid, + (const char *)msg); + } +} + +static void ConfchgCallback ( + cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + int i; + struct in_addr saddr; + + printf("\nConfchgCallback: group '"); + print_cpgname(groupName); + printf("'\n"); + for (i=0; i optind) { + strcpy(group_name.value, argv[optind]); + group_name.length = strlen(argv[optind])+1; + } + else { + strcpy(group_name.value, "GROUP"); + group_name.length = 6; + } + + result = cpg_initialize (&handle, &callbacks); + if (result != CS_OK) { + printf ("Could not initialize Cluster Process Group API instance error %d\n", result); + exit (1); + } + cpg_zcb_alloc (handle, 8192, &buffer); + cpg_zcb_free (handle, buffer); + cpg_zcb_alloc (handle, 8192, &buffer); + + result = cpg_local_get (handle, &nodeid); + if (result != CS_OK) { + printf ("Could not get local node id\n"); + exit (1); + } + + printf ("Local node id is %x\n", nodeid); + result = 
cpg_join(handle, &group_name); + if (result != CS_OK) { + printf ("Could not join process group, error %d\n", result); + exit (1); + } + + FD_ZERO (&read_fds); + cpg_fd_get(handle, &select_fd); + printf ("Type EXIT to finish\n"); + do { + FD_SET (select_fd, &read_fds); + FD_SET (STDIN_FILENO, &read_fds); + result = select (select_fd + 1, &read_fds, 0, 0, 0); + if (result == -1) { + perror ("select\n"); + } + if (FD_ISSET (STDIN_FILENO, &read_fds)) { + fgets_res = fgets(buffer, sizeof(buffer), stdin); + if (fgets_res == NULL) { + cpg_leave(handle, &group_name); + } + if (strncmp(buffer, "EXIT", 4) == 0) { + cpg_leave(handle, &group_name); + } + else { + cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, + buffer, strlen (buffer) + 1); + } + } + if (FD_ISSET (select_fd, &read_fds)) { + if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK) + exit(1); + } + } while (result && !quit); + + + result = cpg_finalize (handle); + printf ("Finalize result is %d (should be 1)\n", result); + return (0); +} diff --git a/test/testzcgc.c b/test/testzcgc.c new file mode 100644 index 00000000..da376554 --- /dev/null +++ b/test/testzcgc.c @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2006-2009 Red Hat Inc + * + * All rights reserved. + * + * Author: Christine Caulfield + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. 
nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +static int quit = 0; +static int show_ip = 0; + +static void print_cpgname (const struct cpg_name *name) +{ + int i; + + for (i = 0; i < name->length; i++) { + printf ("%c", name->value[i]); + } +} + +static void DeliverCallback ( + cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, + uint32_t pid, + const void *msg, + size_t msg_len) +{ + if (show_ip) { + struct in_addr saddr; + saddr.s_addr = nodeid; + printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n", + (unsigned long int) msg_len, + inet_ntoa(saddr), pid, (const char *)msg); + } + else { + printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n", + (unsigned long int) msg_len, nodeid, pid, + (const char *)msg); + } +} + +static void ConfchgCallback ( + cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, 
size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + int i; + struct in_addr saddr; + + printf("\nConfchgCallback: group '"); + print_cpgname(groupName); + printf("'\n"); + for (i=0; i