From 4cb815a6cee5fb3a9dc857ea9504259e60801c19 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Tue, 24 Jun 2008 04:46:03 +0000 Subject: [PATCH] Forward port of the synchronization engine into trunk. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1563 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/sync.c | 360 ++++++++++++++-------------------------------------- 1 file changed, 96 insertions(+), 264 deletions(-) diff --git a/exec/sync.c b/exec/sync.c index 7fe2886c..34bbb06f 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -1,13 +1,10 @@ /* * Copyright (c) 2005-2006 MontaVista Software, Inc. - * Copyright (c) 2006 Ericsson AB. * Copyright (c) 2006-2007 Red Hat, Inc. * - * Author: Steven Dake (sdake@redhat.com) - * Author: Hans Feldt - * * All rights reserved. * + * Author: Steven Dake (sdake@redhat.com) * * This software licensed under BSD license, the text of which follows: * @@ -58,14 +55,13 @@ #include "totempg.h" #include "totem.h" #include "vsf.h" +#include "swab.h" #include "../lcr/lcr_ifact.h" #include "logsys.h" -#include "util.h" LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO); #define MESSAGE_REQ_SYNC_BARRIER 0 -#define MESSAGE_REQ_SYNC_REQUEST 1 struct barrier_data { unsigned int nodeid; @@ -76,8 +72,7 @@ static struct memb_ring_id *sync_ring_id; static int vsf_none = 0; -static int (*sync_callbacks_retrieve) (int sync_id, - struct sync_callbacks *callack); +static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack); static struct sync_callbacks sync_callbacks; @@ -88,7 +83,10 @@ static void (*sync_synchronization_completed) (void); static int sync_recovery_index = 0; static void *sync_callback_token_handle = 0; -static void *sync_request_token_handle; + +static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX]; + +static int barrier_data_confchg_entries; static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX]; @@ -96,13 +94,11 @@ static struct openais_vsf_iface_ver0 *vsf_iface; static int sync_barrier_send (struct memb_ring_id *ring_id); -static int sync_start_process ( - enum totem_callback_token_type type, void *data); +static int sync_start_process (enum totem_callback_token_type type, void *data); static void sync_service_init (struct memb_ring_id *ring_id); -static int sync_service_process ( - enum totem_callback_token_type type, void *data); +static int sync_service_process (enum totem_callback_token_type type, void *data); static void sync_deliver_fn ( unsigned int nodeid, @@ -124,71 +120,52 @@ static void sync_primary_callback_fn ( struct memb_ring_id *ring_id); static struct totempg_group sync_group = { - .group = "sync", - .group_len = 4 + .group = "sync", + .group_len = 4 }; static totempg_groups_handle sync_group_handle; -static char *service_name; -static struct memb_ring_id deliver_ring_id; -static unsigned int current_members[PROCESSOR_COUNT_MAX]; -static unsigned int current_members_cnt; -struct sync_barrier_start { -}; - -struct sync_request { - uint32_t name_len; - char name[0] __attribute__((aligned(8))); -}; - -typedef struct sync_msg { +struct req_exec_sync_barrier_start { mar_req_header_t header; struct memb_ring_id ring_id; - union { - struct sync_barrier_start sync_barrier_start; - struct sync_request sync_request; - }; -} sync_msg_t; +}; /* * Send a barrier data structure */ static int sync_barrier_send (struct memb_ring_id *ring_id) { - sync_msg_t msg; + struct req_exec_sync_barrier_start req_exec_sync_barrier_start; struct iovec iovec; int res; - msg.header.size = sizeof (sync_msg_t); - msg.header.id = MESSAGE_REQ_SYNC_BARRIER; + req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start); + req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER; - memcpy (&msg.ring_id, ring_id, sizeof (struct memb_ring_id)); + memcpy (&req_exec_sync_barrier_start.ring_id, ring_id, + sizeof (struct memb_ring_id)); - iovec.iov_base = (char *)&msg; - iovec.iov_len = sizeof (msg); + iovec.iov_base = (char *)&req_exec_sync_barrier_start; + iovec.iov_len = sizeof (req_exec_sync_barrier_start); - res = totempg_groups_mcast_joined ( - sync_group_handle, &iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED); return (res); } -static void sync_start_init (struct memb_ring_id *ring_id) +void sync_start_init (struct memb_ring_id *ring_id) { - ENTER(""); totempg_callback_token_create ( &sync_callback_token_handle, TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ sync_start_process, (void *)ring_id); - LEAVE(""); } static void sync_service_init (struct memb_ring_id *ring_id) { - ENTER(""); sync_callbacks.sync_init (); totempg_callback_token_destroy (&sync_callback_token_handle); @@ -201,16 +178,13 @@ static void sync_service_init (struct memb_ring_id *ring_id) 0, /* don't delete after callback */ sync_service_process, (void *)ring_id); - LEAVE(""); } -static int sync_start_process ( - enum totem_callback_token_type type, void *data) +static int sync_start_process (enum totem_callback_token_type type, void *data) { int res; struct memb_ring_id *ring_id = (struct memb_ring_id *)data; - ENTER(""); res = sync_barrier_send (ring_id); if (res == 0) { /* @@ -218,52 +192,43 @@ static int sync_start_process ( */ totempg_callback_token_destroy (&sync_callback_token_handle); } - LEAVE(""); return (0); } -static void sync_callbacks_load (void) +void sync_callbacks_load (void) { int res; - ENTER(""); for (;;) { res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks); + /* * No more service handlers have sync callbacks at this time - */ + ` */ if (res == -1) { sync_processing = 0; break; } - if ((service_name != NULL) && - strcmp (sync_callbacks.name, service_name) != 0) { - sync_recovery_index += 1; - continue; - } sync_recovery_index += 1; if (sync_callbacks.sync_init) { break; } } - LEAVE(""); } -static int sync_service_process ( - enum totem_callback_token_type type, void *data) +static int sync_service_process (enum totem_callback_token_type type, void *data) { int res; struct memb_ring_id *ring_id = (struct memb_ring_id *)data; - ENTER(""); - + /* * If process operation not from this ring id, then ignore it and stop * processing */ if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) { - goto end; + return (0); } /* @@ -272,14 +237,12 @@ static int sync_service_process ( */ res = sync_callbacks.sync_process (); if (res != 0) { - goto end; + return (0); } totempg_callback_token_destroy (&sync_callback_token_handle); sync_start_init (ring_id); -end: - LEAVE(""); return (0); } @@ -354,16 +317,10 @@ static void sync_primary_callback_fn ( { int i; - ENTER(""); - if (primary_designated) { - log_printf (LOG_LEVEL_NOTICE, - "This node is within the primary component and will provide" - " service.\n"); + log_printf (LOG_LEVEL_NOTICE, "This node is within the primary component and will provide service.\n"); } else { - log_printf (LOG_LEVEL_NOTICE, - "This node is within the non-primary component and will NOT" - " provide any services.\n"); + log_printf (LOG_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.\n"); return; } @@ -375,14 +332,25 @@ static void sync_primary_callback_fn ( totempg_callback_token_destroy (&sync_callback_token_handle); sync_recovery_index = 0; - + memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg)); for (i = 0; i < view_list_entries; i++) { - barrier_data_process[i].nodeid = view_list[i]; - barrier_data_process[i].completed = 0; + barrier_data_confchg[i].nodeid = view_list[i]; + barrier_data_confchg[i].completed = 0; } - + memcpy (barrier_data_process, barrier_data_confchg, + sizeof (barrier_data_confchg)); + barrier_data_confchg_entries = view_list_entries; sync_start_init (sync_ring_id); - LEAVE(""); +} + +static struct memb_ring_id deliver_ring_id; + +void sync_endian_convert (struct req_exec_sync_barrier_start *req_exec_sync_barrier_start) +{ + totemip_copy_endian_convert(&req_exec_sync_barrier_start->ring_id.rep, + &req_exec_sync_barrier_start->ring_id.rep); + req_exec_sync_barrier_start->ring_id.seq = swab64 (req_exec_sync_barrier_start->ring_id.seq); + } static void sync_deliver_fn ( @@ -391,67 +359,36 @@ static void sync_deliver_fn ( int iov_len, int endian_conversion_required) { - int i; - int barrier_completed; - sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base; + struct req_exec_sync_barrier_start *req_exec_sync_barrier_start = + (struct req_exec_sync_barrier_start *)iovec[0].iov_base; - ENTER("type %d, len %d", msg->header.id, (int)iovec[0].iov_len); + int i; if (endian_conversion_required) { - swab_mar_req_header_t (&msg->header); - swab_memb_ring_id_t (&msg->ring_id); + sync_endian_convert (req_exec_sync_barrier_start); } + int barrier_completed = 1; + + memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id, + sizeof (struct memb_ring_id)); + /* - * If this message is not from this configuration, ignore it + * Is this barrier from this configuration, if not, ignore it */ - if (memcmp (&msg->ring_id, sync_ring_id, + if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) { - goto end; + return; } - if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) { - if (endian_conversion_required) { - swab_mar_uint32_t (&msg->sync_request.name_len); - } - /* - * If there is an ongoing sync, abort it. A requested sync is - * only allowed to abort other requested synchronizations, - * not full synchronizations. - */ - if (sync_processing && sync_callbacks.sync_abort) { - sync_callbacks.sync_abort(); - sync_callbacks.sync_activate = NULL; - sync_processing = 0; - assert (service_name != NULL); - free (service_name); - service_name = NULL; - } - - service_name = malloc (msg->sync_request.name_len); - strcpy (service_name, msg->sync_request.name); - - /* - * Start requested synchronization - */ - sync_primary_callback_fn (current_members, current_members_cnt, 1, - sync_ring_id); - - goto end; - } - - barrier_completed = 1; - - memcpy (&deliver_ring_id, &msg->ring_id, sizeof (struct memb_ring_id)); - /* * Set completion for source_addr's address */ - for (i = 0; i < current_members_cnt; i++) { + for (i = 0; i < barrier_data_confchg_entries; i++) { if (nodeid == barrier_data_process[i].nodeid) { barrier_data_process[i].completed = 1; log_printf (LOG_LEVEL_DEBUG, - "Barrier Start Received From %d\n", + "Barrier Start Recieved From %d\n", barrier_data_process[i].nodeid); break; } @@ -460,37 +397,36 @@ static void sync_deliver_fn ( /* * Test if barrier is complete */ - for (i = 0; i < current_members_cnt; i++) { + for (i = 0; i < barrier_data_confchg_entries; i++) { log_printf (LOG_LEVEL_DEBUG, "Barrier completion status for nodeid %d = %d. \n", barrier_data_process[i].nodeid, barrier_data_process[i].completed); - if (barrier_data_process[i].completed == 0) { barrier_completed = 0; } } - if (barrier_completed) { - log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n"); - /* - * This sync is complete so activate and start next service sync - */ - if (sync_callbacks.sync_activate) { - log_printf (LOG_LEVEL_DEBUG, - "Committing synchronization for (%s)\n", - sync_callbacks.name); + log_printf (LOG_LEVEL_DEBUG, + "Synchronization barrier completed\n"); + } + /* + * This sync is complete so activate and start next service sync + */ + if (barrier_completed && sync_callbacks.sync_activate) { + sync_callbacks.sync_activate (); + + log_printf (LOG_LEVEL_DEBUG, + "Committing synchronization for (%s)\n", + sync_callbacks.name); + } - sync_callbacks.sync_activate (); - } - - /* - * Start synchronization if the barrier has completed - */ - for (i = 0; i < current_members_cnt; i++) { - barrier_data_process[i].nodeid = current_members[i]; - barrier_data_process[i].completed = 0; - } + /* + * Start synchronization if the barrier has completed + */ + if (barrier_completed) { + memcpy (barrier_data_process, barrier_data_confchg, + sizeof (barrier_data_confchg)); sync_callbacks_load(); @@ -502,15 +438,9 @@ static void sync_deliver_fn ( "Synchronization actions starting for (%s)\n", sync_callbacks.name); sync_service_init (&deliver_ring_id); - } else { - if (service_name != NULL) { - free (service_name); - service_name = NULL; - } } } -end: - LEAVE(""); + return; } static void sync_confchg_fn ( @@ -520,99 +450,26 @@ static void sync_confchg_fn ( unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id) { - int i; - - ENTER(""); - - if (configuration_type != TOTEM_CONFIGURATION_REGULAR) { - LEAVE(""); - return; - } - - /* - * Save current members and ring ID for later use - */ - for (i = 0; i < member_list_entries; i++) { - current_members[i] = member_list[i]; - } - current_members_cnt = member_list_entries; sync_ring_id = ring_id; + if (configuration_type != TOTEM_CONFIGURATION_REGULAR) { + return; + } + if (sync_processing && sync_callbacks.sync_abort != NULL) { + sync_callbacks.sync_abort (); + sync_callbacks.sync_activate = NULL; + } /* - * If no virtual synchrony filter configured. + * If no virtual synchrony filter configured, then start + * synchronization process */ if (vsf_none == 1) { - /* - * If there is an ongoing synchronization, abort it. - */ - if (sync_processing && sync_callbacks.sync_abort) { - sync_callbacks.sync_abort(); - sync_callbacks.sync_activate = NULL; - sync_processing = 0; - if (service_name != NULL) { - free (service_name); - service_name = NULL; - } - } - - /* - * Start new synchronization process - */ sync_primary_callback_fn ( - member_list, member_list_entries, 1, ring_id); + member_list, + member_list_entries, + 1, + ring_id); } - LEAVE(""); -} - -/** - * TOTEM callback function used to multicast a sync_request - * message - * @param type - * @param _name - * - * @return int - */ -static int sync_request_send ( - enum totem_callback_token_type type, void *_name) -{ - int res; - char *name = _name; - sync_msg_t msg; - struct iovec iovec[2]; - int name_len; - - ENTER("'%s'", name); - - name_len = strlen (name) + 1; - msg.header.size = sizeof (msg) + name_len; - msg.header.id = MESSAGE_REQ_SYNC_REQUEST; - - memcpy (&msg.ring_id, sync_ring_id, sizeof (struct memb_ring_id)); - msg.sync_request.name_len = name_len; - - iovec[0].iov_base = (char *)&msg; - iovec[0].iov_len = sizeof (msg); - iovec[1].iov_base = _name; - iovec[1].iov_len = name_len; - - res = totempg_groups_mcast_joined ( - sync_group_handle, iovec, 2, TOTEMPG_AGREED); - - if (res == 0) { - /* - * We managed to multicast the message so delete the token callback - * for the sync request. - */ - totempg_callback_token_destroy (&sync_request_token_handle); - } - - /* - * if we failed to multicast the message, this function will be called - * again. - */ - - LEAVE(""); - return (0); } int sync_in_process (void) @@ -628,28 +485,3 @@ int sync_primary_designated (void) return (vsf_iface->primary()); } } - -/** - * Execute synchronization upon request for the named service - * @param name - * - * @return int - */ -int sync_request (char *name) -{ - assert (name != NULL); - - ENTER("'%s'", name); - - if (sync_processing) { - return -1; - } - - totempg_callback_token_create (&sync_request_token_handle, - TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */ - sync_request_send, name); - - LEAVE(""); - - return 0; -}