From 91c6b6d699135d6864411f457850d221db8cb49f Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Tue, 28 Jun 2005 19:01:25 +0000 Subject: [PATCH] Patch from Muni Bajpai to improve synchronization after a merge. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@713 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/aispoll.c | 26 ++ exec/aispoll.h | 4 + exec/ckpt.c | 769 ++++++++++++++++++++++++++++++++++++------------ exec/tlist.c | 11 + exec/tlist.h | 2 + test/testckpt.c | 4 +- 6 files changed, 630 insertions(+), 186 deletions(-) diff --git a/exec/aispoll.c b/exec/aispoll.c index fee8bee7..fcc14117 100644 --- a/exec/aispoll.c +++ b/exec/aispoll.c @@ -342,6 +342,32 @@ error_exit: return (-1); } +int poll_timer_delete_data ( + poll_handle handle, + poll_timer_handle timer_handle) { + struct poll_instance *poll_instance; + SaErrorT error; + + if (timer_handle == 0) { + return (0); + } + error = saHandleInstanceGet (&poll_instance_database, handle, + (void *)&poll_instance); + if (error != SA_OK) { + goto error_exit; + } + + timerlist_del_data (&poll_instance->timerlist, (void *)timer_handle); + + saHandleInstancePut (&poll_instance_database, handle); + + return (0); + +error_exit: + return (-1); + +} + int poll_entry_compare (const void *a, const void *b) { struct poll_entry *poll_entry_a = (struct poll_entry *)a; diff --git a/exec/aispoll.h b/exec/aispoll.h index 5fdaa7e2..dc696ac3 100644 --- a/exec/aispoll.h +++ b/exec/aispoll.h @@ -71,6 +71,10 @@ int poll_timer_delete ( poll_handle handle, poll_timer_handle timer_handle); +int poll_timer_delete_data ( + poll_handle handle, + poll_timer_handle timer_handle); + int poll_run ( poll_handle handle); diff --git a/exec/ckpt.c b/exec/ckpt.c index 1144632d..3c46ebd6 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -43,6 +43,7 @@ #include #include #include +#include #include "../include/ais_types.h" #include "../include/saCkpt.h" @@ -60,6 +61,13 @@ #define CKPT_MAX_SECTION_DATA_SEND (1024*400) #include "print.h" +struct ckpt_identifier { + SaNameT ckpt_name; + SaCkptSectionIdT ckpt_section_id; +}; + +static int process_localhost_transition = 0; + DECLARE_LIST_INIT(checkpoint_list_head); DECLARE_LIST_INIT(checkpoint_iterator_list_head); @@ -143,15 +151,20 @@ static int message_handler_req_lib_ckpt_checkpointsynchronizeasync (struct conn_ static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message); static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message); + static void ckpt_recovery_activate (void); static void ckpt_recovery_initialize (void); static int ckpt_recovery_process (void); static void ckpt_recovery_finalize(); static void ckpt_recovery_abort(void); -static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries); +static void ckpt_recovery_process_members_exit(struct in_addr *left_list, + int left_list_entries); +static void ckpt_replace_localhost_ip (struct in_addr *joined_list); void checkpoint_release (struct saCkptCheckpoint *checkpoint); void timer_function_retention (void *data); +unsigned int abstime_to_msec (SaTimeT time); +void timer_function_section_expire (void *data); static int recovery_checkpoint_open(SaNameT *checkpointName, SaCkptCheckpointCreationAttributesT *ckptAttributes, @@ -324,13 +337,7 @@ static int processor_index_set(struct in_addr *proc_addr, { int i; for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { - /* - * If the source addresses match then this processor index - * has already been set - */ - if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) { - return -1; - } else if (ckpt_refcount[i].addr.s_addr == 0) { + if (ckpt_refcount[i].addr.s_addr == 0) { /* * If the source addresses do not match and this element * has no stored value then store the new value and @@ -339,14 +346,74 @@ static int processor_index_set(struct in_addr *proc_addr, memcpy(&ckpt_refcount[i].addr, proc_addr, sizeof(struct in_addr)); return i; } + /* + * If the source addresses match then this processor index + * has already been set + */ + else if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) { + return -1; + } + } /* * Could not Find an empty slot * to store the new Processor. */ + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (ckpt_refcount[i].addr.s_addr == 0) { + log_printf (LOG_LEVEL_ERROR,"Processor Set: Index %d has proc 0 and count 0\n", i); + } + else { + log_printf (LOG_LEVEL_ERROR,"Processor Set: Index %d has proc %s and count %d\n", + i, + inet_ntoa(ckpt_refcount[i].addr), + ckpt_refcount[i].count); + } + } + return -1; } +static int processor_add (struct in_addr *proc_addr, int count, struct ckpt_refcnt *ckpt_refcount) +{ + int i; + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (ckpt_refcount[i].addr.s_addr == 0) { + log_printf (LOG_LEVEL_DEBUG,"processor_add found empty slot to insert new item\n"); + memcpy(&ckpt_refcount[i].addr, proc_addr, sizeof(struct in_addr)); + ckpt_refcount[i].count = count; + return i; + } + /*Dont know how we missed this in the processor find but update this*/ + else if (ckpt_refcount[i].addr.s_addr == proc_addr->s_addr) { + ckpt_refcount[i].count += count; + log_printf (LOG_LEVEL_DEBUG,"processor_add for existent proc. ip %s, New count = %d\n", + inet_ntoa(ckpt_refcount[i].addr), + ckpt_refcount[i].count); + + return i; + } + } + /* + * Could not Find an empty slot + * to store the new Processor. + */ + log_printf (LOG_LEVEL_ERROR,"Processor Add Failed. Dumping Refcount Array\n"); + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (ckpt_refcount[i].addr.s_addr == 0) { + log_printf (LOG_LEVEL_ERROR,"Processor Add: Index %d has proc 0 and count 0\n", i); + } + else { + log_printf (LOG_LEVEL_ERROR,"Processor Add: Index %d has proc %s and count %d\n", + i, + inet_ntoa(ckpt_refcount[i].addr), + ckpt_refcount[i].count); + } + } + return -1; + +} + static int processor_index_find(struct in_addr *proc_addr, struct ckpt_refcnt *ckpt_refcount) { @@ -381,6 +448,44 @@ static void initialize_ckpt_refcount_array (struct ckpt_refcnt *ckpt_refcount) memset((char*)ckpt_refcount, 0, PROCESSOR_COUNT_MAX * sizeof(struct ckpt_refcnt)); } +static void merge_ckpt_refcounts(struct ckpt_refcnt *local, struct ckpt_refcnt *network) +{ + int index,i; + + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (local[i].addr.s_addr == 0) { + continue; + } + index = processor_index_find (&local[i].addr, network); + if (index == -1) { /*Could Not Find the Local Entry in the remote.Add to it*/ + log_printf (LOG_LEVEL_DEBUG,"calling processor_add for ip %s, count %d\n", + inet_ntoa(local[i].addr), + local[i].count); + index = processor_add (&local[i].addr, local[i].count, network); + if (index == -1) { + log_printf(LOG_LEVEL_ERROR, + "merge_ckpt_refcounts : could not add a new processor as the MAX limit of procs is reached.Exiting\n"); + assert(0); + } + } + else { + if (local[i].count == network[index].count) { + /*Nothing to do here as the network is already up 2 date*/ + log_printf (LOG_LEVEL_DEBUG,"merge_ckpt_refcounts counts match, continue\n"); + continue; + } + else { + /*Found a match for this proc in the Network choose the larger of the 2.*/ + network[index].count += local[i].count; + log_printf (LOG_LEVEL_DEBUG,"setting count for %s = %d\n", + inet_ntoa(network[index].addr), + network[index].count); + } + } + } +} + + static void ckpt_recovery_initialize (void) { struct list_head *checkpoint_list; @@ -424,6 +529,7 @@ static void ckpt_recovery_initialize (void) savedSection = (struct saCkptCheckpointSection *) malloc (sizeof(struct saCkptCheckpointSection)); assert(savedSection); + poll_timer_delete_data (aisexec_poll_handle, section->expiration_timer); memcpy(savedSection, section, sizeof(struct saCkptCheckpointSection)); list_init(&savedSection->list); list_add(&savedSection->list,&savedCheckpoint->checkpointSectionsListHead); @@ -441,7 +547,7 @@ static void ckpt_recovery_initialize (void) static int ckpt_recovery_process (void) { - + int i; struct req_exec_ckpt_synchronize_state request_exec_sync_state; struct req_exec_ckpt_synchronize_section request_exec_sync_section; struct iovec iovecs[3]; @@ -487,9 +593,9 @@ static int ckpt_recovery_process (void) if (recovery_section_send_flag == 0) { if ((int)ckptCheckpointSection->sectionDescriptor.sectionId.id) { log_printf (LOG_LEVEL_DEBUG, "CKPT: New Sync State Message for ckpt = %s, section = %s.\n", - (char*)&checkpoint->name.value, - ((char*)ckptCheckpointSection->sectionDescriptor.sectionId.id)); - } else { + (char*)&checkpoint->name.value, + ((char*)ckptCheckpointSection->sectionDescriptor.sectionId.id)); + } else { log_printf (LOG_LEVEL_DEBUG, "CKPT: New Sync State Message for ckpt = %s, section = default section.\n", (char*)&checkpoint->name.value); } @@ -509,7 +615,21 @@ static int ckpt_recovery_process (void) checkpoint->ckpt_refcount, sizeof(struct ckpt_refcnt)*PROCESSOR_COUNT_MAX); request_exec_sync_state.sectionDescriptor.sectionId.id = 0; - + + log_printf (LOG_LEVEL_DEBUG, "CKPT: New Sync State Message Values\n"); + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (request_exec_sync_state.ckpt_refcount[i].addr.s_addr == 0) { + log_printf (LOG_LEVEL_DEBUG,"Index %d has proc 0 and count %d\n", i, + request_exec_sync_state.ckpt_refcount[i].count); + } + else { + log_printf (LOG_LEVEL_DEBUG,"Index %d has proc %s and count %d\n", + i, + inet_ntoa(request_exec_sync_state.ckpt_refcount[i].addr), + request_exec_sync_state.ckpt_refcount[i].count); + } + } + iovecs[0].iov_base = (char *)&request_exec_sync_state; iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state); @@ -578,7 +698,7 @@ static int ckpt_recovery_process (void) /* * Populate the Sync Section Request */ - iovecs[0].iov_base = (char *)&request_exec_sync_section; + iovecs[0].iov_base = (char *)&request_exec_sync_section; iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_section); /* @@ -643,8 +763,9 @@ static void ckpt_recovery_finalize () { struct list_head *checkpoint_list; struct list_head *checkpoint_section_list; - struct saCkptCheckpoint *checkpoint; - struct saCkptCheckpointSection *section; + struct saCkptCheckpoint *checkpoint; + struct saCkptCheckpointSection *section; + struct ckpt_identifier *ckpt_id; /* * Remove All elements from old checkpoint @@ -661,6 +782,7 @@ static void ckpt_recovery_finalize () struct saCkptCheckpointSection, list); list_del (§ion->list); + log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_finalize removed 0x%x.\n", section); free (section); checkpoint_section_list = checkpoint->checkpointSectionsListHead.next; } @@ -681,6 +803,38 @@ static void ckpt_recovery_finalize () checkpoint_recovery_list_head.next->prev = &checkpoint_list_head; memcpy(&checkpoint_list_head, &checkpoint_recovery_list_head, sizeof(struct list_head)); + /*Timers might have been started before recovery happened .. restart them ..*/ + for (checkpoint_list = checkpoint_list_head.next; + checkpoint_list != &checkpoint_list_head; + checkpoint_list = checkpoint_list->next) { + + checkpoint = list_entry (checkpoint_list, + struct saCkptCheckpoint, list); + + for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next; + checkpoint_section_list != &checkpoint->checkpointSectionsListHead; + checkpoint_section_list = checkpoint_section_list->next) { + section = list_entry (checkpoint_section_list, + struct saCkptCheckpointSection, list); + + if (section->sectionDescriptor.expirationTime != SA_TIME_END) { + ckpt_id = malloc (sizeof(struct ckpt_identifier)); + assert(ckpt_id); + memcpy(&ckpt_id->ckpt_name,&checkpoint->name,sizeof(SaNameT)); + memcpy(&ckpt_id->ckpt_section_id, §ion->sectionDescriptor.sectionId,sizeof(SaCkptSectionIdT)); + + poll_timer_add (aisexec_poll_handle, + abstime_to_msec (section->sectionDescriptor.expirationTime), + ckpt_id, + timer_function_section_expire, + §ion->expiration_timer); + log_printf (LOG_LEVEL_DEBUG, "CKPT: ckpt_recovery_initialize expiration timer = 0x%x\n", + section->expiration_timer); + } + } + } + + /* * Initialize the new list head for reuse. */ @@ -701,23 +855,62 @@ static void ckpt_recovery_abort (void) return; } -static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) +static void ckpt_replace_localhost_ip (struct in_addr *joined_list) { + struct list_head *checkpoint_list; + struct saCkptCheckpoint *checkpoint; + struct in_addr local_ip; + int index; + + assert(joined_list); + + local_ip.s_addr = inet_addr("127.0.0.1"); + + for (checkpoint_list = checkpoint_list_head.next; + checkpoint_list != &checkpoint_list_head; + checkpoint_list = checkpoint_list->next) { + + checkpoint = list_entry (checkpoint_list, + struct saCkptCheckpoint, list); + index = processor_index_find(&local_ip, checkpoint->ckpt_refcount); + if (index == -1) { + continue; + } + memcpy(&checkpoint->ckpt_refcount[index].addr, joined_list, sizeof(struct in_addr)); + log_printf (LOG_LEVEL_DEBUG, "Transitioning From Local Host replacing 127.0.0.1 with %s ...\n", + inet_ntoa(*joined_list)); + + } + process_localhost_transition = 0; +} + + +static void ckpt_recovery_process_members_exit(struct in_addr *left_list, + int left_list_entries) { struct list_head *checkpoint_list; struct saCkptCheckpoint *checkpoint; struct in_addr *member; + struct in_addr local_ip; int index; int i; + + local_ip.s_addr = inet_addr("127.0.0.1"); if (left_list_entries == 0) { return; } + + if ((left_list_entries == 1) && + (left_list->s_addr == local_ip.s_addr)) { + process_localhost_transition = 1; + return; + } /* * Iterate left_list_entries. */ member = left_list; - for (i = 0; i < left_list_entries; i++) { + for (i = 0; i < left_list_entries; i++) { for (checkpoint_list = checkpoint_list_head.next; checkpoint_list != &checkpoint_list_head; checkpoint_list = checkpoint_list->next) { @@ -753,9 +946,11 @@ static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int le &checkpoint->name.value); checkpoint_release (checkpoint); } else - if (checkpoint->referenceCount == 0) { + if ((checkpoint->expired == 0) && (checkpoint->referenceCount == 0)) { log_printf (LOG_LEVEL_DEBUG, "ckpt_recovery_process_members_exit: Starting timer to release checkpoint %s.\n", &checkpoint->name.value); + poll_timer_delete (aisexec_poll_handle, checkpoint->retention_timer); + poll_timer_add (aisexec_poll_handle, checkpoint->checkpointCreationAttributes.retentionDuration / 1000000, checkpoint, @@ -783,6 +978,9 @@ static int ckpt_confchg_fn ( if (recovery_state == SYNCHRONY_STATE_ENDED) { memcpy (&saved_ring_id, ring_id, sizeof(struct memb_ring_id)); } + if (process_localhost_transition) { + ckpt_replace_localhost_ip (joined_list); + } } else if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) { @@ -882,6 +1080,9 @@ static struct saCkptCheckpointSection *ckpt_checkpoint_find_globalSection ( (memcmp (ckptCheckpointSection->sectionDescriptor.sectionId.id, id, idLen) == 0)) { + log_printf (LOG_LEVEL_DEBUG, "Returning section %s(0x%x)\n", ckptCheckpointSection->sectionDescriptor.sectionId.id, + ckptCheckpointSection); + return (ckptCheckpointSection); } } @@ -890,6 +1091,7 @@ static struct saCkptCheckpointSection *ckpt_checkpoint_find_globalSection ( void checkpoint_section_release (struct saCkptCheckpointSection *section) { + log_printf (LOG_LEVEL_DEBUG, "CKPT: checkpoint_section_release expiration timer = 0x%x\n", section->expiration_timer); list_del (§ion->list); if (section->sectionDescriptor.sectionId.id) { free (section->sectionDescriptor.sectionId.id); @@ -977,6 +1179,8 @@ static int ckpt_exit_fn (struct conn_info *conn_info) if (conn_info->conn_info_partner->service != CKPT_SERVICE) { return 0; } + + log_printf(LOG_LEVEL_DEBUG, "ckpt_exit_fn conn_info = %#x, with fd = %d\n", conn_info, conn_info->fd); /* * close all checkpoints opened on this fd @@ -1187,12 +1391,26 @@ static int recovery_checkpoint_open(SaNameT *checkpointName, SaCkptCheckpointCreationAttributesT *ckptAttributes, struct ckpt_refcnt *ref_cnt) { - + int i; struct saCkptCheckpoint *ckptCheckpoint = 0; struct saCkptCheckpointSection *ckptCheckpointSection = 0; SaErrorT error = SA_AIS_OK; log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_checkpoint_open %s\n", &checkpointName->value); + log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_checkpoint_open refcount Values\n"); + for (i = 0; i < PROCESSOR_COUNT_MAX; i ++) { + if (ref_cnt[i].addr.s_addr == 0) { + log_printf (LOG_LEVEL_DEBUG,"Index %d has proc 0 and count %d\n", i, + ref_cnt[i].count); + } + else { + log_printf (LOG_LEVEL_DEBUG,"Index %d has proc %s and count %d\n", + i, + inet_ntoa(ref_cnt[i].addr), + ref_cnt[i].count); + } + } + ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName); @@ -1200,6 +1418,7 @@ static int recovery_checkpoint_open(SaNameT *checkpointName, * If checkpoint doesn't exist, create one */ if (ckptCheckpoint == 0) { + log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_checkpoint_open Allocating new Checkpoint %s\n", &checkpointName->value); ckptCheckpoint = malloc (sizeof (struct saCkptCheckpoint)); if (ckptCheckpoint == 0) { error = SA_AIS_ERR_NO_MEMORY; @@ -1245,6 +1464,14 @@ static int recovery_checkpoint_open(SaNameT *checkpointName, assert(ckptCheckpointSection->sectionData); memcpy(ckptCheckpointSection->sectionData, "Factory installed data\0", strlen("Factory installed data\0")+1); ckptCheckpointSection->expiration_timer = 0; + + initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount); + } + else { + /* + * Setup connection information and mark checkpoint as referenced + */ + log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery CHECKPOINT reopened is %p\n", ckptCheckpoint); } /* @@ -1254,17 +1481,21 @@ static int recovery_checkpoint_open(SaNameT *checkpointName, error = SA_AIS_ERR_BAD_OPERATION; /* Is this the correct return ? */ goto error_exit; } - - initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount); + + /*CHECK to see if there are any existing ckpts*/ + if ((ckptCheckpoint->ckpt_refcount) && (ckpt_refcount_total(ckptCheckpoint->ckpt_refcount) > 0)) { + log_printf (LOG_LEVEL_DEBUG,"calling merge_ckpt_refcounts\n"); + merge_ckpt_refcounts(ckptCheckpoint->ckpt_refcount, ref_cnt); + } + else { + initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount); + } + + /*No Existing ckpts. Lets assign what we got over the network or the merged with network values*/ ckptCheckpoint->referenceCount = ckpt_refcount_total(ref_cnt); log_printf (LOG_LEVEL_DEBUG, "CKPT: OPEN ckptCheckpoint->referenceCount %d\n",ckptCheckpoint->referenceCount); memcpy(ckptCheckpoint->ckpt_refcount,ref_cnt,sizeof(struct ckpt_refcnt)*PROCESSOR_COUNT_MAX); - /* - * Setup connection information and mark checkpoint as referenced - */ - log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery CHECKPOINT reopened is %p\n", ckptCheckpoint); - /* * Reset retention duration since this checkpoint was just opened */ @@ -1363,11 +1594,47 @@ unsigned int abstime_to_msec (SaTimeT time) void timer_function_section_expire (void *data) { - struct saCkptCheckpointSection *section = (struct saCkptCheckpointSection *)data; - if (section->sectionDescriptor.sectionId.id) { - log_printf (LOG_LEVEL_NOTICE, "CKPT: Expiring section %s\n", section->sectionDescriptor.sectionId.id); + struct saCkptCheckpoint *ckptCheckpoint = 0; + struct saCkptCheckpointSection *ckptCheckpointSection = 0; + struct ckpt_identifier *ckpt_id = 0; + + ckpt_id = (struct ckpt_identifier *)data; + log_printf (LOG_LEVEL_DEBUG, "CKPT: timer_function_section_expire data = 0x%x \n",data); + if (ckpt_id->ckpt_section_id.idLen && ckpt_id->ckpt_section_id.id) { + log_printf (LOG_LEVEL_DEBUG, "CKPT: Attempting to Expire section %s in ckpt %s\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value); } - checkpoint_section_release (section); + else { + log_printf (LOG_LEVEL_ERROR, "CKPT: timer_function_section_expire data incorect\n"); + goto free_mem; + } + + ckptCheckpoint = ckpt_checkpoint_find_global (&ckpt_id->ckpt_name); + if (ckptCheckpoint == 0) { + log_printf (LOG_LEVEL_ERROR, "CKPT: timer_function_section_expire could not find ckpt %s\n", + (char *)&ckpt_id->ckpt_name.value); + goto free_mem; + } + + ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint, + ckpt_id->ckpt_section_id.id, + (int)ckpt_id->ckpt_section_id.idLen); + if (ckptCheckpointSection == 0) { + log_printf (LOG_LEVEL_ERROR, "CKPT: timer_function_section_expire could not find section %s in ckpt %s\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value); + goto free_mem; + } + + log_printf (LOG_LEVEL_DEBUG, "CKPT: Expiring section %s in ckpt %s\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value); + + checkpoint_section_release (ckptCheckpointSection); +free_mem : + free (ckpt_id); + } void timer_function_retention (void *data) @@ -1485,6 +1752,10 @@ static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct */ checkpoint_release (ckptCheckpoint); } + else if ( ckptCheckpoint->referenceCount > 0 ) { + ckptCheckpoint->unlinked = 0; + ckptCheckpoint->expired = 0; + } error_exit: /* @@ -1509,7 +1780,7 @@ static int message_handler_req_exec_ckpt_checkpointretentiondurationset (void *m checkpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_checkpointretentiondurationset->checkpointName); if (checkpoint) { - log_printf (LOG_LEVEL_NOTICE, "CKPT: Setting retention duration for checkpoint %s\n", + log_printf (LOG_LEVEL_DEBUG, "CKPT: Setting retention duration for checkpoint %s\n", getSaNameT (&req_exec_ckpt_checkpointretentiondurationset->checkpointName)); if (checkpoint->unlinked == 0) { checkpoint->checkpointCreationAttributes.retentionDuration = @@ -1551,7 +1822,7 @@ static int message_handler_req_exec_ckpt_checkpointretentiondurationexpire (void struct iovec iovecs[2]; checkpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_checkpointretentiondurationexpire->checkpointName); - if (checkpoint && checkpoint->expired == 0) { + if (checkpoint && (checkpoint->expired == 0) && (checkpoint->referenceCount < 1)) { log_printf (LOG_LEVEL_NOTICE, "CKPT: Expiring checkpoint %s\n", getSaNameT (&req_exec_ckpt_checkpointretentiondurationexpire->checkpointName)); checkpoint->expired = 1; @@ -1582,6 +1853,7 @@ static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, struct saCkptCheckpointSection *ckptCheckpointSection; void *initialData; void *sectionId; + struct ckpt_identifier *ckpt_id = 0; SaErrorT error = SA_AIS_OK; if ((int)sectionDescriptor->sectionId.idLen) { @@ -1661,12 +1933,21 @@ static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, ckptCheckpointSection->expiration_timer = 0; ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionId; - if (sectionDescriptor->expirationTime != SA_TIME_END) { + if (sectionDescriptor->expirationTime != SA_TIME_END) { + ckpt_id = malloc (sizeof(struct ckpt_identifier)); + assert(ckpt_id); + memcpy(&ckpt_id->ckpt_name,checkpointName,sizeof(SaNameT)); + memcpy(&ckpt_id->ckpt_section_id, &ckptCheckpointSection->sectionDescriptor.sectionId,sizeof(SaCkptSectionIdT)); + log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_create Enqueuing Timer to Expire section %s in ckpt %s\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value); poll_timer_add (aisexec_poll_handle, abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime), - ckptCheckpointSection, + ckpt_id, timer_function_section_expire, &ckptCheckpointSection->expiration_timer); + log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_create expiration timer = 0x%x\n", + ckptCheckpointSection->expiration_timer); } /* @@ -1689,6 +1970,7 @@ static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in struct saCkptCheckpointSection *ckptCheckpointSection; void *initialData; void *sectionId; + struct ckpt_identifier *ckpt_id = 0; SaErrorT error = SA_AIS_OK; log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n"); @@ -1762,13 +2044,25 @@ static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in ckptCheckpointSection->expiration_timer = 0; if (req_lib_ckpt_sectioncreate->expirationTime != SA_TIME_END) { + ckpt_id = malloc (sizeof(struct ckpt_identifier)); + assert(ckpt_id); + memcpy(&ckpt_id->ckpt_name,&req_exec_ckpt_sectioncreate->checkpointName,sizeof(SaNameT)); + memcpy(&ckpt_id->ckpt_section_id, &ckptCheckpointSection->sectionDescriptor.sectionId,sizeof(SaCkptSectionIdT)); + log_printf (LOG_LEVEL_DEBUG, "CKPT: req_exec_ckpt_sectioncreate Enqueuing Timer to Expire section %s in ckpt %s\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value); poll_timer_add (aisexec_poll_handle, abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime), - ckptCheckpointSection, + ckpt_id, timer_function_section_expire, &ckptCheckpointSection->expiration_timer); + log_printf (LOG_LEVEL_DEBUG, "CKPT: req_exec_ckpt_sectionicreate expiration timer = 0x%x\n", + ckptCheckpointSection->expiration_timer); } + log_printf (LOG_LEVEL_DEBUG, "CKPT: message_handler_req_exec_ckpt_sectioncreate created section with id = %s, idLen = %d\n", + ckptCheckpointSection->sectionDescriptor.sectionId.id, + ckptCheckpointSection->sectionDescriptor.sectionId.idLen); /* * Add checkpoint section to checkpoint */ @@ -1849,6 +2143,7 @@ static int message_handler_req_exec_ckpt_sectionexpirationtimeset (void *message struct res_lib_ckpt_sectionexpirationtimeset res_lib_ckpt_sectionexpirationtimeset; struct saCkptCheckpoint *ckptCheckpoint; struct saCkptCheckpointSection *ckptCheckpointSection; + struct ckpt_identifier *ckpt_id = 0; SaErrorT error = SA_AIS_OK; log_printf (LOG_LEVEL_DEBUG, "Executive request to set section expiratoin time\n"); @@ -1885,11 +2180,21 @@ static int message_handler_req_exec_ckpt_sectionexpirationtimeset (void *message ckptCheckpointSection->expiration_timer = 0; if (req_lib_ckpt_sectionexpirationtimeset->expirationTime != SA_TIME_END) { + ckpt_id = malloc (sizeof(struct ckpt_identifier)); + assert(ckpt_id); + memcpy(&ckpt_id->ckpt_name,&req_exec_ckpt_sectionexpirationtimeset->checkpointName,sizeof(SaNameT)); + memcpy(&ckpt_id->ckpt_section_id, &ckptCheckpointSection->sectionDescriptor.sectionId,sizeof(SaCkptSectionIdT)); + log_printf (LOG_LEVEL_DEBUG, "CKPT: req_exec_ckpt_sectionexpirationtimeset Enqueuing Timer to Expire section %s in ckpt %s, ref = 0x%x\n", + ckpt_id->ckpt_section_id.id, + (char *)&ckpt_id->ckpt_name.value, + ckpt_id); poll_timer_add (aisexec_poll_handle, abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime), - ckptCheckpointSection, + ckpt_id, timer_function_section_expire, &ckptCheckpointSection->expiration_timer); + log_printf (LOG_LEVEL_DEBUG, "CKPT: req_exec_ckpt_sectionexpirationtimeset expiration timer = 0x%x\n", + ckptCheckpointSection->expiration_timer); } error_exit: @@ -1964,7 +2269,7 @@ static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_ struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)&req_exec_ckpt_sectionwrite->req_lib_ckpt_sectionwrite; struct res_lib_ckpt_sectionwrite res_lib_ckpt_sectionwrite; struct saCkptCheckpoint *ckptCheckpoint; - struct saCkptCheckpointSection *ckptCheckpointSection; + struct saCkptCheckpointSection *ckptCheckpointSection = 0; int sizeRequired; void *sectionData; SaErrorT error = SA_AIS_OK; @@ -1972,6 +2277,7 @@ static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_ log_printf (LOG_LEVEL_DEBUG, "Executive request to section write.\n"); ckptCheckpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_sectionwrite->checkpointName); if (ckptCheckpoint == 0) { + log_printf (LOG_LEVEL_ERROR, "CKPT: ckpt_checkpoint_find_global returned 0 Calling error_exit.\n"); error = SA_AIS_ERR_NOT_EXIST; goto error_exit; } @@ -1987,10 +2293,10 @@ static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_ req_lib_ckpt_sectionwrite->idLen); if (ckptCheckpointSection == 0) { if (req_lib_ckpt_sectionwrite->idLen == 0) { - log_printf (LOG_LEVEL_ERROR, "CANT FIND DEFAULT SECTION.\n"); + log_printf (LOG_LEVEL_DEBUG, "CANT FIND DEFAULT SECTION.\n"); } else { - printf ("CANT FIND SECTION '%s'\n", + log_printf (LOG_LEVEL_DEBUG, "CANT FIND SECTION '%s'\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite)); } error = SA_AIS_ERR_NOT_EXIST; @@ -2004,6 +2310,7 @@ static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_ if (sizeRequired > ckptCheckpointSection->sectionDescriptor.sectionSize) { sectionData = realloc (ckptCheckpointSection->sectionData, sizeRequired); if (sectionData == 0) { + log_printf (LOG_LEVEL_ERROR, "CKPT: sectionData realloc returned 0 Calling error_exit.\n"); error = SA_AIS_ERR_NO_MEMORY; goto error_exit; } @@ -2041,7 +2348,6 @@ error_exit: &res_lib_ckpt_sectionwrite, sizeof (struct res_lib_ckpt_sectionwrite)); } - return (0); } @@ -2273,23 +2579,38 @@ static int message_handler_req_lib_ckpt_checkpointclose (struct conn_info *conn_ struct req_lib_ckpt_checkpointclose *req_lib_ckpt_checkpointclose = (struct req_lib_ckpt_checkpointclose *)message; struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose; struct iovec iovecs[2]; + struct saCkptCheckpoint *checkpoint; + struct res_lib_ckpt_checkpointclose res_lib_ckpt_checkpointclose; - req_exec_ckpt_checkpointclose.header.size = - sizeof (struct req_exec_ckpt_checkpointclose); - req_exec_ckpt_checkpointclose.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE; + checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointclose->checkpointName); + if (checkpoint && (checkpoint->expired == 0)) { + req_exec_ckpt_checkpointclose.header.size = + sizeof (struct req_exec_ckpt_checkpointclose); + req_exec_ckpt_checkpointclose.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE; - message_source_set (&req_exec_ckpt_checkpointclose.source, conn_info); + message_source_set (&req_exec_ckpt_checkpointclose.source, conn_info); - memcpy (&req_exec_ckpt_checkpointclose.checkpointName, - &req_lib_ckpt_checkpointclose->checkpointName, sizeof (SaNameT)); + memcpy (&req_exec_ckpt_checkpointclose.checkpointName, + &req_lib_ckpt_checkpointclose->checkpointName, sizeof (SaNameT)); - iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose; - iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose); + iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose; + iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose); - if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) { + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + } } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint to close so Returning Error. ####\n"); + res_lib_ckpt_checkpointclose.header.size = sizeof (struct res_lib_ckpt_checkpointclose); + res_lib_ckpt_checkpointclose.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE; + res_lib_ckpt_checkpointclose.header.error = SA_AIS_ERR_NOT_EXIST; + + libais_send_response (conn_info, + &res_lib_ckpt_checkpointclose, + sizeof (struct res_lib_ckpt_checkpointclose)); + } return (0); } @@ -2384,38 +2705,47 @@ static int message_handler_req_lib_ckpt_checkpointstatusget (struct conn_info *c * Count memory used by checkpoint sections */ checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointstatusget->checkpointName); + + if (checkpoint && (checkpoint->expired == 0)) { - for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next; - checkpoint_section_list != &checkpoint->checkpointSectionsListHead; - checkpoint_section_list = checkpoint_section_list->next) { + for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next; + checkpoint_section_list != &checkpoint->checkpointSectionsListHead; + checkpoint_section_list = checkpoint_section_list->next) { - checkpointSection = list_entry (checkpoint_section_list, - struct saCkptCheckpointSection, list); + checkpointSection = list_entry (checkpoint_section_list, + struct saCkptCheckpointSection, list); - memoryUsed += checkpointSection->sectionDescriptor.sectionSize; - numberOfSections += 1; + memoryUsed += checkpointSection->sectionDescriptor.sectionSize; + numberOfSections += 1; + } + + /* + * Build checkpoint status get response + */ + res_lib_ckpt_checkpointstatusget.header.size = sizeof (struct res_lib_ckpt_checkpointstatusget); + res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET; + if (checkpoint->active_replica_set == 1) { + res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_OK; + } else { + res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_ERR_NOT_EXIST; + } + + memcpy (&res_lib_ckpt_checkpointstatusget.checkpointDescriptor.checkpointCreationAttributes, + &checkpoint->checkpointCreationAttributes, + sizeof (SaCkptCheckpointCreationAttributesT)); + res_lib_ckpt_checkpointstatusget.checkpointDescriptor.numberOfSections = numberOfSections; + res_lib_ckpt_checkpointstatusget.checkpointDescriptor.memoryUsed = memoryUsed; } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint's status so Returning Error. ####\n"); - /* - * Build checkpoint status get response - */ - res_lib_ckpt_checkpointstatusget.header.size = sizeof (struct res_lib_ckpt_checkpointstatusget); - res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET; - if (checkpoint->active_replica_set == 1) { - res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_OK; - } else { + res_lib_ckpt_checkpointstatusget.header.size = sizeof (struct res_lib_ckpt_checkpointstatusget); + res_lib_ckpt_checkpointstatusget.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET; res_lib_ckpt_checkpointstatusget.header.error = SA_AIS_ERR_NOT_EXIST; } - - memcpy (&res_lib_ckpt_checkpointstatusget.checkpointDescriptor.checkpointCreationAttributes, - &checkpoint->checkpointCreationAttributes, - sizeof (SaCkptCheckpointCreationAttributesT)); - res_lib_ckpt_checkpointstatusget.checkpointDescriptor.numberOfSections = numberOfSections; - res_lib_ckpt_checkpointstatusget.checkpointDescriptor.memoryUsed = memoryUsed; - log_printf (LOG_LEVEL_DEBUG, "before sending message\n"); libais_send_response (conn_info, &res_lib_ckpt_checkpointstatusget, - sizeof (struct res_lib_ckpt_checkpointstatusget)); + sizeof (struct res_lib_ckpt_checkpointstatusget)); return (0); } @@ -2425,35 +2755,43 @@ static int message_handler_req_lib_ckpt_sectioncreate (struct conn_info *conn_in struct req_exec_ckpt_sectioncreate req_exec_ckpt_sectioncreate; struct iovec iovecs[2]; struct saCkptCheckpoint *checkpoint; + struct res_lib_ckpt_sectioncreate res_lib_ckpt_sectioncreate; log_printf (LOG_LEVEL_DEBUG, "Section create from API fd %d\n", conn_info->fd); checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_sectioncreate->checkpointName); - /* - * checkpoint opened is writeable mode so send message to cluster - */ - req_exec_ckpt_sectioncreate.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE; - req_exec_ckpt_sectioncreate.header.size = sizeof (struct req_exec_ckpt_sectioncreate); + if (checkpoint && (checkpoint->expired == 0)) { + /* + * checkpoint opened is writeable mode so send message to cluster + */ + req_exec_ckpt_sectioncreate.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE; + req_exec_ckpt_sectioncreate.header.size = sizeof (struct req_exec_ckpt_sectioncreate); - memcpy (&req_exec_ckpt_sectioncreate.req_lib_ckpt_sectioncreate, - req_lib_ckpt_sectioncreate, - sizeof (struct req_lib_ckpt_sectioncreate)); + memcpy (&req_exec_ckpt_sectioncreate.req_lib_ckpt_sectioncreate, + req_lib_ckpt_sectioncreate, + sizeof (struct req_lib_ckpt_sectioncreate)); + + memcpy (&req_exec_ckpt_sectioncreate.checkpointName, + &req_lib_ckpt_sectioncreate->checkpointName, + sizeof (SaNameT)); - memcpy (&req_exec_ckpt_sectioncreate.checkpointName, - &req_lib_ckpt_sectioncreate->checkpointName, - sizeof (SaNameT)); - - message_source_set (&req_exec_ckpt_sectioncreate.source, conn_info); - - iovecs[0].iov_base = (char *)&req_exec_ckpt_sectioncreate; - iovecs[0].iov_len = sizeof (req_exec_ckpt_sectioncreate); - /* - * Send section name and initial data in message - */ - iovecs[1].iov_base = ((char *)req_lib_ckpt_sectioncreate) + sizeof (struct req_lib_ckpt_sectioncreate); - iovecs[1].iov_len = req_lib_ckpt_sectioncreate->header.size - sizeof (struct req_lib_ckpt_sectioncreate); - req_exec_ckpt_sectioncreate.header.size += iovecs[1].iov_len; + message_source_set (&req_exec_ckpt_sectioncreate.source, conn_info); + iovecs[0].iov_base = (char *)&req_exec_ckpt_sectioncreate; + iovecs[0].iov_len = sizeof (req_exec_ckpt_sectioncreate); + /* + * Send section name and initial data in message + */ + iovecs[1].iov_base = ((char *)req_lib_ckpt_sectioncreate) + sizeof (struct req_lib_ckpt_sectioncreate); + iovecs[1].iov_len = req_lib_ckpt_sectioncreate->header.size - sizeof (struct req_lib_ckpt_sectioncreate); + req_exec_ckpt_sectioncreate.header.size += iovecs[1].iov_len; + + if (iovecs[1].iov_len) { + log_printf (LOG_LEVEL_DEBUG, "CKPT: message_handler_req_lib_ckpt_sectioncreate Section = %s, idLen = %d\n", + iovecs[1].iov_base, + iovecs[1].iov_len); + } + #ifdef DEBUG printf ("LIBRARY SECTIONCREATE string is %s len is %d\n", (unsigned char *)iovecs[1].iov_base, iovecs[1].iov_len); @@ -2467,11 +2805,24 @@ for (i = 0; i < 14;i++) { } printf ("|\n"); #endif - if (iovecs[1].iov_len > 0) { - log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base); - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); - } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + if (iovecs[1].iov_len > 0) { + log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base); + assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + } else { + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + } + + } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint to create a section in so Returning Error. ####\n"); + + res_lib_ckpt_sectioncreate.header.size = sizeof (struct res_lib_ckpt_sectioncreate); + res_lib_ckpt_sectioncreate.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE; + res_lib_ckpt_sectioncreate.header.error = SA_AIS_ERR_NOT_EXIST; + + libais_send_response (conn_info, + &res_lib_ckpt_sectioncreate, + sizeof (struct res_lib_ckpt_sectioncreate)); } return (0); @@ -2564,39 +2915,61 @@ static int message_handler_req_lib_ckpt_sectionwrite (struct conn_info *conn_inf struct req_exec_ckpt_sectionwrite req_exec_ckpt_sectionwrite; struct iovec iovecs[2]; struct saCkptCheckpoint *checkpoint; + struct res_lib_ckpt_sectionwrite res_lib_ckpt_sectionwrite; + + log_printf (LOG_LEVEL_DEBUG, "CKPT: Received data from lib with len = %d and ref = 0x%x\n", + req_lib_ckpt_sectionwrite->dataSize, + req_lib_ckpt_sectionwrite->dataOffset); + + log_printf (LOG_LEVEL_DEBUG, "CKPT: Checkpoint section being written to is %s, idLen = %d\n", + ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite), + req_lib_ckpt_sectionwrite->idLen); log_printf (LOG_LEVEL_DEBUG, "Section write from API fd %d\n", conn_info->fd); checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_sectionwrite->checkpointName); - /* - * checkpoint opened is writeable mode so send message to cluster - */ - req_exec_ckpt_sectionwrite.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE; - req_exec_ckpt_sectionwrite.header.size = sizeof (struct req_exec_ckpt_sectionwrite); + if (checkpoint && (checkpoint->expired == 0)) { + /* + * checkpoint opened is writeable mode so send message to cluster + */ + req_exec_ckpt_sectionwrite.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE; + req_exec_ckpt_sectionwrite.header.size = sizeof (struct req_exec_ckpt_sectionwrite); - memcpy (&req_exec_ckpt_sectionwrite.req_lib_ckpt_sectionwrite, - req_lib_ckpt_sectionwrite, - sizeof (struct req_lib_ckpt_sectionwrite)); + memcpy (&req_exec_ckpt_sectionwrite.req_lib_ckpt_sectionwrite, + req_lib_ckpt_sectionwrite, + sizeof (struct req_lib_ckpt_sectionwrite)); + + memcpy (&req_exec_ckpt_sectionwrite.checkpointName, + &req_lib_ckpt_sectionwrite->checkpointName, + sizeof (SaNameT)); + + message_source_set (&req_exec_ckpt_sectionwrite.source, conn_info); + + iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionwrite; + iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionwrite); + /* + * Send section name and data to write in message + */ + iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite); + iovecs[1].iov_len = req_lib_ckpt_sectionwrite->header.size - sizeof (struct req_lib_ckpt_sectionwrite); + req_exec_ckpt_sectionwrite.header.size += iovecs[1].iov_len; + + if (iovecs[1].iov_len > 0) { + assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + } else { + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + } + } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint to write to Returning Error. ####\n"); - memcpy (&req_exec_ckpt_sectionwrite.checkpointName, - &req_lib_ckpt_sectionwrite->checkpointName, - sizeof (SaNameT)); + res_lib_ckpt_sectionwrite.header.size = sizeof (struct res_lib_ckpt_sectionwrite); + res_lib_ckpt_sectionwrite.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE; + res_lib_ckpt_sectionwrite.header.error = SA_AIS_ERR_NOT_EXIST; - message_source_set (&req_exec_ckpt_sectionwrite.source, conn_info); - - iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionwrite; - iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionwrite); - /* - * Send section name and data to write in message - */ - iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite); - iovecs[1].iov_len = req_lib_ckpt_sectionwrite->header.size - sizeof (struct req_lib_ckpt_sectionwrite); - req_exec_ckpt_sectionwrite.header.size += iovecs[1].iov_len; - - if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); - } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + libais_send_response (conn_info, + &res_lib_ckpt_sectionwrite, + sizeof (struct res_lib_ckpt_sectionwrite)); } return (0); @@ -2608,41 +2981,55 @@ static int message_handler_req_lib_ckpt_sectionoverwrite (struct conn_info *conn struct req_exec_ckpt_sectionoverwrite req_exec_ckpt_sectionoverwrite; struct iovec iovecs[2]; struct saCkptCheckpoint *checkpoint; + struct res_lib_ckpt_sectionoverwrite res_lib_ckpt_sectionoverwrite; log_printf (LOG_LEVEL_DEBUG, "Section overwrite from API fd %d\n", conn_info->fd); checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_sectionoverwrite->checkpointName); - /* - * checkpoint opened is writeable mode so send message to cluster - */ - req_exec_ckpt_sectionoverwrite.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE; - req_exec_ckpt_sectionoverwrite.header.size = sizeof (struct req_exec_ckpt_sectionoverwrite); + if (checkpoint && (checkpoint->expired == 0)) { + /* + * checkpoint opened is writeable mode so send message to cluster + */ + req_exec_ckpt_sectionoverwrite.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE; + req_exec_ckpt_sectionoverwrite.header.size = sizeof (struct req_exec_ckpt_sectionoverwrite); + + memcpy (&req_exec_ckpt_sectionoverwrite.req_lib_ckpt_sectionoverwrite, + req_lib_ckpt_sectionoverwrite, + sizeof (struct req_lib_ckpt_sectionoverwrite)); + + memcpy (&req_exec_ckpt_sectionoverwrite.checkpointName, + &req_lib_ckpt_sectionoverwrite->checkpointName, + sizeof (SaNameT)); - memcpy (&req_exec_ckpt_sectionoverwrite.req_lib_ckpt_sectionoverwrite, - req_lib_ckpt_sectionoverwrite, - sizeof (struct req_lib_ckpt_sectionoverwrite)); - - memcpy (&req_exec_ckpt_sectionoverwrite.checkpointName, - &req_lib_ckpt_sectionoverwrite->checkpointName, - sizeof (SaNameT)); - - message_source_set (&req_exec_ckpt_sectionoverwrite.source, conn_info); - - iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionoverwrite; - iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionoverwrite); - /* - * Send section name and data to overwrite in message - */ - iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionoverwrite) + sizeof (struct req_lib_ckpt_sectionoverwrite); - iovecs[1].iov_len = req_lib_ckpt_sectionoverwrite->header.size - sizeof (struct req_lib_ckpt_sectionoverwrite); - req_exec_ckpt_sectionoverwrite.header.size += iovecs[1].iov_len; - - if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); - } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + message_source_set (&req_exec_ckpt_sectionoverwrite.source, conn_info); + + iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionoverwrite; + iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionoverwrite); + /* + * Send section name and data to overwrite in message + */ + iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionoverwrite) + sizeof (struct req_lib_ckpt_sectionoverwrite); + iovecs[1].iov_len = req_lib_ckpt_sectionoverwrite->header.size - sizeof (struct req_lib_ckpt_sectionoverwrite); + req_exec_ckpt_sectionoverwrite.header.size += iovecs[1].iov_len; + + if (iovecs[1].iov_len > 0) { + assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + } else { + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + } } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint to over write so Returning Error. ####\n"); + res_lib_ckpt_sectionoverwrite.header.size = sizeof (struct res_lib_ckpt_sectionwrite); + res_lib_ckpt_sectionoverwrite.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE; + res_lib_ckpt_sectionoverwrite.header.error = SA_AIS_ERR_NOT_EXIST; + + libais_send_response (conn_info, + &res_lib_ckpt_sectionoverwrite, + sizeof (struct res_lib_ckpt_sectionoverwrite)); + } + return (0); } @@ -2652,41 +3039,55 @@ static int message_handler_req_lib_ckpt_sectionread (struct conn_info *conn_info struct req_exec_ckpt_sectionread req_exec_ckpt_sectionread; struct iovec iovecs[2]; struct saCkptCheckpoint *checkpoint; + struct res_lib_ckpt_sectionread res_lib_ckpt_sectionread; log_printf (LOG_LEVEL_DEBUG, "Section overwrite from API fd %d\n", conn_info->fd); checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_sectionread->checkpointName); + + if (checkpoint && (checkpoint->expired == 0)) { + /* + * checkpoint opened is writeable mode so send message to cluster + */ + req_exec_ckpt_sectionread.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONREAD; + req_exec_ckpt_sectionread.header.size = sizeof (struct req_exec_ckpt_sectionread); - /* - * checkpoint opened is writeable mode so send message to cluster - */ - req_exec_ckpt_sectionread.header.id = MESSAGE_REQ_EXEC_CKPT_SECTIONREAD; - req_exec_ckpt_sectionread.header.size = sizeof (struct req_exec_ckpt_sectionread); - - memcpy (&req_exec_ckpt_sectionread.req_lib_ckpt_sectionread, - req_lib_ckpt_sectionread, - sizeof (struct req_lib_ckpt_sectionread)); - - memcpy (&req_exec_ckpt_sectionread.checkpointName, - &req_lib_ckpt_sectionread->checkpointName, - sizeof (SaNameT)); - - message_source_set (&req_exec_ckpt_sectionread.source, conn_info); - - iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionread; - iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionread); - /* - * Send section name and data to overwrite in message - */ - iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionread) + sizeof (struct req_lib_ckpt_sectionread); - iovecs[1].iov_len = req_lib_ckpt_sectionread->header.size - sizeof (struct req_lib_ckpt_sectionread); - req_exec_ckpt_sectionread.header.size += iovecs[1].iov_len; - - if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); - } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + memcpy (&req_exec_ckpt_sectionread.req_lib_ckpt_sectionread, + req_lib_ckpt_sectionread, + sizeof (struct req_lib_ckpt_sectionread)); + + memcpy (&req_exec_ckpt_sectionread.checkpointName, + &req_lib_ckpt_sectionread->checkpointName, + sizeof (SaNameT)); + + message_source_set (&req_exec_ckpt_sectionread.source, conn_info); + + iovecs[0].iov_base = (char *)&req_exec_ckpt_sectionread; + iovecs[0].iov_len = sizeof (req_exec_ckpt_sectionread); + /* + * Send section name and data to overwrite in message + */ + iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionread) + sizeof (struct req_lib_ckpt_sectionread); + iovecs[1].iov_len = req_lib_ckpt_sectionread->header.size - sizeof (struct req_lib_ckpt_sectionread); + req_exec_ckpt_sectionread.header.size += iovecs[1].iov_len; + + if (iovecs[1].iov_len > 0) { + assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + } else { + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + } } + else { + log_printf (LOG_LEVEL_ERROR, "#### CKPT: Could Not Find the Checkpoint to read so Returning Error. ####\n"); + res_lib_ckpt_sectionread.header.size = sizeof (struct res_lib_ckpt_sectionread); + res_lib_ckpt_sectionread.header.id = MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD; + res_lib_ckpt_sectionread.header.error = SA_AIS_ERR_NOT_EXIST; + + libais_send_response (conn_info, + &res_lib_ckpt_sectionread, + sizeof (struct res_lib_ckpt_sectionread)); + } + return (0); } diff --git a/exec/tlist.c b/exec/tlist.c index 05bd4fa5..6c9fe095 100644 --- a/exec/tlist.c +++ b/exec/tlist.c @@ -147,6 +147,17 @@ void timerlist_del (struct timerlist *timerlist, timer_handle timer_handle) timers_inuse--; free (timer); } + +void timerlist_del_data (struct timerlist *timerlist, timer_handle timer_handle) +{ + struct timer *timer = (struct timer *)timer_handle; + + if (timer->data) { + free (timer->data); + } + timerlist_del(timerlist,timer_handle); +} + static void timerlist_pre_dispatch (struct timerlist *timerlist, timer_handle timer_handle) { struct timer *timer = (struct timer *)timer_handle; diff --git a/exec/tlist.h b/exec/tlist.h index 8b88dbf0..759b46b8 100644 --- a/exec/tlist.h +++ b/exec/tlist.h @@ -57,6 +57,8 @@ timer_handle timerlist_add_future (struct timerlist *timerlist, void timerlist_del (struct timerlist *timerlist, timer_handle timer_handle); +void timerlist_del_data (struct timerlist *timerlist, timer_handle timer_handle); + void timerlist_expire (struct timerlist *timerlist); unsigned int timerlist_timeout_msec (struct timerlist *timerlist); diff --git a/test/testckpt.c b/test/testckpt.c index 2a6cb443..f1c742ce 100644 --- a/test/testckpt.c +++ b/test/testckpt.c @@ -316,7 +316,7 @@ printf ("Please wait, testing expiry of checkpoint sections.\n"); 0, &checkpointHandle2); printf ("%s: Opening unlinked checkpoint\n", - get_test_output (error, 7)); + get_test_output (error, SA_AIS_OK)); error = saCkptCheckpointClose (checkpointHandle); printf ("%s: Closing checkpoint\n", @@ -394,7 +394,7 @@ printf ("Please wait, testing expiry of checkpoint sections.\n"); "Initial Data #2", strlen ("Initial Data #2") + 1); printf ("%s: creating section 2 \n", - get_test_output (error, SA_AIS_OK)); + get_test_output (error, SA_AIS_ERR_EXIST)); error = saCkptSectionExpirationTimeSet (checkpointHandle, §ionId2,