From f9f0931cb4b2ebd89f6525e70d566369f91bdd43 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Fri, 10 Nov 2006 17:46:22 +0000 Subject: [PATCH] Update checkpoint service from all qualifications that have occured in whitetank branch. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1299 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/ckpt.c | 2302 +++++++++++++++++++++------------------------------ 1 file changed, 965 insertions(+), 1337 deletions(-) diff --git a/exec/ckpt.c b/exec/ckpt.c index 60643acc..c53cea87 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -79,8 +79,9 @@ enum ckpt_message_req_types { MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE = 8, MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE = 9, MESSAGE_REQ_EXEC_CKPT_SECTIONREAD = 10, - MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE = 11, - MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION = 12 + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINT = 11, + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTSECTION = 12, + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTREFCOUNT = 13 }; struct checkpoint_section { @@ -90,35 +91,66 @@ struct checkpoint_section { timer_handle expiration_timer; }; -struct ckpt_refcnt { - int count; +enum sync_state { + SYNC_STATE_CHECKPOINT, + SYNC_STATE_REFCOUNT +}; + +enum iteration_state { + ITERATION_STATE_CHECKPOINT, + ITERATION_STATE_SECTION +}; + +struct refcount_set { + unsigned int refcount; unsigned int nodeid; }; typedef struct { - int count __attribute__((aligned(8))); + unsigned int refcount __attribute__((aligned(8))); unsigned int nodeid __attribute__((aligned(8))); -} mar_ckpt_refcnt_t; +} mar_refcount_set_t; -static inline void marshall_to_mar_ckpt_refcnt_t ( - mar_ckpt_refcnt_t *dest, - struct ckpt_refcnt *src) +static inline void marshall_to_mar_refcount_set_t ( + mar_refcount_set_t *dest, + struct refcount_set *src) { - dest->count = src->count; + dest->refcount = src->refcount; dest->nodeid = src->nodeid; } -static inline void marshall_from_mar_ckpt_refcnt_t ( - struct ckpt_refcnt *dest, - mar_ckpt_refcnt_t *src) +static inline void marshall_to_mar_refcount_set_t_all ( + mar_refcount_set_t *dest, + struct refcount_set *src) { - dest->count = src->count; + unsigned int i; + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + marshall_to_mar_refcount_set_t (&dest[i], &src[i]); + } +} + +static inline void marshall_from_mar_refcount_set_t ( + struct refcount_set *dest, + mar_refcount_set_t *src) +{ + dest->refcount = src->refcount; dest->nodeid = src->nodeid; } -static inline void swab_mar_ckpt_refcnt_t (mar_ckpt_refcnt_t *to_swab) +static inline void marshall_from_mar_refcount_set_t_all ( + struct refcount_set *dest, + mar_refcount_set_t *src) { - swab_mar_int32_t (&to_swab->count); + unsigned int i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + marshall_from_mar_refcount_set_t (&dest[i], &src[i]); + } +} + +static inline void swab_mar_refcount_set_t (mar_refcount_set_t *to_swab) +{ + swab_mar_uint32_t (&to_swab->refcount); swab_mar_uint32_t (&to_swab->nodeid); } @@ -128,22 +160,23 @@ struct checkpoint { mar_uint32_t ckpt_id; mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes; struct list_head sections_list_head; - int referenceCount; + int reference_count; int unlinked; timer_handle retention_timer; - int expired; int active_replica_set; - int sectionCount; - struct ckpt_refcnt ckpt_refcnt[PROCESSOR_COUNT_MAX]; + int section_count; + struct refcount_set refcount_set[PROCESSOR_COUNT_MAX]; }; struct iteration_entry { - int active; - struct checkpoint_section *checkpoint_section; + char *section_id; + unsigned int section_id_len; }; struct iteration_instance { struct iteration_entry *iteration_entries; + mar_name_t checkpoint_name; + mar_uint32_t ckpt_id; int iteration_entries_count; unsigned int iteration_pos; }; @@ -166,6 +199,8 @@ static int ckpt_lib_exit_fn (void *conn); static int ckpt_lib_init_fn (void *conn); +static void ckpt_dump_fn (void); + static void message_handler_req_lib_ckpt_checkpointopen ( void *conn, void *msg); @@ -238,11 +273,15 @@ static void message_handler_req_exec_ckpt_checkpointopen ( void *message, unsigned int nodeid); -static void message_handler_req_exec_ckpt_synchronize_state ( +static void message_handler_req_exec_ckpt_sync_checkpoint ( void *message, unsigned int nodeid); -static void message_handler_req_exec_ckpt_synchronize_section ( +static void message_handler_req_exec_ckpt_sync_checkpoint_section ( + void *message, + unsigned int nodeid); + +static void message_handler_req_exec_ckpt_sync_checkpoint_refcount ( void *message, unsigned int nodeid); @@ -297,18 +336,24 @@ static void exec_ckpt_sectrionexpirationtimeset_endian_convert (void *msg); static void exec_ckpt_sectionwrite_endian_convert (void *msg); static void exec_ckpt_sectionoverwrite_endian_convert (void *msg); static void exec_ckpt_sectionread_endian_convert (void *msg); -static void exec_ckpt_synchronize_state_endian_convert (void *msg); -static void exec_ckpt_synchronize_section_endian_convert (void *msg); +static void exec_ckpt_sync_checkpoint_endian_convert (void *msg); +static void exec_ckpt_sync_checkpoint_section_endian_convert (void *msg); +static void exec_ckpt_sync_checkpoint_refcount_endian_convert (void *msg); -static void ckpt_recovery_activate (void); -static void ckpt_recovery_initialize (void); -static int ckpt_recovery_process (void); -static void ckpt_recovery_finalize (void); -static void ckpt_recovery_abort(void); -static void ckpt_recovery_process_members_exit ( - unsigned int *left_list, - int left_list_entries); -static void ckpt_replace_localhost_ip (unsigned int *joined_list); + +static void ckpt_sync_init (void); +static void ckpt_sync_activate (void); +static int ckpt_sync_process (void); +static void ckpt_sync_abort(void); + +static void sync_refcount_increment ( + struct checkpoint *checkpoint, unsigned int nodeid); + +static void sync_refcount_decrement ( + struct checkpoint *checkpoint, unsigned int nodeid); + +static void sync_refcount_calculate ( + struct checkpoint *checkpoint); void checkpoint_release (struct checkpoint *checkpoint); void timer_function_retention (void *data); @@ -316,60 +361,37 @@ unsigned int abstime_to_msec (mar_time_t time); void timer_function_section_expire (void *data); void clean_checkpoint_list(struct list_head* head); -static int recovery_checkpoint_open( - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - mar_ckpt_checkpoint_creation_attributes_t *ckptAttributes, - struct ckpt_refcnt *ref_cnt); - -static int recovery_section_create ( - mar_ckpt_section_descriptor_t *section_descriptor, - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - char* section_id); - -static int recovery_section_write( - int section_id_len, - char *section_id, - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - void *new_data, - mar_uint32_t data_offset, - mar_uint32_t data_size); - -static void dump_fn(void); - -static int process_localhost_transition = 0; - DECLARE_LIST_INIT(checkpoint_list_head); +DECLARE_LIST_INIT(sync_checkpoint_list_head); + DECLARE_LIST_INIT(checkpoint_iteration_list_head); DECLARE_LIST_INIT(checkpoint_recovery_list_head); -/* cluster wide synchronized checkpoint ID */ static mar_uint32_t global_ckpt_id = 0; +static enum sync_state my_sync_state; + +static enum iteration_state my_iteration_state; + +static struct list_head *my_iteration_state_checkpoint; + +static struct list_head *my_iteration_state_section; + +static unsigned int my_member_list[PROCESSOR_COUNT_MAX]; + +static unsigned int my_member_list_entries = 0; + +static unsigned int my_lowest_nodeid = 0; + struct checkpoint_cleanup { struct list_head list; - struct checkpoint checkpoint; + mar_name_t checkpoint_name; + mar_uint32_t ckpt_id; }; -typedef enum { - SYNCHRONY_STATE_STARTED, - SYNCHRONY_STATE_ENDED -}synchrony_state; - - -static synchrony_state recovery_state = SYNCHRONY_STATE_ENDED; - -static struct list_head *recovery_ckpt_next = 0; -static struct list_head *recovery_ckpt_section_next = 0; -static int recovery_section_data_offset = 0; -static int recovery_section_send_flag = 0; -static int recovery_abort = 0; - -static struct memb_ring_id saved_ring_id; +static struct memb_ring_id my_saved_ring_id; static void ckpt_confchg_fn ( enum totem_configuration_type configuration_type, @@ -535,12 +557,16 @@ static struct openais_exec_handler ckpt_exec_service[] = { .exec_endian_convert_fn = exec_ckpt_sectionread_endian_convert }, { - .exec_handler_fn = message_handler_req_exec_ckpt_synchronize_state, - .exec_endian_convert_fn = exec_ckpt_synchronize_state_endian_convert + .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint, + .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_endian_convert }, { - .exec_handler_fn = message_handler_req_exec_ckpt_synchronize_section, - .exec_endian_convert_fn = exec_ckpt_synchronize_section_endian_convert + .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint_section, + .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_section_endian_convert + }, + { + .exec_handler_fn = message_handler_req_exec_ckpt_sync_checkpoint_refcount, + .exec_endian_convert_fn = exec_ckpt_sync_checkpoint_refcount_endian_convert } }; @@ -554,14 +580,14 @@ struct openais_service_handler ckpt_service_handler = { .lib_service = ckpt_lib_service, .lib_service_count = sizeof (ckpt_lib_service) / sizeof (struct openais_lib_handler), .exec_init_fn = ckpt_exec_init_fn, - .exec_dump_fn = dump_fn, + .exec_dump_fn = ckpt_dump_fn, .exec_service = ckpt_exec_service, .exec_service_count = sizeof (ckpt_exec_service) / sizeof (struct openais_exec_handler), .confchg_fn = ckpt_confchg_fn, - .sync_init = ckpt_recovery_initialize, - .sync_process = ckpt_recovery_process, - .sync_activate = ckpt_recovery_activate, - .sync_abort = ckpt_recovery_abort, + .sync_init = ckpt_sync_init, + .sync_process = ckpt_sync_process, + .sync_activate = ckpt_sync_activate, + .sync_abort = ckpt_sync_abort, }; /* @@ -703,656 +729,38 @@ struct req_exec_ckpt_sectionread { mar_offset_t data_size __attribute__((aligned(8))); }; -struct req_exec_ckpt_synchronize_state { +struct req_exec_ckpt_sync_checkpoint { mar_req_header_t header __attribute__((aligned(8))); - struct memb_ring_id previous_ring_id __attribute__((aligned(8))); + struct memb_ring_id ring_id __attribute__((aligned(8))); mar_name_t checkpoint_name __attribute__((aligned(8))); mar_uint32_t ckpt_id __attribute__((aligned(8))); mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes __attribute__((aligned(8))); - mar_ckpt_section_descriptor_t section_descriptor __attribute__((aligned(8))); - mar_uint32_t nodeid __attribute__((aligned(8))); - mar_ckpt_refcnt_t ckpt_refcnt[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); + mar_uint32_t checkpoint_creation_attributes_set __attribute__((aligned(8))); + mar_uint32_t active_replica_set __attribute__((aligned(8))); + mar_uint32_t unlinked __attribute__((aligned(8))); }; -struct req_exec_ckpt_synchronize_section { +struct req_exec_ckpt_sync_checkpoint_section { mar_req_header_t header __attribute__((aligned(8))); - struct memb_ring_id previous_ring_id __attribute__((aligned(8))); + struct memb_ring_id ring_id __attribute__((aligned(8))); mar_name_t checkpoint_name __attribute__((aligned(8))); mar_uint32_t ckpt_id __attribute__((aligned(8))); mar_uint32_t id_len __attribute__((aligned(8))); - mar_offset_t data_offset __attribute__((aligned(8))); - mar_offset_t data_size __attribute__((aligned(8))); + mar_time_t expiration_time __attribute__((aligned(8))); + mar_uint32_t section_size __attribute__((aligned(8))); +}; + +struct req_exec_ckpt_sync_checkpoint_refcount { + mar_req_header_t header __attribute__((aligned(8))); + struct memb_ring_id ring_id __attribute__((aligned(8))); + mar_name_t checkpoint_name __attribute__((aligned(8))); + mar_uint32_t ckpt_id __attribute__((aligned(8))); + mar_refcount_set_t refcount_set[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); }; /* * Implementation */ -static int processor_index_set( - unsigned int nodeid, - struct ckpt_refcnt *ckpt_refcnt) -{ - int i; - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - if (ckpt_refcnt[i].nodeid == 0) { - /* - * If the source addresses do not match and this element - * has no stored value then store the new value and - * return the Index. - */ - ckpt_refcnt[i].nodeid = nodeid; - return i; - } - /* - * If the source addresses match then this processor index - * has already been set - */ - else - if (ckpt_refcnt[i].nodeid == nodeid) { - return -1; - } - - } - /* - * Could not Find an empty slot - * to store the new Processor. - */ - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - log_printf (LOG_LEVEL_ERROR,"Processor Set: Index %d has proc %s and count %d\n", - i, - totempg_ifaces_print (ckpt_refcnt[i].nodeid), - ckpt_refcnt[i].count); - } - - return -1; -} - -static int processor_add ( - unsigned int nodeid, - int count, - struct ckpt_refcnt *ckpt_refcnt) -{ - int i; - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - if (ckpt_refcnt[i].nodeid == 0) { - log_printf (LOG_LEVEL_DEBUG,"processor_add found empty slot to insert new item\n"); - ckpt_refcnt[i].nodeid = nodeid; - ckpt_refcnt[i].count = count; - return i; - } - /*Dont know how we missed this in the processor find but update this*/ - else - if (ckpt_refcnt[i].nodeid == nodeid) { - ckpt_refcnt[i].count += count; - log_printf (LOG_LEVEL_DEBUG,"processor_add for existent proc. nodeid %s, New count = %d\n", - totempg_ifaces_print (ckpt_refcnt[i].nodeid), - ckpt_refcnt[i].count); - - return i; - } - } - /* - * Could not Find an empty slot - * to store the new Processor. - */ - log_printf (LOG_LEVEL_ERROR,"Processor Add Failed. Dumping Refcount Array\n"); - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - log_printf (LOG_LEVEL_ERROR,"Processor Add: Index %d has proc %s and count %d\n", - i, - totempg_ifaces_print (ckpt_refcnt[i].nodeid), - ckpt_refcnt[i].count); - } - return -1; - -} - -static int processor_index_find( - unsigned int nodeid, - struct ckpt_refcnt *ckpt_refcnt) -{ - int i; - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - /* - * If the source addresses match then return the index - */ - - if (ckpt_refcnt[i].nodeid == nodeid) { - return i; - } - } - /* - * Could not Find the Processor - */ - return -1; -} - -static int ckpt_refcnt_total(struct ckpt_refcnt *ckpt_refcnt) -{ - int i; - int total = 0; - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - total += ckpt_refcnt[i].count; - } - return total; -} - -static void initialize_ckpt_refcnt_array ( - struct ckpt_refcnt *ckpt_refcnt) -{ - memset((char*)ckpt_refcnt, 0, - PROCESSOR_COUNT_MAX * sizeof(struct ckpt_refcnt)); -} - -static void merge_ckpt_refcnts ( - struct ckpt_refcnt *local, - struct ckpt_refcnt *network) -{ - int index,i; - - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - index = processor_index_find (local[i].nodeid, network); - if (index == -1) { /*Could Not Find the Local Entry in the remote.Add to it*/ - log_printf (LOG_LEVEL_DEBUG,"calling processor_add for nodeid %s, count %d\n", - totempg_ifaces_print (local[i].nodeid), - local[i].count); - index = processor_add (local[i].nodeid, local[i].count, network); - if (index == -1) { - log_printf(LOG_LEVEL_ERROR, - "merge_ckpt_refcnts : could not add a new processor as the MAX limit of procs is reached.Exiting\n"); - assert(0); - } - } - else { - if (local[i].count == network[index].count) { - /*Nothing to do here as the network is already up 2 date*/ - log_printf (LOG_LEVEL_DEBUG,"merge_ckpt_refcnts counts match, continue\n"); - continue; - } - else { - /*Found a match for this proc in the Network choose the larger of the 2.*/ - network[index].count += local[i].count; - log_printf (LOG_LEVEL_DEBUG,"setting count for nodeid %s = %d\n", - totempg_ifaces_print (network[index].nodeid), - network[index].count); - } - } - } -} - - -static void ckpt_recovery_initialize (void) -{ - struct list_head *checkpoint_list; - struct list_head *checkpoint_section_list; - struct checkpoint *checkpoint; - struct checkpoint_section *section; - struct checkpoint *savedCheckpoint; - struct checkpoint_section *savedSection; - - if (recovery_abort) { /*Abort was called.*/ - return; - } - - /* - * Save off the existing Checkpoints to be used by ckpt_recovery_process - */ - for (checkpoint_list = checkpoint_list_head.next; - checkpoint_list != &checkpoint_list_head; - checkpoint_list = checkpoint_list->next) { - - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); - - if (checkpoint->referenceCount < 1) { /*defect 1192*/ - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_initialize checkpoint %s has referenceCount < 1 ignoring.\n", - checkpoint->name.value); - continue; - } - savedCheckpoint = - (struct checkpoint *) malloc (sizeof(struct checkpoint)); - assert(savedCheckpoint); - memcpy(savedCheckpoint, checkpoint, sizeof(struct checkpoint)); - list_init(&savedCheckpoint->list); - list_add_tail(&savedCheckpoint->list,&checkpoint_recovery_list_head); - list_init(&savedCheckpoint->sections_list_head); - for (checkpoint_section_list = checkpoint->sections_list_head.next; - checkpoint_section_list != &checkpoint->sections_list_head; - checkpoint_section_list = checkpoint_section_list->next) { - section = list_entry (checkpoint_section_list, - struct checkpoint_section, list); - savedSection = - (struct checkpoint_section *) malloc (sizeof(struct checkpoint_section)); - assert(savedSection); - openais_timer_delete_data (section->expiration_timer); - memcpy(savedSection, section, sizeof(struct checkpoint_section)); - list_init(&savedSection->list); - list_add_tail(&savedSection->list,&savedCheckpoint->sections_list_head); - } - } - - if (list_empty (&checkpoint_recovery_list_head)) { - return; - } - recovery_ckpt_next = checkpoint_recovery_list_head.next; - savedCheckpoint = list_entry (recovery_ckpt_next, - struct checkpoint, list); - recovery_ckpt_section_next = savedCheckpoint->sections_list_head.next; -} - -static int ckpt_recovery_process (void) -{ - struct req_exec_ckpt_synchronize_state request_exec_sync_state; - struct req_exec_ckpt_synchronize_section request_exec_sync_section; - struct iovec iovecs[3]; - struct checkpoint *checkpoint; - struct checkpoint_section *checkpoint_section; - mar_size_t origSectionSize; - mar_size_t newSectionSize; - int res; - unsigned int i; - - if (recovery_abort) { /*Abort was called.*/ - goto recovery_exit_clean; - } - /*So Initialize did not have any checkpoints to Synchronize*/ - if ((recovery_ckpt_next == 0) && (recovery_ckpt_section_next == 0)) { - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_process Nothing to Process ...\n"); - goto recovery_exit_clean; - } - - /* - * ALGORITHM : - * 1.) extract the checkpoint if there. - * 2.) If there is a checkpoint then there has to be a section - * 3.) If the recovery_section_send_flag was not set in the previous - * invocation that means we have to send out a sync_msg before - * we send out the sections - * 4.) Set the recovery_section_send_flag and send the sections. - */ - - while (1) { /*Go for as long as the oubound queue is not full*/ - - if(recovery_ckpt_next != &checkpoint_recovery_list_head) { - checkpoint = list_entry (recovery_ckpt_next, - struct checkpoint, list); - if (recovery_ckpt_section_next == 0) { - recovery_ckpt_section_next = checkpoint->sections_list_head.next; - } - if (recovery_ckpt_section_next != &checkpoint->sections_list_head) { - checkpoint_section = list_entry (recovery_ckpt_section_next, - struct checkpoint_section, list); - - /* - * None of the section data msgs have been sent - * so lets start with sending the sync_msg - */ - if (recovery_section_send_flag == 0) { - if (checkpoint_section->section_descriptor.section_id.id) { - log_printf (LOG_LEVEL_DEBUG, "New Sync State Message for ckpt = %s, section = %s.\n", - checkpoint->name.value, - (char *)checkpoint_section->section_descriptor.section_id.id); - } else { - log_printf (LOG_LEVEL_DEBUG, "New Sync State Message for ckpt = %s, section = default section.\n", - checkpoint->name.value); - } - request_exec_sync_state.header.size = sizeof (struct req_exec_ckpt_synchronize_state); - request_exec_sync_state.header.id = - SERVICE_ID_MAKE (CKPT_SERVICE, - MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE); - memcpy(&request_exec_sync_state.previous_ring_id, &saved_ring_id, sizeof(struct memb_ring_id)); - memcpy(&request_exec_sync_state.checkpoint_name, - &checkpoint->name, - sizeof(mar_name_t)); - memcpy(&request_exec_sync_state.checkpoint_creation_attributes, - &checkpoint->checkpoint_creation_attributes, - sizeof(mar_ckpt_checkpoint_creation_attributes_t)); - memcpy(&request_exec_sync_state.section_descriptor, - &checkpoint_section->section_descriptor, - sizeof(mar_ckpt_section_descriptor_t)); - - request_exec_sync_state.ckpt_id = checkpoint->ckpt_id; - request_exec_sync_state.nodeid = this_ip->nodeid; - - for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { - - marshall_to_mar_ckpt_refcnt_t ( - &request_exec_sync_state.ckpt_refcnt[i], - &checkpoint->ckpt_refcnt[i]); - } - request_exec_sync_state.section_descriptor.section_id.id = 0; - - log_printf (LOG_LEVEL_DEBUG, "New Sync State Message Values\n"); - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - if (request_exec_sync_state.ckpt_refcnt[i].nodeid) { - log_printf (LOG_LEVEL_DEBUG,"Index %d has proc %s and count %d\n", - i, - totempg_ifaces_print (request_exec_sync_state.ckpt_refcnt[i].nodeid), - request_exec_sync_state.ckpt_refcnt[i].count); - } - } - - iovecs[0].iov_base = (char *)&request_exec_sync_state; - iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state); - - /* - * Populate the Section ID - */ - iovecs[1].iov_base = ((char*)checkpoint_section->section_descriptor.section_id.id); - iovecs[1].iov_len = checkpoint_section->section_descriptor.section_id.id_len; - request_exec_sync_state.header.size += iovecs[1].iov_len; - - /* - * Check to see if we can queue the new message and if you can - * then mcast the message else break and create callback. - */ - res = totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED); - if (res == 0) { - log_printf (LOG_LEVEL_DEBUG, "Multicasted Sync State Message.\n"); - } - else { - log_printf (LOG_LEVEL_DEBUG, "Sync State Message Outbound Queue full need to Wait for Callback.\n"); - return (1); - } - recovery_section_send_flag = 1; - } - - origSectionSize = checkpoint_section->section_descriptor.section_size; - newSectionSize = 0; - - /* - * Now Create SyncSection messsages in chunks of CKPT_MAX_SECTION_DATA_SEND or less - */ - while (recovery_section_data_offset < origSectionSize) { - /* - * Send a Max of CKPT_MAX_SECTION_DATA_SEND of section data - */ - if ((origSectionSize - recovery_section_data_offset) > CKPT_MAX_SECTION_DATA_SEND) { - newSectionSize = CKPT_MAX_SECTION_DATA_SEND; - } - else { - newSectionSize = (origSectionSize - recovery_section_data_offset); - } - - /* - * Create and save a new Sync Section message. - */ - - request_exec_sync_section.header.size = sizeof (struct req_exec_ckpt_synchronize_section); - request_exec_sync_section.header.id = - SERVICE_ID_MAKE (CKPT_SERVICE, - MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION); - memcpy (&request_exec_sync_section.previous_ring_id, &saved_ring_id, sizeof(struct memb_ring_id)); - memcpy (&request_exec_sync_section.checkpoint_name, &checkpoint->name, sizeof(mar_name_t)); - request_exec_sync_section.id_len = - checkpoint_section->section_descriptor.section_id.id_len; - memcpy (&request_exec_sync_section.data_offset, &recovery_section_data_offset, sizeof(mar_uint32_t)); - memcpy (&request_exec_sync_section.data_size, &newSectionSize, sizeof(mar_uint32_t)); - if (checkpoint_section->section_descriptor.section_id.id) { - log_printf (LOG_LEVEL_DEBUG, "New Sync Section Message for ckpt = %s, section = %s, Data size = %d.\n", - checkpoint->name.value, - (char *)checkpoint_section->section_descriptor.section_id.id, - (int)newSectionSize); - } else { - log_printf (LOG_LEVEL_DEBUG, "New Sync Section Message for ckpt = %s, default section, Data size = %d.\n", - checkpoint->name.value, - (int)newSectionSize); - } - /* - * Populate the Sync Section Request - */ - iovecs[0].iov_base = (char *)&request_exec_sync_section; - iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_section); - - /* - * Populate the Section ID - */ - iovecs[1].iov_base = ((char*)checkpoint_section->section_descriptor.section_id.id); - iovecs[1].iov_len = checkpoint_section->section_descriptor.section_id.id_len; - request_exec_sync_section.header.size += iovecs[1].iov_len; - - /* - * Populate the Section Data. - */ - iovecs[2].iov_base = ((char*)checkpoint_section->section_data + recovery_section_data_offset); - iovecs[2].iov_len = newSectionSize; - request_exec_sync_section.header.size += iovecs[2].iov_len; - request_exec_sync_section.ckpt_id = checkpoint->ckpt_id; - - /* - * Check to see if we can queue the new message and if you can - * then mcast the message else break and create callback. - */ - - res = totempg_groups_mcast_joined (openais_group_handle, iovecs, 3, TOTEMPG_AGREED); - if (res == 0) { - log_printf (LOG_LEVEL_DEBUG, "Multicasted Sync Section Message.\n"); - } else { - log_printf (LOG_LEVEL_DEBUG, "Sync Section Message Outbound Queue full need to Wait for Callback.\n"); - return (1); - } - - recovery_section_data_offset += newSectionSize; - } - recovery_section_send_flag = 0; - recovery_section_data_offset = 0; - recovery_ckpt_section_next = recovery_ckpt_section_next->next; - continue; - } - else { - /* - * We have reached the end of a section List. - * Move to the next element in the ckpt list. - * Init the section ptr to 0 so it is re evaled - */ - recovery_ckpt_next = recovery_ckpt_next->next; - recovery_ckpt_section_next = 0; - continue; - } - } - /*Should only be here at the end of the traversal of the ckpt list*/ - ckpt_recovery_finalize(); -recovery_exit_clean: - /*Re - Initialize the static's*/ - recovery_ckpt_next = 0; - recovery_ckpt_section_next = 0; - recovery_section_data_offset = 0; - recovery_section_send_flag = 0; - recovery_abort = 0; - - return (0); - } -} - -static void ckpt_recovery_finalize (void) -{ - struct list_head *checkpoint_list; - struct list_head *checkpoint_section_list; - struct checkpoint *checkpoint; - struct checkpoint_section *section; - struct ckpt_identifier *ckpt_id; - - /* - * Remove All elements from old checkpoint - * list - */ - checkpoint_list = checkpoint_list_head.next; - while (!list_empty(&checkpoint_list_head)) { - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); - - checkpoint_section_list = checkpoint->sections_list_head.next; - while (!list_empty(&checkpoint->sections_list_head)) { - section = list_entry (checkpoint_section_list, - struct checkpoint_section, list); - - list_del (§ion->list); - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_finalize removed 0x%p.\n", section); - free (section); - checkpoint_section_list = checkpoint->sections_list_head.next; - } - list_del(&checkpoint->list); - free(checkpoint); - checkpoint_list = checkpoint_list_head.next; - } - - /* - * Initialize the old list again. - */ - list_init(&checkpoint_list_head); - - /* - * Copy the contents of the new list_head into the old list head - */ - checkpoint_recovery_list_head.prev->next = &checkpoint_list_head; - checkpoint_recovery_list_head.next->prev = &checkpoint_list_head; - memcpy(&checkpoint_list_head, &checkpoint_recovery_list_head, sizeof(struct list_head)); - - /*Timers might have been started before recovery happened .. restart them ..*/ - for (checkpoint_list = checkpoint_list_head.next; - checkpoint_list != &checkpoint_list_head; - checkpoint_list = checkpoint_list->next) { - - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); - - for (checkpoint_section_list = checkpoint->sections_list_head.next; - checkpoint_section_list != &checkpoint->sections_list_head; - checkpoint_section_list = checkpoint_section_list->next) { - section = list_entry (checkpoint_section_list, - struct checkpoint_section, list); - - if (section->section_descriptor.expiration_time != SA_TIME_END) { - ckpt_id = malloc (sizeof(struct ckpt_identifier)); - assert(ckpt_id); - memcpy(&ckpt_id->ckpt_name,&checkpoint->name,sizeof(mar_name_t)); - memcpy(&ckpt_id->ckpt_section_id, §ion->section_descriptor.section_id,sizeof(mar_ckpt_section_id_t)); - - openais_timer_add ( - abstime_to_msec (section->section_descriptor.expiration_time), - ckpt_id, - timer_function_section_expire, - §ion->expiration_timer); - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_initialize expiration timer = 0x%p\n", - section->expiration_timer); - } - } - } - - - /* - * Initialize the new list head for reuse. - */ - list_init(&checkpoint_recovery_list_head); - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_finalize Done.\n"); - -} - -static void ckpt_recovery_activate (void) -{ - recovery_state = SYNCHRONY_STATE_ENDED; - return; -} - -static void ckpt_recovery_abort (void) -{ - recovery_abort = 1; - return; -} - -static void ckpt_replace_localhost_ip (unsigned int *joined_list) { - struct list_head *checkpoint_list; - struct checkpoint *checkpoint; - unsigned int localhost_nodeid = 0; - int index; - - assert(joined_list); - - for (checkpoint_list = checkpoint_list_head.next; - checkpoint_list != &checkpoint_list_head; - checkpoint_list = checkpoint_list->next) { - - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); - index = processor_index_find(localhost_nodeid, checkpoint->ckpt_refcnt); - if (index == -1) { - continue; - } - checkpoint->ckpt_refcnt[index].nodeid = joined_list[0]; - log_printf (LOG_LEVEL_DEBUG, "Transitioning From Local Host replacing 127.0.0.1 with %x ...\n", - joined_list[0]); - - } - process_localhost_transition = 0; -} - - -static void ckpt_recovery_process_members_exit ( - unsigned int *left_list, - int left_list_entries) -{ - struct list_head *checkpoint_list; - struct checkpoint *checkpoint; - unsigned int *member_nodeid; - unsigned int localhost_nodeid; - int index; - int i; - - localhost_nodeid = 0; // TODO - - if (left_list_entries == 0) { - return; - } -// TODO this is wrong - if ((left_list_entries == 1) && - *left_list == localhost_nodeid) { - process_localhost_transition = 1; - return; - } - - /* - * Iterate left_list_entries. - */ - member_nodeid = left_list; - for (i = 0; i < left_list_entries; i++) { - checkpoint_list = checkpoint_list_head.next; - -iterate_while_loop: - while (checkpoint_list != &checkpoint_list_head) { - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); - assert (checkpoint > 0); - index = processor_index_find(*member_nodeid, - checkpoint->ckpt_refcnt); - assert (-1 <= index); - assert (index < PROCESSOR_COUNT_MAX); - if (index < 0) { - checkpoint_list = checkpoint_list->next; - goto iterate_while_loop; - } - /* - * Decrement - * - */ - if (checkpoint->referenceCount > 0) { /*defect 1192*/ - checkpoint->referenceCount -= checkpoint->ckpt_refcnt[index].count; - log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_process_members_exit: refCount for %s = %d.\n", - checkpoint->name.value,checkpoint->referenceCount); - assert (checkpoint->referenceCount > 0);/*defect 1192*/ - } else { - log_printf (LOG_LEVEL_ERROR, "ckpt_recovery_process_members_exit: refCount for %s = %d.\n", - checkpoint->name.value,checkpoint->referenceCount); - assert(0); - } - checkpoint->ckpt_refcnt[index].count = 0; - checkpoint->ckpt_refcnt[index].nodeid = 0; - checkpoint_list = checkpoint_list->next; - } - member_nodeid++; - } - - clean_checkpoint_list(&checkpoint_list_head); - - return; -} void clean_checkpoint_list(struct list_head *head) { @@ -1360,7 +768,7 @@ void clean_checkpoint_list(struct list_head *head) struct checkpoint *checkpoint; if (list_empty(head)) { - log_printf (LOG_LEVEL_NOTICE, "clean_checkpoint_list: List is empty \n"); + log_printf (LOG_LEVEL_DEBUG, "clean_checkpoint_list: List is empty \n"); return; } @@ -1373,16 +781,16 @@ void clean_checkpoint_list(struct list_head *head) /* * If checkpoint has been unlinked and this is the last reference, delete it */ - if (checkpoint->unlinked && checkpoint->referenceCount == 1) { /*defect 1129*/ - log_printf (LOG_LEVEL_NOTICE,"clean_checkpoint_list: deallocating checkpoint %s.\n", + if (checkpoint->unlinked && checkpoint->reference_count == 0) { + log_printf (LOG_LEVEL_DEBUG,"clean_checkpoint_list: deallocating checkpoint %s.\n", checkpoint->name.value); checkpoint_list = checkpoint_list->next; checkpoint_release (checkpoint); continue; } - else if ((checkpoint->expired == 0) && (checkpoint->referenceCount == 1)) { /*defect 1192*/ - log_printf (LOG_LEVEL_NOTICE, "clean_checkpoint_list: Starting timer to release checkpoint %s.\n", + else if (checkpoint->reference_count == 0) { + log_printf (LOG_LEVEL_DEBUG, "clean_checkpoint_list: Starting timer to release checkpoint %s.\n", checkpoint->name.value); openais_timer_delete (checkpoint->retention_timer); openais_timer_add ( @@ -1402,34 +810,56 @@ static void ckpt_confchg_fn ( unsigned int *joined_list, int joined_list_entries, struct memb_ring_id *ring_id) { - if (configuration_type == TOTEM_CONFIGURATION_REGULAR) { - if (recovery_state == SYNCHRONY_STATE_ENDED) { - memcpy (&saved_ring_id, ring_id, sizeof(struct memb_ring_id)); - } - if (process_localhost_transition) { - ckpt_replace_localhost_ip (joined_list); + unsigned int i, j; + + /* + * Determine lowest nodeid in old regular configuration for the + * purpose of executing the synchronization algorithm + */ + if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) { + for (i = 0; i < left_list_entries; i++) { + for (j = 0; j < my_member_list_entries; j++) { + if (left_list[i] == my_member_list[j]) { + my_member_list[j] = 0; + } + } + } + } + + my_lowest_nodeid = 0xffffffff; + for (i = 0; i < my_member_list_entries; i++) { + if ((my_member_list[i] != 0) && + (my_member_list[i] < my_lowest_nodeid)) { + + my_lowest_nodeid = my_member_list[i]; } } - else if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) { - ckpt_recovery_process_members_exit(left_list, left_list_entries); - recovery_state = SYNCHRONY_STATE_STARTED; - recovery_abort = 0; + /* + * Handle regular configuration + */ + if (configuration_type == TOTEM_CONFIGURATION_REGULAR) { + memcpy (my_member_list, member_list, + sizeof (unsigned int) * member_list_entries); + my_member_list_entries = member_list_entries; + memcpy (&my_saved_ring_id, ring_id, + sizeof (struct memb_ring_id)); } } static struct checkpoint *checkpoint_find ( + struct list_head *ckpt_list_head, mar_name_t *name, mar_uint32_t ckpt_id) { - struct list_head *checkpoint_list; + struct list_head *list; struct checkpoint *checkpoint; - for (checkpoint_list = checkpoint_list_head.next; - checkpoint_list != &checkpoint_list_head; - checkpoint_list = checkpoint_list->next) { + for (list = ckpt_list_head->next; + list != ckpt_list_head; + list = list->next) { - checkpoint = list_entry (checkpoint_list, + checkpoint = list_entry (list, struct checkpoint, list); if (mar_name_match (name, &checkpoint->name) && @@ -1441,17 +871,17 @@ static struct checkpoint *checkpoint_find ( } static struct checkpoint *checkpoint_find_linked ( + struct list_head *ckpt_list_head, mar_name_t *name) { - struct list_head *checkpoint_list; + struct list_head *list; struct checkpoint *checkpoint; - for (checkpoint_list = checkpoint_list_head.next; - checkpoint_list != &checkpoint_list_head; - checkpoint_list = checkpoint_list->next) { + for (list = ckpt_list_head->next; + list != ckpt_list_head; + list = list->next) { - checkpoint = list_entry (checkpoint_list, - struct checkpoint, list); + checkpoint = list_entry (list, struct checkpoint, list); if (mar_name_match (name, &checkpoint->name) && checkpoint->unlinked == 0) { @@ -1460,9 +890,33 @@ static struct checkpoint *checkpoint_find_linked ( } return (0); } + +static struct checkpoint *checkpoint_find_specific ( + struct list_head *ckpt_list_head, + mar_name_t *name, + mar_uint32_t ckpt_id) +{ + struct list_head *list; + struct checkpoint *checkpoint; + + for (list = ckpt_list_head->next; + list != ckpt_list_head; + list = list->next) { + + checkpoint = list_entry (list, struct checkpoint, list); + + if (mar_name_match (name, &checkpoint->name) && + (ckpt_id == checkpoint->ckpt_id)) { + return (checkpoint); + } + } + return (0); +} + static void ckpt_checkpoint_remove_cleanup ( void *conn, - struct checkpoint *checkpoint) + mar_name_t checkpoint_name, + mar_uint32_t ckpt_id) { struct list_head *list; struct checkpoint_cleanup *checkpoint_cleanup; @@ -1473,8 +927,10 @@ static void ckpt_checkpoint_remove_cleanup ( list = list->next) { checkpoint_cleanup = list_entry (list, struct checkpoint_cleanup, list); - if (mar_name_match (&checkpoint_cleanup->checkpoint.name, &checkpoint->name) - || (checkpoint_cleanup->checkpoint.name.length == 0)) { + if (mar_name_match (&checkpoint_cleanup->checkpoint_name, + &checkpoint_name) && + (checkpoint_cleanup->ckpt_id == ckpt_id)) { + list_del (&checkpoint_cleanup->list); free (checkpoint_cleanup); return; @@ -1503,7 +959,8 @@ static struct checkpoint_section *checkpoint_section_find ( checkpoint_section = list_entry (checkpoint_section_list, struct checkpoint_section, list); if (checkpoint_section->section_descriptor.section_id.id_len) { - log_printf (LOG_LEVEL_DEBUG, "Checking section id %*s\n", + log_printf (LOG_LEVEL_DEBUG, "Checking section id %d %*s\n", + checkpoint_section->section_descriptor.section_id.id_len, checkpoint_section->section_descriptor.section_id.id_len, checkpoint_section->section_descriptor.section_id.id); } @@ -1586,20 +1043,20 @@ void checkpoint_release (struct checkpoint *checkpoint) struct checkpoint_section, list); list = list->next; - checkpoint->sectionCount -= 1; + checkpoint->section_count -= 1; checkpoint_section_release (section); } list_del (&checkpoint->list); free (checkpoint); } -int ckpt_checkpoint_close (struct checkpoint *checkpoint) { +int ckpt_checkpoint_close ( + mar_name_t *checkpoint_name, + mar_uint32_t ckpt_id) +{ struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose; struct iovec iovec; - if (checkpoint->expired == 1) { - return (0); - } req_exec_ckpt_checkpointclose.header.size = sizeof (struct req_exec_ckpt_checkpointclose); req_exec_ckpt_checkpointclose.header.id = @@ -1607,18 +1064,15 @@ int ckpt_checkpoint_close (struct checkpoint *checkpoint) { MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE); memcpy (&req_exec_ckpt_checkpointclose.checkpoint_name, - &checkpoint->name, sizeof (mar_name_t)); - req_exec_ckpt_checkpointclose.ckpt_id = checkpoint->ckpt_id; + checkpoint_name, sizeof (mar_name_t)); + req_exec_ckpt_checkpointclose.ckpt_id = ckpt_id; memset (&req_exec_ckpt_checkpointclose.source, 0, sizeof (mar_message_source_t)); iovec.iov_base = (char *)&req_exec_ckpt_checkpointclose; iovec.iov_len = sizeof (req_exec_ckpt_checkpointclose); - if (totempg_groups_send_ok_joined (openais_group_handle, &iovec, 1)) { - assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); - return (0); - } + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (-1); } @@ -1627,12 +1081,6 @@ static int ckpt_exec_init_fn (struct objdb_iface_ver0 *objdb) { log_init ("CKPT"); - /* - * Initialize the saved ring ID. - */ - saved_ring_id.seq = 0; - totemip_copy(&saved_ring_id.rep, this_ip); - return (0); } @@ -1768,34 +1216,46 @@ static void exec_ckpt_sectionread_endian_convert (void *msg) swab_mar_offset_t (&req_exec_ckpt_sectionread->data_size); } -static void exec_ckpt_synchronize_state_endian_convert (void *msg) +static void exec_ckpt_sync_checkpoint_endian_convert (void *msg) { - struct req_exec_ckpt_synchronize_state *req_exec_ckpt_synchronize_state = (struct req_exec_ckpt_synchronize_state *)msg; +} +static void exec_ckpt_sync_checkpoint_section_endian_convert (void *msg) +{ +} +static void exec_ckpt_sync_checkpoint_refcount_endian_convert (void *msg) +{ +} + +#ifdef ABC +static void exec_ckpt_sync_state_endian_convert (void *msg) +{ + struct req_exec_ckpt_sync_state *req_exec_ckpt_sync_state = (struct req_exec_ckpt_sync_state *)msg; unsigned int i; - swab_mar_req_header_t (&req_exec_ckpt_synchronize_state->header); -// swab_mar_memb_ring_id_t (&req_exec_ckpt_synchronize_state->memb_ring_id); - swab_mar_name_t (&req_exec_ckpt_synchronize_state->checkpoint_name); - swab_mar_uint32_t (&req_exec_ckpt_synchronize_state->ckpt_id); - swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_synchronize_state->checkpoint_creation_attributes); - swab_mar_ckpt_section_descriptor_t (&req_exec_ckpt_synchronize_state->section_descriptor); - swab_mar_uint32_t (&req_exec_ckpt_synchronize_state->nodeid); + swab_mar_req_header_t (&req_exec_ckpt_sync_state->header); +// swab_mar_memb_ring_id_t (&req_exec_ckpt_sync_state->memb_ring_id); + swab_mar_name_t (&req_exec_ckpt_sync_state->checkpoint_name); + swab_mar_uint32_t (&req_exec_ckpt_sync_state->ckpt_id); + swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_sync_state->checkpoint_creation_attributes); +// swab_mar_ckpt_section_descriptor_t (&req_exec_ckpt_sync_state->section_descriptor); + swab_mar_uint32_t (&req_exec_ckpt_sync_state->nodeid); for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { - swab_mar_ckpt_refcnt_t (&req_exec_ckpt_synchronize_state->ckpt_refcnt[i]); + swab_mar_refcount_set_t (&req_exec_ckpt_sync_state->refcount_set[i]); } } -static void exec_ckpt_synchronize_section_endian_convert (void *msg) +static void exec_ckpt_sync_section_endian_convert (void *msg) { - struct req_exec_ckpt_synchronize_section *req_exec_ckpt_synchronize_section = (struct req_exec_ckpt_synchronize_section *)msg; - swab_mar_req_header_t (&req_exec_ckpt_synchronize_section->header); -// swab_mar_memb_ring_id_t (&req_exec_ckpt_synchronize_section->memb_ring_id); - swab_mar_name_t (&req_exec_ckpt_synchronize_section->checkpoint_name); - swab_mar_uint32_t (&req_exec_ckpt_synchronize_section->ckpt_id); - swab_mar_uint32_t (&req_exec_ckpt_synchronize_section->id_len); - swab_mar_offset_t (&req_exec_ckpt_synchronize_section->data_offset); - swab_mar_offset_t (&req_exec_ckpt_synchronize_section->data_size); + struct req_exec_ckpt_sync_section *req_exec_ckpt_sync_section = (struct req_exec_ckpt_sync_section *)msg; + swab_mar_req_header_t (&req_exec_ckpt_sync_section->header); +// swab_mar_memb_ring_id_t (&req_exec_ckpt_sync_section->memb_ring_id); + swab_mar_name_t (&req_exec_ckpt_sync_section->checkpoint_name); + swab_mar_uint32_t (&req_exec_ckpt_sync_section->ckpt_id); + swab_mar_uint32_t (&req_exec_ckpt_sync_section->id_len); + swab_mar_offset_t (&req_exec_ckpt_sync_section->data_offset); + swab_mar_offset_t (&req_exec_ckpt_sync_section->data_size); } +#endif /* * Executive message handlers @@ -1813,7 +1273,6 @@ static void message_handler_req_exec_ckpt_checkpointopen ( struct checkpoint_cleanup *checkpoint_cleanup = 0; struct ckpt_pd *ckpt_pd; SaAisErrorT error = SA_AIS_OK; - int proc_index; log_printf (LOG_LEVEL_DEBUG, "Executive request to open checkpoint %p\n", req_exec_ckpt_checkpointopen); @@ -1830,7 +1289,9 @@ static void message_handler_req_exec_ckpt_checkpointopen ( } } - checkpoint = checkpoint_find_linked (&req_exec_ckpt_checkpointopen->checkpoint_name); + checkpoint = checkpoint_find_linked ( + &checkpoint_list_head, + &req_exec_ckpt_checkpointopen->checkpoint_name); /* * If checkpoint doesn't exist, create one @@ -1856,10 +1317,9 @@ static void message_handler_req_exec_ckpt_checkpointopen ( list_init (&checkpoint->list); list_init (&checkpoint->sections_list_head); list_add (&checkpoint->list, &checkpoint_list_head); - checkpoint->referenceCount = 1; + checkpoint->reference_count = 1; checkpoint->retention_timer = 0; - checkpoint->expired = 0; - checkpoint->sectionCount = 0; + checkpoint->section_count = 0; checkpoint->ckpt_id = global_ckpt_id++; if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) && @@ -1872,7 +1332,8 @@ static void message_handler_req_exec_ckpt_checkpointopen ( checkpoint->active_replica_set = 0; } - initialize_ckpt_refcnt_array(checkpoint->ckpt_refcnt); + memset (&checkpoint->refcount_set, 0, + sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX); /* * Create default section id if max_sections is 1 @@ -1918,26 +1379,10 @@ static void message_handler_req_exec_ckpt_checkpointopen ( * Setup connection information and mark checkpoint as referenced */ log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", checkpoint); - checkpoint->referenceCount += 1; - /* - * Add the connection reference information to the Checkpoint to be - * sent out later as a part of the sync process. - * - */ - proc_index = processor_index_find(nodeid,checkpoint->ckpt_refcnt); - if (proc_index == -1) {/* Could not find, lets set the processor to an index.*/ - proc_index = processor_index_set(nodeid,checkpoint->ckpt_refcnt); - } - if (proc_index != -1 ) { - checkpoint->ckpt_refcnt[proc_index].nodeid = nodeid; - checkpoint->ckpt_refcnt[proc_index].count++; - } - else { - log_printf (LOG_LEVEL_ERROR, - "MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n", - checkpoint); - } + sync_refcount_increment (checkpoint, nodeid); + sync_refcount_calculate (checkpoint); + /* * Reset retention duration since this checkpoint was just opened */ @@ -1994,9 +1439,12 @@ error_exit: * This is the path taken when all goes well and this call was local */ if (error == SA_AIS_OK) { - memcpy(&checkpoint_cleanup->checkpoint,checkpoint,sizeof(struct checkpoint)); ckpt_pd = openais_conn_private_data_get (req_exec_ckpt_checkpointopen->source.conn); + memcpy(&checkpoint_cleanup->checkpoint_name, + &checkpoint->name, sizeof (mar_name_t)); + checkpoint_cleanup->ckpt_id = checkpoint->ckpt_id; + list_add (&checkpoint_cleanup->list, &ckpt_pd->checkpoint_list); } else { @@ -2009,230 +1457,6 @@ error_exit: } } -static int recovery_checkpoint_open ( - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - mar_ckpt_checkpoint_creation_attributes_t *ckptAttributes, - struct ckpt_refcnt *ref_cnt) -{ - int i; - struct checkpoint *checkpoint = 0; - struct checkpoint_section *checkpoint_section = 0; - SaAisErrorT error = SA_AIS_OK; - - log_printf (LOG_LEVEL_DEBUG, "recovery_checkpoint_open %s\n", checkpoint_name->value); - log_printf (LOG_LEVEL_DEBUG, "recovery_checkpoint_open refcnt Values\n"); - for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - if (ref_cnt[i].nodeid) { - log_printf (LOG_LEVEL_DEBUG,"Index %d has proc %s and count %d\n", - i, - totempg_ifaces_print (ref_cnt[i].nodeid), - ref_cnt[i].count); - } - } - - - checkpoint = checkpoint_find (checkpoint_name, ckpt_id); - - /* - * If checkpoint doesn't exist, create one - */ - if (checkpoint == 0) { - log_printf (LOG_LEVEL_DEBUG, "recovery_checkpoint_open Allocating new Checkpoint %s\n", checkpoint_name->value); - checkpoint = malloc (sizeof (struct checkpoint)); - if (checkpoint == 0) { - error = SA_AIS_ERR_NO_MEMORY; - goto error_exit; - } - - checkpoint_section = malloc (sizeof (struct checkpoint_section)); - if (checkpoint_section == 0) { - free (checkpoint); - error = SA_AIS_ERR_NO_MEMORY; - goto error_exit; - } - - memcpy (&checkpoint->name, - checkpoint_name, - sizeof (mar_name_t)); - memcpy (&checkpoint->checkpoint_creation_attributes, - ckptAttributes, - sizeof (mar_ckpt_checkpoint_creation_attributes_t)); - checkpoint->ckpt_id = ckpt_id; - /* - * TODO this is wrong - worry about unlinked value of checkpoint sync - */ - checkpoint->unlinked = 0; - list_init (&checkpoint->list); - list_init (&checkpoint->sections_list_head); - list_add (&checkpoint->list, &checkpoint_list_head); - checkpoint->retention_timer = 0; - checkpoint->expired = 0; - -#if 0 - /* - * Add in default checkpoint section - */ - list_init (&checkpoint_section->list); - list_add (&checkpoint_section->list, &checkpoint->sections_list_head); - - /* - * Default section id - */ - checkpoint_section->section_descriptor.section_id.id = 0; - checkpoint_section->section_descriptor.section_id.id_len = 0; - checkpoint_section->section_descriptor.expiration_time = SA_TIME_END; - checkpoint_section->section_descriptor.section_state = SA_CKPT_SECTION_VALID; - checkpoint_section->section_descriptor.last_update = 0; /*current time*/ - checkpoint_section->section_descriptor.section_size = strlen("Factory installed data\0")+1; - checkpoint_section->section_data = malloc(strlen("Factory installed data\0")+1); - assert(checkpoint_section->section_data); - memcpy(checkpoint_section->section_data, "Factory installed data\0", strlen("Factory installed data\0")+1); - checkpoint_section->expiration_timer = 0; -#endif - - if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) && - (checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_CHECKPOINT_COLLOCATED) == 0) { - checkpoint->active_replica_set = 1; - } else - if ((checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_WR_ALL_REPLICAS) == 1) { - checkpoint->active_replica_set = 1; - } else { - checkpoint->active_replica_set = 0; - } - - - initialize_ckpt_refcnt_array(checkpoint->ckpt_refcnt); - } - else { - /* - * Setup connection information and mark checkpoint as referenced - */ - log_printf (LOG_LEVEL_DEBUG, "recovery CHECKPOINT reopened is %p\n", checkpoint); - } - - /* synchronize global_ckpt_id to max(ckpt_id,global_ckpt_id)+1 */ - if (ckpt_id >= global_ckpt_id) { - global_ckpt_id = ckpt_id + 1; - } - - /*CHECK to see if there are any existing ckpts*/ - if ((checkpoint->ckpt_refcnt) && (ckpt_refcnt_total(checkpoint->ckpt_refcnt) > 0)) { - log_printf (LOG_LEVEL_DEBUG,"calling merge_ckpt_refcnts\n"); - merge_ckpt_refcnts(checkpoint->ckpt_refcnt, ref_cnt); - } - else { - initialize_ckpt_refcnt_array(checkpoint->ckpt_refcnt); - } - - /*No Existing ckpts. Lets assign what we got over the network or the merged with network values*/ - /* - * The reason why we are adding 1 is because there is an assignment vis-a-via an increment in the - * the next line. Whether the ckpt was opened earlier or just now, the referenceCount is getting - * obliterated in the next line. - */ - checkpoint->referenceCount = ckpt_refcnt_total(ref_cnt) + 1; /*defect 1192*/ - log_printf (LOG_LEVEL_DEBUG, "OPEN checkpoint->referenceCount %d\n",checkpoint->referenceCount); - memcpy (checkpoint->ckpt_refcnt, - ref_cnt, - sizeof(struct ckpt_refcnt) * PROCESSOR_COUNT_MAX); - - /* - * Reset retention duration since this checkpoint was just opened - */ - openais_timer_delete (checkpoint->retention_timer); - checkpoint->retention_timer = 0; - - /* - * Send error result to CKPT library - */ -error_exit: - return (error); -} - -static void message_handler_req_exec_ckpt_synchronize_state ( - void *message, - unsigned int nodeid) -{ - int retcode; - struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state - = (struct req_exec_ckpt_synchronize_state *)message; - struct ckpt_refcnt local_ckpt_refcnt[PROCESSOR_COUNT_MAX]; - unsigned int i; - - /* - * If the Incoming message's previous ring id == saved_ring_id - * Ignore because we have seen this message before. - */ - if (memcmp (&req_exec_ckpt_sync_state->previous_ring_id, &saved_ring_id,sizeof (struct memb_ring_id)) == 0) { - log_printf(LOG_LEVEL_DEBUG, "message_handler_req_exec_ckpt_synchronize_state ignoring ...\n"); - return; - } - - for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { - marshall_from_mar_ckpt_refcnt_t (&local_ckpt_refcnt[i], - &req_exec_ckpt_sync_state->ckpt_refcnt[i]); - } - retcode = recovery_checkpoint_open ( - &req_exec_ckpt_sync_state->checkpoint_name, - req_exec_ckpt_sync_state->ckpt_id, - &req_exec_ckpt_sync_state->checkpoint_creation_attributes, - local_ckpt_refcnt); - if (retcode != SA_AIS_OK) { - log_printf(LOG_LEVEL_DEBUG, "message_handler_req_exec_ckpt_synchronize_state\n"); - log_printf(LOG_LEVEL_DEBUG, "recovery_checkpoint_open returned %d\n",retcode); - } - - retcode = recovery_section_create ( - &req_exec_ckpt_sync_state->section_descriptor, - &req_exec_ckpt_sync_state->checkpoint_name, - req_exec_ckpt_sync_state->ckpt_id, - (char*)req_exec_ckpt_sync_state - + sizeof (struct req_exec_ckpt_synchronize_state)); - if (retcode != SA_AIS_OK) { - log_printf(LOG_LEVEL_DEBUG, "message_handler_req_exec_ckpt_synchronize_state\n"); - log_printf(LOG_LEVEL_DEBUG, "recovery_section_create returned %d\n",retcode); - } -} - -static void message_handler_req_exec_ckpt_synchronize_section ( - void *message, - unsigned int nodeid) -{ - int retcode; - struct req_exec_ckpt_synchronize_section *req_exec_ckpt_sync_section - = (struct req_exec_ckpt_synchronize_section *)message; - - /* - * If the Incoming message's previous ring id == saved_ring_id - * Ignore because we have seen this message before. - */ - if (memcmp (&req_exec_ckpt_sync_section->previous_ring_id, &saved_ring_id,sizeof (struct memb_ring_id)) == 0) { - log_printf(LOG_LEVEL_DEBUG, "message_handler_req_exec_ckpt_synchronize_section ignoring ...\n"); - return; - } - - /* - * Write the contents of the section to the checkpoint section. - */ - retcode = recovery_section_write( - req_exec_ckpt_sync_section->id_len, - (char*)req_exec_ckpt_sync_section + - sizeof (struct req_exec_ckpt_synchronize_section), - &req_exec_ckpt_sync_section->checkpoint_name, - req_exec_ckpt_sync_section->ckpt_id, - (char*)req_exec_ckpt_sync_section - + sizeof (struct req_exec_ckpt_synchronize_section) - + req_exec_ckpt_sync_section->id_len, - req_exec_ckpt_sync_section->data_offset, - req_exec_ckpt_sync_section->data_size); - if (retcode != SA_AIS_OK) { - log_printf(LOG_LEVEL_ERROR, "message_handler_req_exec_ckpt_synchronize_section\n"); - log_printf(LOG_LEVEL_ERROR, "recovery_section_write returned %d\n",retcode); - } -} - - unsigned int abstime_to_msec (mar_time_t time) { struct timeval tv; @@ -2266,7 +1490,10 @@ void timer_function_section_expire (void *data) goto free_mem; } - checkpoint = checkpoint_find (&ckpt_id->ckpt_name, ckpt_id->ckpt_id); + checkpoint = checkpoint_find ( + &checkpoint_list_head, + &ckpt_id->ckpt_name, + ckpt_id->ckpt_id); if (checkpoint == 0) { log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire could not find ckpt %s\n", ckpt_id->ckpt_name.value); @@ -2287,7 +1514,7 @@ void timer_function_section_expire (void *data) ckpt_id->ckpt_section_id.id, ckpt_id->ckpt_name.value); - checkpoint->sectionCount -= 1; + checkpoint->section_count -= 1; /* * defect id 1112 "memory leak in checkpoint service" Dont try * to delete the timer as the timer mechanism takes care of that. @@ -2332,13 +1559,13 @@ static void message_handler_req_exec_ckpt_checkpointclose ( struct res_lib_ckpt_checkpointclose res_lib_ckpt_checkpointclose; struct checkpoint *checkpoint = 0; SaAisErrorT error = SA_AIS_OK; - int proc_index; int release_checkpoint = 0; log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to close checkpoint %s\n", get_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name)); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_checkpointclose->checkpoint_name, req_exec_ckpt_checkpointclose->ckpt_id); if (checkpoint == 0) { @@ -2346,34 +1573,21 @@ static void message_handler_req_exec_ckpt_checkpointclose ( goto error_exit; } - log_printf (LOG_LEVEL_DEBUG, "CKPT:CLOSE checkpoint->referenceCount %d\n",checkpoint->referenceCount); - checkpoint->referenceCount--; - /* - * Modify the connection reference information to the Checkpoint to be - * sent out later as a part of the sync process. - */ + sync_refcount_decrement (checkpoint, nodeid); + sync_refcount_calculate (checkpoint); - proc_index = processor_index_find(nodeid, checkpoint->ckpt_refcnt); - if (proc_index != -1 ) { - checkpoint->ckpt_refcnt[proc_index].count--; - } - else { - log_printf (LOG_LEVEL_ERROR, - "Could Not find Processor Info %p info.\n", - checkpoint); - } - assert (checkpoint->referenceCount > 0); /*defect 1192*/ - log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n", - checkpoint->referenceCount); + log_printf (LOG_LEVEL_DEBUG, "Close checkpoint->reference_count %d\n", + checkpoint->reference_count); + assert (checkpoint->reference_count >= 0); /* * If checkpoint has been unlinked and this is the last reference, delete it */ - if (checkpoint->unlinked && checkpoint->referenceCount == 1) { /*defect 1192*/ + if (checkpoint->unlinked && checkpoint->reference_count == 0) { log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n"); release_checkpoint = 1; } else - if (checkpoint->referenceCount == 1) { /*defect 1192*/ + if (checkpoint->reference_count == 0) { openais_timer_add ( checkpoint->checkpoint_creation_attributes.retention_duration / 1000000, checkpoint, @@ -2392,11 +1606,6 @@ error_exit: res_lib_ckpt_checkpointclose.header.error = error; openais_conn_send_response (req_exec_ckpt_checkpointclose->source.conn, &res_lib_ckpt_checkpointclose, sizeof (struct res_lib_ckpt_checkpointclose)); - if (error == SA_AIS_OK) { - ckpt_checkpoint_remove_cleanup ( - req_exec_ckpt_checkpointclose->source.conn, - checkpoint); - } } /* @@ -2419,6 +1628,7 @@ static void message_handler_req_exec_ckpt_checkpointunlink ( log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to unlink checkpoint %p\n", req_exec_ckpt_checkpointunlink); checkpoint = checkpoint_find_linked ( + &checkpoint_list_head, &req_exec_ckpt_checkpointunlink->checkpoint_name); if (checkpoint == 0) { error = SA_AIS_ERR_NOT_EXIST; @@ -2431,7 +1641,7 @@ static void message_handler_req_exec_ckpt_checkpointunlink ( /* * Immediately delete entry if reference count is zero */ - if (checkpoint->referenceCount == 1) { + if (checkpoint->reference_count == 0) { /* * Remove retention timer since this checkpoint was unlinked and is no * longer referenced @@ -2464,6 +1674,7 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationset ( SaAisErrorT error = SA_AIS_ERR_BAD_OPERATION; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_checkpointretentiondurationset->checkpoint_name, req_exec_ckpt_checkpointretentiondurationset->ckpt_id); if (checkpoint) { @@ -2473,7 +1684,7 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationset ( checkpoint->checkpoint_creation_attributes.retention_duration = req_exec_ckpt_checkpointretentiondurationset->retention_duration; - if (checkpoint->expired == 0 && checkpoint->referenceCount == 1) { /*defect 1192*/ + if (checkpoint->reference_count == 0) { openais_timer_delete (checkpoint->retention_timer); openais_timer_add ( @@ -2511,12 +1722,12 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire ( struct iovec iovec; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name, req_exec_ckpt_checkpointretentiondurationexpire->ckpt_id); - if (checkpoint && (checkpoint->expired == 0) && (checkpoint->referenceCount == 1)) { + if (checkpoint && (checkpoint->reference_count == 0)) { log_printf (LOG_LEVEL_DEBUG, "Expiring checkpoint %s\n", get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name)); - checkpoint->expired = 1; req_exec_ckpt_checkpointunlink.header.size = sizeof (struct req_exec_ckpt_checkpointunlink); @@ -2538,158 +1749,6 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire ( } } -static int recovery_section_create ( - mar_ckpt_section_descriptor_t *section_descriptor, - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - char *section_id) -{ - struct checkpoint *checkpoint; - struct checkpoint_section *checkpoint_section; - void *initial_data; - struct ckpt_identifier *ckpt_identifier = 0; - SaAisErrorT error = SA_AIS_OK; - void *section_id_new; - - if ((int)section_descriptor->section_id.id_len) { - log_printf (LOG_LEVEL_DEBUG, "recovery_section_create for checkpoint %s, section %s.\n", - checkpoint_name->value, section_id); - } else { - log_printf (LOG_LEVEL_DEBUG, "recovery_section_create for checkpoint %s, default section.\n", - checkpoint_name->value); - } - - checkpoint = checkpoint_find (checkpoint_name, ckpt_id); - if (checkpoint == 0) { - error = SA_AIS_ERR_NOT_EXIST; - goto error_exit; - } - - /* - * Determine if user-specified checkpoint ID already exists - */ - checkpoint_section = checkpoint_section_find ( - checkpoint, - section_id, - (int)section_descriptor->section_id.id_len); - if (checkpoint_section) { - /* - * This use case is mostly for the default section and is not probable for any other - * sections. - */ - if (section_descriptor->section_size - > checkpoint_section->section_descriptor.section_size) { - void *section_data_tmp; - - log_printf (LOG_LEVEL_NOTICE, - "recovery_section_create reallocating data. Present Size: %d, New Size: %d\n", - (int)checkpoint_section->section_descriptor.section_size, - (int)section_descriptor->section_size); - - section_data_tmp = - realloc (checkpoint_section->section_data, section_descriptor->section_size); - if (section_data_tmp == NULL) { - log_printf (LOG_LEVEL_ERROR, - "recovery_section_create section_data realloc returned NULL Calling error_exit.\n"); - error = SA_AIS_ERR_NO_MEMORY; - checkpoint_section_release(checkpoint_section); - goto error_exit; - } - checkpoint_section->section_data = section_data_tmp; - - checkpoint_section->section_descriptor.section_size = section_descriptor->section_size; - error = SA_AIS_OK; - - } - else { - error = SA_AIS_ERR_EXIST; - } - goto error_exit; - } - - /* - * Allocate checkpoint section - */ - checkpoint_section = malloc (sizeof (struct checkpoint_section)); - if (checkpoint_section == 0) { - error = SA_AIS_ERR_NO_MEMORY; - goto error_exit; - } - /* - * Allocate checkpoint section data - */ - initial_data = malloc (section_descriptor->section_size); - if (initial_data == 0) { - free (checkpoint_section); - error = SA_AIS_ERR_NO_MEMORY; - goto error_exit; - } - /* - * Allocate checkpoint section id - */ - section_id_new = NULL; - if (section_descriptor->section_id.id_len) { - section_id_new = malloc ((int)section_descriptor->section_id.id_len); - if (section_id_new == 0) { - free (checkpoint_section); - free (initial_data); - error = SA_AIS_ERR_NO_MEMORY; - goto error_exit; - } - } - - /* - * Copy checkpoint section ID and initialize data. - */ - if (section_id) { - memcpy ((char*)section_id_new, (char*)section_id, - (int)section_descriptor->section_id.id_len); - } - memset (initial_data, 0, section_descriptor->section_size); - - /* - * Configure checkpoint section - */ - memcpy(&checkpoint_section->section_descriptor, - section_descriptor, - sizeof(mar_ckpt_section_descriptor_t)); - checkpoint_section->section_descriptor.section_state = SA_CKPT_SECTION_VALID; - checkpoint_section->section_data = initial_data; - checkpoint_section->expiration_timer = 0; - checkpoint_section->section_descriptor.section_id.id = section_id_new; - - if (section_descriptor->expiration_time != SA_TIME_END) { - ckpt_identifier = malloc (sizeof(struct ckpt_identifier)); - assert(ckpt_identifier); - memcpy(&ckpt_identifier->ckpt_name, checkpoint_name, - sizeof(mar_name_t)); - ckpt_identifier->ckpt_id = ckpt_id; - memcpy(&ckpt_identifier->ckpt_section_id, - &checkpoint_section->section_descriptor.section_id, - sizeof(mar_ckpt_section_id_t)); - log_printf (LOG_LEVEL_DEBUG, "recovery_section_create Enqueuing Timer to Expire section %s in ckpt %s\n", - ckpt_identifier->ckpt_section_id.id, - ckpt_identifier->ckpt_name.value); - openais_timer_add ( - abstime_to_msec (checkpoint_section->section_descriptor.expiration_time), - ckpt_identifier, - timer_function_section_expire, - &checkpoint_section->expiration_timer); - log_printf (LOG_LEVEL_DEBUG, "recovery_section_create expiration timer = 0x%p\n", - checkpoint_section->expiration_timer); - } - - /* - * Add checkpoint section to checkpoint - */ - list_init (&checkpoint_section->list); - list_add (&checkpoint_section->list, - &checkpoint->sections_list_head); - checkpoint->sectionCount += 1; - -error_exit: - return (error); -} static void message_handler_req_exec_ckpt_sectioncreate ( void *message, @@ -2706,6 +1765,7 @@ static void message_handler_req_exec_ckpt_sectioncreate ( log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectioncreate->checkpoint_name, req_exec_ckpt_sectioncreate->ckpt_id); if (checkpoint == 0) { @@ -2713,7 +1773,7 @@ static void message_handler_req_exec_ckpt_sectioncreate ( goto error_exit; } - if (checkpoint->sectionCount == checkpoint->checkpoint_creation_attributes.max_sections) { + if (checkpoint->section_count == checkpoint->checkpoint_creation_attributes.max_sections) { error = SA_AIS_ERR_NO_SPACE; goto error_exit; } @@ -2835,7 +1895,7 @@ static void message_handler_req_exec_ckpt_sectioncreate ( list_init (&checkpoint_section->list); list_add (&checkpoint_section->list, &checkpoint->sections_list_head); - checkpoint->sectionCount += 1; + checkpoint->section_count += 1; error_exit: if (message_source_is_local(&req_exec_ckpt_sectioncreate->source)) { @@ -2860,6 +1920,7 @@ static void message_handler_req_exec_ckpt_sectiondelete ( SaAisErrorT error = SA_AIS_OK; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectiondelete->checkpoint_name, req_exec_ckpt_sectiondelete->ckpt_id); if (checkpoint == 0) { @@ -2895,7 +1956,7 @@ static void message_handler_req_exec_ckpt_sectiondelete ( /* * Delete checkpoint section */ - checkpoint->sectionCount -= 1; + checkpoint->section_count -= 1; checkpoint_section_release (checkpoint_section); /* @@ -2927,6 +1988,7 @@ static void message_handler_req_exec_ckpt_sectionexpirationtimeset ( log_printf (LOG_LEVEL_DEBUG, "Executive request to set section expiration time\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectionexpirationtimeset->checkpoint_name, req_exec_ckpt_sectionexpirationtimeset->ckpt_id); if (checkpoint == 0) { @@ -3006,63 +2068,6 @@ error_exit: } } -static int recovery_section_write( - int section_id_len, - char* section_id, - mar_name_t *checkpoint_name, - mar_uint32_t ckpt_id, - void *new_data, - mar_uint32_t data_offset, - mar_uint32_t data_size) -{ - struct checkpoint *checkpoint; - struct checkpoint_section *checkpoint_section; - int size_required; - SaAisErrorT error = SA_AIS_OK; - char *sd; - - log_printf (LOG_LEVEL_DEBUG, "recovery_section_write.\n"); - checkpoint = checkpoint_find (checkpoint_name, ckpt_id); - if (checkpoint == 0) { - error = SA_AIS_ERR_NOT_EXIST; - goto error_exit; - } - - /* - * Find checkpoint section to be written - */ - checkpoint_section = checkpoint_section_find (checkpoint, - section_id, section_id_len); - if (checkpoint_section == 0) { - error = SA_AIS_ERR_NOT_EXIST; - goto error_exit; - } - - /* - * If write would extend past end of section data, return error; - */ - size_required = data_offset + data_size; - if (size_required > checkpoint_section->section_descriptor.section_size) { - log_printf (LOG_LEVEL_ERROR, - "recovery_section_write. write-past-end size_required:(%d), data_offset:(%d), data_size:(%d), sync-section-size:(%d)\n", - size_required, data_offset, data_size, - (int)checkpoint_section->section_descriptor.section_size); - error = SA_AIS_ERR_ACCESS; - goto error_exit; - } - - /* - * Write checkpoint section to section data - */ - if (data_size > 0) { - sd = (char *)checkpoint_section->section_data; - memcpy (&sd[data_offset], new_data, data_size); - } -error_exit: - return (error); -} - - static void message_handler_req_exec_ckpt_sectionwrite ( void *message, unsigned int nodeid) @@ -3077,6 +2082,7 @@ static void message_handler_req_exec_ckpt_sectionwrite ( log_printf (LOG_LEVEL_DEBUG, "Executive request to section write.\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectionwrite->checkpoint_name, req_exec_ckpt_sectionwrite->ckpt_id); if (checkpoint == 0) { @@ -3127,8 +2133,8 @@ static void message_handler_req_exec_ckpt_sectionwrite ( req_exec_ckpt_sectionwrite->data_size; if (size_required > checkpoint_section->section_descriptor.section_size) { section_data = realloc (checkpoint_section->section_data, size_required); - if (section_data == NULL) { - log_printf (LOG_LEVEL_ERROR, "section_data realloc returned NULL Calling error_exit.\n"); + if (section_data == 0) { + log_printf (LOG_LEVEL_ERROR, "section_data realloc returned 0 Calling error_exit.\n"); error = SA_AIS_ERR_NO_MEMORY; goto error_exit; } @@ -3185,6 +2191,7 @@ static void message_handler_req_exec_ckpt_sectionoverwrite ( log_printf (LOG_LEVEL_DEBUG, "Executive request to section overwrite.\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectionoverwrite->checkpoint_name, req_exec_ckpt_sectionoverwrite->ckpt_id); if (checkpoint == 0) { @@ -3284,6 +2291,7 @@ static void message_handler_req_exec_ckpt_sectionread ( log_printf (LOG_LEVEL_DEBUG, "Executive request for section read.\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_exec_ckpt_sectionread->checkpoint_name, req_exec_ckpt_sectionread->ckpt_id); if (checkpoint == 0) { @@ -3378,10 +2386,6 @@ static int ckpt_lib_init_fn (void *conn) hdb_create (&ckpt_pd->iteration_hdb); - /* TODO - list_add (&ckpt_pd->sectionIterator.list, - &checkpoint_iteration_list_head); - */ list_init (&ckpt_pd->checkpoint_list); return (0); @@ -3391,7 +2395,7 @@ static int ckpt_lib_init_fn (void *conn) static int ckpt_lib_exit_fn (void *conn) { struct checkpoint_cleanup *checkpoint_cleanup; - struct list_head *cleanup_list; + struct list_head *list; struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn); log_printf (LOG_LEVEL_DEBUG, "checkpoint exit conn %p\n", conn); @@ -3399,29 +2403,25 @@ static int ckpt_lib_exit_fn (void *conn) /* * close all checkpoints opened on this connection */ - cleanup_list = ckpt_pd->checkpoint_list.next; + list = ckpt_pd->checkpoint_list.next; while (!list_empty(&ckpt_pd->checkpoint_list)) { - checkpoint_cleanup = list_entry (cleanup_list, + checkpoint_cleanup = list_entry (list, struct checkpoint_cleanup, list); - if (checkpoint_cleanup->checkpoint.name.length > 0) { - ckpt_checkpoint_close ( - &checkpoint_cleanup->checkpoint); - } + assert (checkpoint_cleanup->checkpoint_name.length != 0); + ckpt_checkpoint_close ( + &checkpoint_cleanup->checkpoint_name, + checkpoint_cleanup->ckpt_id); list_del (&checkpoint_cleanup->list); free (checkpoint_cleanup); - cleanup_list = ckpt_pd->checkpoint_list.next; + list = ckpt_pd->checkpoint_list.next; } - /* TODO - if (ckpt_pd->sectionIterator.sectionIteratorEntries) { - free (ckpt_pd->sectionIterator.sectionIteratorEntries); - } - list_del (&ckpt_pd->sectionIterator.list); - */ + hdb_destroy (&ckpt_pd->iteration_hdb); + return (0); } @@ -3492,6 +2492,10 @@ static void message_handler_req_lib_ckpt_checkpointclose ( iovec.iov_base = (char *)&req_exec_ckpt_checkpointclose; iovec.iov_len = sizeof (req_exec_ckpt_checkpointclose); + ckpt_checkpoint_remove_cleanup ( + conn, + req_lib_ckpt_checkpointclose->checkpoint_name, + req_lib_ckpt_checkpointclose->ckpt_id); assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); } @@ -3549,7 +2553,6 @@ static void message_handler_req_lib_ckpt_checkpointretentiondurationset ( assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); - } static void message_handler_req_lib_ckpt_activereplicaset ( @@ -3562,6 +2565,7 @@ static void message_handler_req_lib_ckpt_activereplicaset ( SaAisErrorT error = SA_AIS_OK; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_lib_ckpt_activereplicaset->checkpoint_name, req_lib_ckpt_activereplicaset->ckpt_id); @@ -3599,10 +2603,11 @@ static void message_handler_req_lib_ckpt_checkpointstatusget ( * Count memory used by checkpoint sections */ checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_lib_ckpt_checkpointstatusget->checkpoint_name, req_lib_ckpt_checkpointstatusget->ckpt_id); - if (checkpoint && (checkpoint->expired == 0)) { + if (checkpoint) { for (checkpoint_section_list = checkpoint->sections_list_head.next; checkpoint_section_list != &checkpoint->sections_list_head; @@ -3950,6 +2955,7 @@ static void message_handler_req_lib_ckpt_checkpointsynchronize ( struct checkpoint *checkpoint; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_lib_ckpt_checkpointsynchronize->checkpoint_name, req_lib_ckpt_checkpointsynchronize->ckpt_id); if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) { @@ -3979,6 +2985,7 @@ static void message_handler_req_lib_ckpt_checkpointsynchronizeasync ( struct checkpoint *checkpoint; checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_lib_ckpt_checkpointsynchronizeasync->checkpoint_name, req_lib_ckpt_checkpointsynchronizeasync->ckpt_id); if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) { @@ -4026,6 +3033,7 @@ static void message_handler_req_lib_ckpt_sectioniterationinitialize ( log_printf (LOG_LEVEL_DEBUG, "section iteration initialize\n"); checkpoint = checkpoint_find ( + &checkpoint_list_head, &req_lib_ckpt_sectioniterationinitialize->checkpoint_name, req_lib_ckpt_sectioniterationinitialize->ckpt_id); if (checkpoint == 0) { @@ -4057,6 +3065,12 @@ static void message_handler_req_lib_ckpt_sectioniterationinitialize ( iteration_instance->iteration_entries_count = 0; iteration_instance->iteration_pos = 0; + memcpy (&iteration_instance->checkpoint_name, + &req_lib_ckpt_sectioniterationinitialize->checkpoint_name, + sizeof (mar_name_t)); + iteration_instance->ckpt_id = + req_lib_ckpt_sectioniterationinitialize->ckpt_id; + /* * Iterate list of checkpoint sections */ @@ -4090,16 +3104,25 @@ static void message_handler_req_lib_ckpt_sectioniterationinitialize ( /* iterate all sections - do nothing */ break; } - iteration_instance->iteration_entries_count += 1; - iteration_entries = realloc (iteration_instance->iteration_entries, - sizeof (struct iteration_entry) * iteration_instance->iteration_entries_count); + iteration_entries = realloc ( + iteration_instance->iteration_entries, + sizeof (struct iteration_entry) * + (iteration_instance->iteration_entries_count + 1)); if (iteration_entries == NULL) { error = SA_AIS_ERR_NO_MEMORY; goto error_put; } iteration_instance->iteration_entries = iteration_entries; - iteration_entries[iteration_instance->iteration_entries_count - 1].active = 1; - iteration_entries[iteration_instance->iteration_entries_count - 1].checkpoint_section = checkpoint_section; + + iteration_entries[iteration_instance->iteration_entries_count].section_id = + malloc (checkpoint_section->section_descriptor.section_id.id_len); + assert (iteration_entries[iteration_instance->iteration_entries_count].section_id); + memcpy (iteration_entries[iteration_instance->iteration_entries_count].section_id, + checkpoint_section->section_descriptor.section_id.id, + checkpoint_section->section_descriptor.section_id.id_len); + iteration_entries[iteration_instance->iteration_entries_count].section_id_len = checkpoint_section->section_descriptor.section_id.id_len; + iteration_instance->iteration_entries_count += 1; + } error_put: @@ -4149,8 +3172,6 @@ static void message_handler_req_lib_ckpt_sectioniterationfinalize ( hdb_handle_destroy (&ckpt_pd->iteration_hdb, req_lib_ckpt_sectioniterationfinalize->iteration_handle); - hdb_destroy (&ckpt_pd->iteration_hdb); - error_exit: res_lib_ckpt_sectioniterationfinalize.header.size = sizeof (struct res_lib_ckpt_sectioniterationfinalize); res_lib_ckpt_sectioniterationfinalize.header.id = MESSAGE_RES_CKPT_SECTIONITERATIONFINALIZE; @@ -4173,6 +3194,8 @@ static void message_handler_req_lib_ckpt_sectioniterationnext ( unsigned int res; struct iteration_instance *iteration_instance = NULL; void *iteration_instance_p; + struct checkpoint *checkpoint; + struct checkpoint_section *checkpoint_section = NULL; struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn); @@ -4186,6 +3209,7 @@ static void message_handler_req_lib_ckpt_sectioniterationnext ( } iteration_instance = (struct iteration_instance *)iteration_instance_p; + assert (iteration_instance); /* * Find active iteration entry */ @@ -4199,29 +3223,40 @@ static void message_handler_req_lib_ckpt_sectioniterationnext ( } /* - * active iteration entry + * Find the checkpoint section to respond to library + */ + checkpoint = checkpoint_find_specific ( + &checkpoint_list_head, + &iteration_instance->checkpoint_name, + iteration_instance->ckpt_id); + + assert (checkpoint); + + checkpoint_section = checkpoint_section_find ( + checkpoint, + iteration_instance->iteration_entries[iteration_instance->iteration_pos].section_id, + iteration_instance->iteration_entries[iteration_instance->iteration_pos].section_id_len); + + + iteration_instance->iteration_pos += 1; + /* + * If checkpoint section found, then return it in iteration */ - if (iteration_instance->iteration_entries[iteration_instance->iteration_pos].active == 1) { + if (checkpoint_section) { + section_id_size = checkpoint_section->section_descriptor.section_id.id_len; + + memcpy (&res_lib_ckpt_sectioniterationnext.section_descriptor, + &checkpoint_section->section_descriptor, + sizeof (mar_ckpt_section_descriptor_t)); + + /* + * This drops out of for loop + */ break; } - iteration_instance->iteration_pos += 1; } - /* - * Prepare response to API - */ - section_id_size = iteration_instance->iteration_entries[iteration_instance->iteration_pos].checkpoint_section->section_descriptor.section_id.id_len; - - memcpy (&res_lib_ckpt_sectioniterationnext.section_descriptor, - &iteration_instance->iteration_entries[iteration_instance->iteration_pos].checkpoint_section->section_descriptor, - sizeof (mar_ckpt_section_descriptor_t)); - - /* - * Get to next iteration entry - */ - iteration_instance->iteration_pos += 1; - error_put: hdb_handle_put (&ckpt_pd->iteration_hdb, req_lib_ckpt_sectioniterationnext->iteration_handle); @@ -4238,16 +3273,611 @@ error_exit: if (error == SA_AIS_OK) { openais_conn_send_response ( conn, - iteration_instance->iteration_entries[ - iteration_instance->iteration_pos - 1]. - checkpoint_section->section_descriptor.section_id.id, - section_id_size); + checkpoint_section->section_descriptor.section_id.id, + checkpoint_section->section_descriptor.section_id.id_len); } } -static void dump_fn (void) +/* + * Recovery after network partition or merge + */ +void sync_refcount_increment ( + struct checkpoint *checkpoint, + unsigned int nodeid) +{ + unsigned int i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + if (checkpoint->refcount_set[i].nodeid == 0) { + checkpoint->refcount_set[i].nodeid = nodeid; + checkpoint->refcount_set[i].refcount = 1; + break; + } + if (checkpoint->refcount_set[i].nodeid == nodeid) { + checkpoint->refcount_set[i].refcount += 1; + break; + } + } +} + +void sync_refcount_add ( + struct checkpoint *checkpoint, + unsigned int nodeid, + unsigned int count) +{ + unsigned int i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + if (checkpoint->refcount_set[i].nodeid == 0) { + checkpoint->refcount_set[i].nodeid = nodeid; + checkpoint->refcount_set[i].refcount = count; + break; + } + if (checkpoint->refcount_set[i].nodeid == nodeid) { + checkpoint->refcount_set[i].refcount += count; + break; + } + } +} + +void sync_refcount_decrement ( + struct checkpoint *checkpoint, + unsigned int nodeid) +{ + unsigned int i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + if (checkpoint->refcount_set[i].nodeid == 0) { + break; + } + if (checkpoint->refcount_set[i].nodeid == nodeid) { + checkpoint->refcount_set[i].refcount -= 1; + break; + } + } +} + +/* + * Sum all reference counts for the checkpoint + */ +void sync_refcount_calculate ( + struct checkpoint *checkpoint) +{ + checkpoint->reference_count = 0; + unsigned int i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + if (checkpoint->refcount_set[i].nodeid == 0) { + break; + } + + checkpoint->reference_count += checkpoint->refcount_set[i].refcount; + } +} + +void sync_checkpoints_free (struct list_head *ckpt_list_head) +{ + struct checkpoint *checkpoint; + struct list_head *list; + + list = ckpt_list_head->next; + while (list != ckpt_list_head) { + checkpoint = list_entry (list, struct checkpoint, list); + list = list->next; + checkpoint_release (checkpoint); + } + list_init (ckpt_list_head); +} + +static inline void sync_checkpoints_enter (void) +{ + struct checkpoint *checkpoint; + + ENTER(); + + my_sync_state = SYNC_STATE_CHECKPOINT; + my_iteration_state = ITERATION_STATE_CHECKPOINT; + my_iteration_state_checkpoint = checkpoint_list_head.next; + + checkpoint = list_entry (checkpoint_list_head.next, struct checkpoint, + list); + my_iteration_state_section = checkpoint->sections_list_head.next; + + LEAVE(); +} + +static inline void sync_refcounts_enter (void) +{ + my_sync_state = SYNC_STATE_REFCOUNT; +} + +static void ckpt_sync_init (void) +{ + ENTER(); + + sync_checkpoints_enter(); + + LEAVE(); +} + +static int sync_checkpoint_transmit (struct checkpoint *checkpoint) +{ + struct req_exec_ckpt_sync_checkpoint req_exec_ckpt_sync_checkpoint; + struct iovec iovec; + + req_exec_ckpt_sync_checkpoint.header.size = + sizeof (struct req_exec_ckpt_sync_checkpoint); + req_exec_ckpt_sync_checkpoint.header.id = + SERVICE_ID_MAKE (CKPT_SERVICE, + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINT); + + memcpy (&req_exec_ckpt_sync_checkpoint.ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)); + + memcpy (&req_exec_ckpt_sync_checkpoint.checkpoint_name, + &checkpoint->name, sizeof (mar_name_t)); + + req_exec_ckpt_sync_checkpoint.ckpt_id = checkpoint->ckpt_id; + + memcpy (&req_exec_ckpt_sync_checkpoint.checkpoint_creation_attributes, + &checkpoint->checkpoint_creation_attributes, + sizeof (mar_ckpt_checkpoint_creation_attributes_t)); + + req_exec_ckpt_sync_checkpoint.active_replica_set = + checkpoint->active_replica_set; + + req_exec_ckpt_sync_checkpoint.unlinked = + checkpoint->unlinked; + + iovec.iov_base = (char *)&req_exec_ckpt_sync_checkpoint; + iovec.iov_len = sizeof (req_exec_ckpt_sync_checkpoint); + + return (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED)); +} + +static int sync_checkpoint_section_transmit ( + struct checkpoint *checkpoint, + struct checkpoint_section *checkpoint_section) +{ + struct req_exec_ckpt_sync_checkpoint_section req_exec_ckpt_sync_checkpoint_section; + struct iovec iovecs[3]; + + ENTER(); + + TRACE1 ("transmitting section\n"); + req_exec_ckpt_sync_checkpoint_section.header.size = + sizeof (struct req_exec_ckpt_sync_checkpoint_section); + req_exec_ckpt_sync_checkpoint_section.header.id = + SERVICE_ID_MAKE (CKPT_SERVICE, + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTSECTION); + + memcpy (&req_exec_ckpt_sync_checkpoint_section.ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)); + + memcpy (&req_exec_ckpt_sync_checkpoint_section.checkpoint_name, + &checkpoint->name, sizeof (mar_name_t)); + + req_exec_ckpt_sync_checkpoint_section.ckpt_id = checkpoint->ckpt_id; + + req_exec_ckpt_sync_checkpoint_section.id_len = + checkpoint_section->section_descriptor.section_id.id_len; + + req_exec_ckpt_sync_checkpoint_section.section_size = + checkpoint_section->section_descriptor.section_size; + + req_exec_ckpt_sync_checkpoint_section.section_size = + checkpoint_section->section_descriptor.section_size; + + req_exec_ckpt_sync_checkpoint_section.expiration_time = + checkpoint_section->section_descriptor.expiration_time; + + iovecs[0].iov_base = (char *)&req_exec_ckpt_sync_checkpoint_section; + iovecs[0].iov_len = sizeof (req_exec_ckpt_sync_checkpoint_section); + iovecs[1].iov_base = checkpoint_section->section_descriptor.section_id.id; + iovecs[1].iov_len = checkpoint_section->section_descriptor.section_id.id_len; + iovecs[2].iov_base = checkpoint_section->section_data; + iovecs[2].iov_len = checkpoint_section->section_descriptor.section_size; + + LEAVE(); + return (totempg_groups_mcast_joined (openais_group_handle, iovecs, 3, TOTEMPG_AGREED)); +} + +static int sync_checkpoint_refcount_transmit ( + struct checkpoint *checkpoint) +{ + struct req_exec_ckpt_sync_checkpoint_refcount req_exec_ckpt_sync_checkpoint_refcount; + struct iovec iovec; + + ENTER(); + + TRACE1 ("transmitting refcounts for checkpoints\n"); + req_exec_ckpt_sync_checkpoint_refcount.header.size = + sizeof (struct req_exec_ckpt_sync_checkpoint_refcount); + req_exec_ckpt_sync_checkpoint_refcount.header.id = + SERVICE_ID_MAKE (CKPT_SERVICE, + MESSAGE_REQ_EXEC_CKPT_SYNCCHECKPOINTREFCOUNT); + + memcpy (&req_exec_ckpt_sync_checkpoint_refcount.ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)); + + memcpy (&req_exec_ckpt_sync_checkpoint_refcount.checkpoint_name, + &checkpoint->name, sizeof (mar_name_t)); + + req_exec_ckpt_sync_checkpoint_refcount.ckpt_id = checkpoint->ckpt_id; + + marshall_to_mar_refcount_set_t_all ( + req_exec_ckpt_sync_checkpoint_refcount.refcount_set, + checkpoint->refcount_set); + + iovec.iov_base = (char *)&req_exec_ckpt_sync_checkpoint_refcount; + iovec.iov_len = sizeof (struct req_exec_ckpt_sync_checkpoint_refcount); + + LEAVE(); + return (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED)); +} + +unsigned int sync_checkpoints_iterate (void) +{ + struct checkpoint *checkpoint; + struct checkpoint_section *checkpoint_section; + struct list_head *checkpoint_list; + struct list_head *section_list; + unsigned int res = 0; + + for (checkpoint_list = checkpoint_list_head.next; + checkpoint_list != &checkpoint_list_head; + checkpoint_list = checkpoint_list->next) { + + checkpoint = list_entry (checkpoint_list, struct checkpoint, list); + + res = sync_checkpoint_transmit (checkpoint); + if (res != 0) { + break; + } + for (section_list = checkpoint->sections_list_head.next; + section_list != &checkpoint->sections_list_head; + section_list = section_list->next) { + + checkpoint_section = list_entry (section_list, struct checkpoint_section, list); + res = sync_checkpoint_section_transmit (checkpoint, checkpoint_section); + } + } + return (res); +} + +unsigned int sync_refcounts_iterate (void) +{ + struct checkpoint *checkpoint; + struct list_head *list; + unsigned int res = 0; + + for (list = checkpoint_list_head.next; + list != &checkpoint_list_head; + list = list->next) { + + checkpoint = list_entry (list, struct checkpoint, list); + + res = sync_checkpoint_refcount_transmit (checkpoint); + if (res != 0) { + break; + } + } + return (res); +} + +static int ckpt_sync_process (void) +{ + unsigned int done_queueing = 1; + unsigned int continue_processing = 0; + unsigned int res; + + ENTER(); + + switch (my_sync_state) { + case SYNC_STATE_CHECKPOINT: + if (my_lowest_nodeid == this_ip->nodeid) { + TRACE1 ("should transmit checkpoints because lowest member in old configuration.\n"); + res = sync_checkpoints_iterate (); + + if (res == 0) { + done_queueing = 1; + } + } + if (done_queueing) { + sync_refcounts_enter (); + } + + /* + * TODO recover current iteration state + */ + continue_processing = 1; + break; + + case SYNC_STATE_REFCOUNT: + done_queueing = 1; + if (my_lowest_nodeid == this_ip->nodeid) { + TRACE1 ("transmit refcounts because this processor is the lowest member in old configuration.\n"); + res = sync_refcounts_iterate (); + } + if (done_queueing) { + continue_processing = 0; + } + break; + } + + LEAVE(); + return (continue_processing); +} + +static void ckpt_sync_activate (void) +{ + ENTER(); + + sync_checkpoints_free (&checkpoint_list_head); + + list_init (&checkpoint_list_head); + + if (!list_empty (&sync_checkpoint_list_head)) { + list_splice (&sync_checkpoint_list_head, &checkpoint_list_head); + } + + list_init (&sync_checkpoint_list_head); + + my_sync_state = SYNC_STATE_CHECKPOINT; + + LEAVE(); +} + +static void ckpt_sync_abort (void) +{ + sync_checkpoints_free (&sync_checkpoint_list_head); +} + +static void message_handler_req_exec_ckpt_sync_checkpoint ( + void *message, + unsigned int nodeid) +{ + struct req_exec_ckpt_sync_checkpoint *req_exec_ckpt_sync_checkpoint = + (struct req_exec_ckpt_sync_checkpoint *)message; + struct checkpoint *checkpoint = 0; + + ENTER(); + + /* + * Ignore messages from previous ring ids + */ + if (memcmp (&req_exec_ckpt_sync_checkpoint->ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) { + return; + } + + checkpoint = checkpoint_find_specific ( + &sync_checkpoint_list_head, + &req_exec_ckpt_sync_checkpoint->checkpoint_name, + req_exec_ckpt_sync_checkpoint->ckpt_id); + + /* + * If checkpoint doesn't exist, create one + */ + if (checkpoint == 0) { + checkpoint = malloc (sizeof (struct checkpoint)); + if (checkpoint == 0) { + LEAVE(); + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + memset (checkpoint, 0, sizeof (struct checkpoint)); + + memcpy (&checkpoint->name, + &req_exec_ckpt_sync_checkpoint->checkpoint_name, + sizeof (mar_name_t)); + + memcpy (&checkpoint->checkpoint_creation_attributes, + &req_exec_ckpt_sync_checkpoint->checkpoint_creation_attributes, + sizeof (mar_ckpt_checkpoint_creation_attributes_t)); + + memset (&checkpoint->refcount_set, 0, sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX); + checkpoint->ckpt_id = req_exec_ckpt_sync_checkpoint->ckpt_id; + + checkpoint->active_replica_set = req_exec_ckpt_sync_checkpoint->active_replica_set; + + checkpoint->unlinked = req_exec_ckpt_sync_checkpoint->unlinked; + checkpoint->reference_count = 0; + checkpoint->retention_timer = 0; + checkpoint->section_count = 0; + + list_init (&checkpoint->list); + list_init (&checkpoint->sections_list_head); + list_add (&checkpoint->list, &sync_checkpoint_list_head); + + memset (checkpoint->refcount_set, 0, + sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX); + } + + if (checkpoint->ckpt_id >= global_ckpt_id) { + global_ckpt_id = checkpoint->ckpt_id + 1; + } + + LEAVE(); +} + +static void message_handler_req_exec_ckpt_sync_checkpoint_section ( + void *message, + unsigned int nodeid) +{ + struct req_exec_ckpt_sync_checkpoint_section *req_exec_ckpt_sync_checkpoint_section = + (struct req_exec_ckpt_sync_checkpoint_section *)message; + struct checkpoint *checkpoint; + struct checkpoint_section *checkpoint_section; + char *section_contents; + char *section_id; + + ENTER(); + + /* + * Ignore messages from previous ring ids + */ + if (memcmp (&req_exec_ckpt_sync_checkpoint_section->ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) { + LEAVE(); + return; + } + + checkpoint = checkpoint_find_specific ( + &sync_checkpoint_list_head, + &req_exec_ckpt_sync_checkpoint_section->checkpoint_name, + req_exec_ckpt_sync_checkpoint_section->ckpt_id); + + assert (checkpoint != NULL); + + /* + * Determine if user-specified checkpoint section already exists + */ + checkpoint_section = checkpoint_section_find ( + checkpoint, + ((char *)req_exec_ckpt_sync_checkpoint_section) + + sizeof (struct req_exec_ckpt_sync_checkpoint_section), + req_exec_ckpt_sync_checkpoint_section->id_len); + if (checkpoint_section == NULL) { + /* + * Allocate checkpoint section + */ + checkpoint_section = malloc (sizeof (struct checkpoint_section)); + if (checkpoint_section == 0) { + LEAVE(); + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + section_contents = malloc (req_exec_ckpt_sync_checkpoint_section->section_size); + if (section_contents == 0) { + free (checkpoint_section); + LEAVE(); + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + if (req_exec_ckpt_sync_checkpoint_section->id_len) { + + section_id = malloc (req_exec_ckpt_sync_checkpoint_section->id_len + 1); + if (section_id == 0) { + free (checkpoint_section); + free (section_contents); + LEAVE(); + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + + /* + * Copy checkpoint section and section ID + */ + memcpy (section_id, + ((char *)req_exec_ckpt_sync_checkpoint_section) + + sizeof (struct req_exec_ckpt_sync_checkpoint_section), + req_exec_ckpt_sync_checkpoint_section->id_len); + + /* + * Null terminate the section id for printing purposes + */ + ((char*)(section_id))[req_exec_ckpt_sync_checkpoint_section->id_len] = '\0'; + + } else { + /* + * Default section + */ + section_id = NULL; + } + + memcpy (section_contents, + ((char *)req_exec_ckpt_sync_checkpoint_section) + + sizeof (struct req_exec_ckpt_sync_checkpoint_section) + + req_exec_ckpt_sync_checkpoint_section->id_len, + req_exec_ckpt_sync_checkpoint_section->section_size); + + /* + * Configure checkpoint section + */ + checkpoint_section->section_descriptor.section_id.id = (unsigned char *)section_id; + checkpoint_section->section_descriptor.section_id.id_len = + req_exec_ckpt_sync_checkpoint_section->id_len; + checkpoint_section->section_descriptor.section_size = + req_exec_ckpt_sync_checkpoint_section->section_size; + checkpoint_section->section_descriptor.expiration_time = + req_exec_ckpt_sync_checkpoint_section->expiration_time; + checkpoint_section->section_descriptor.section_state = + SA_CKPT_SECTION_VALID; + checkpoint_section->section_descriptor.last_update = 0; /* TODO current time */ + checkpoint_section->section_data = section_contents; + checkpoint_section->expiration_timer = 0; + + /* + * Add checkpoint section to checkpoint + */ + list_init (&checkpoint_section->list); + list_add (&checkpoint_section->list, + &checkpoint->sections_list_head); + checkpoint->section_count += 1; + } + + LEAVE(); +} + +static void message_handler_req_exec_ckpt_sync_checkpoint_refcount ( + void *message, + unsigned int nodeid) +{ + struct req_exec_ckpt_sync_checkpoint_refcount *req_exec_ckpt_sync_checkpoint_refcount + = (struct req_exec_ckpt_sync_checkpoint_refcount *)message; + struct checkpoint *checkpoint; + unsigned int i, j; + + ENTER(); + + /* + * Ignore messages from previous ring ids + */ + if (memcmp (&req_exec_ckpt_sync_checkpoint_refcount->ring_id, + &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0) { + LEAVE(); + return; + } + + checkpoint = checkpoint_find_specific ( + &sync_checkpoint_list_head, + &req_exec_ckpt_sync_checkpoint_refcount->checkpoint_name, + req_exec_ckpt_sync_checkpoint_refcount->ckpt_id); + + assert (checkpoint != NULL); + + for (i = 0; i < PROCESSOR_COUNT_MAX; i++) { + if (req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid == 0) { + break; + } + for (j = 0; j < PROCESSOR_COUNT_MAX; j++) { + if (checkpoint->refcount_set[j].nodeid == 0) { + checkpoint->refcount_set[j].nodeid = + req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid; + checkpoint->refcount_set[j].refcount = + req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].refcount; + /* + * No match found, added processor reference count + */ + break; + } + + if (req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].nodeid == checkpoint->refcount_set[j].nodeid) { + checkpoint->refcount_set[j].refcount += + req_exec_ckpt_sync_checkpoint_refcount->refcount_set[i].refcount; + /* + * Found match, so look at next processor ref count + */ + break; + } + } + } + + sync_refcount_calculate (checkpoint); + + LEAVE(); +} + + +static void ckpt_dump_fn (void) { -#ifdef DEBUG struct list_head *checkpoint_list; struct checkpoint *checkpoint; struct list_head *checkpoint_section_list; @@ -4270,8 +3900,8 @@ static void dump_fn (void) log_printf (LOG_LEVEL_NOTICE, "Checkpoint %s (%d):", checkpoint->name.value, checkpoint->name.length); log_printf (LOG_LEVEL_NOTICE, " id: %u", checkpoint->ckpt_id); - log_printf (LOG_LEVEL_NOTICE, " sec cnt: %u", checkpoint->sectionCount); - log_printf (LOG_LEVEL_NOTICE, " ref cnt: %u", checkpoint->referenceCount); + log_printf (LOG_LEVEL_NOTICE, " sec cnt: %u", checkpoint->section_count); + log_printf (LOG_LEVEL_NOTICE, " ref cnt: %u", checkpoint->reference_count); log_printf (LOG_LEVEL_NOTICE, " unlinked: %u", checkpoint->unlinked); for (checkpoint_section_list = checkpoint->sections_list_head.next; @@ -4292,6 +3922,4 @@ static void dump_fn (void) section->section_descriptor.expiration_time); } } -#endif } -