From d73d78e8fa8fb0a13790438d64214e7fbd828234 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Thu, 23 Sep 2004 22:12:13 +0000 Subject: [PATCH] Finalize expiration of sections and checkpoints. (Logical change 1.80) git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@286 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/ckpt.c | 179 +++++++++++++++++++++++++++++++++++++++++----------- exec/ckpt.h | 2 + 2 files changed, 144 insertions(+), 37 deletions(-) diff --git a/exec/ckpt.c b/exec/ckpt.c index b7d00947..433f44c2 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -76,6 +77,8 @@ static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct static int message_handler_req_exec_ckpt_checkpointretentiondurationset (void *message, struct in_addr source_addr); +static int message_handler_req_exec_ckpt_checkpointretentiondurationexpire (void *message, struct in_addr source_addr); + static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in_addr source_addr); static int message_handler_req_exec_ckpt_sectiondelete (void *message, struct in_addr source_addr); @@ -268,6 +271,7 @@ static int (*ckpt_checkpoint_aisexec_handler_fns[]) (void *msg, struct in_addr s message_handler_req_exec_ckpt_checkpointclose, message_handler_req_exec_ckpt_checkpointunlink, message_handler_req_exec_ckpt_checkpointretentiondurationset, + message_handler_req_exec_ckpt_checkpointretentiondurationexpire, message_handler_req_exec_ckpt_sectioncreate, message_handler_req_exec_ckpt_sectiondelete, message_handler_req_exec_ckpt_sectionexpirationtimeset, @@ -372,14 +376,21 @@ static struct saCkptCheckpointSection *findCheckpointSection ( return 0; } +void checkpoint_section_release (struct saCkptCheckpointSection *section) +{ + list_del (§ion->list); + free (section->sectionDescriptor.sectionId.id); + free (section->sectionData); + poll_timer_delete (*gmi_poll_handle, section->expiration_timer); + free (section); +} + void checkpoint_release (struct saCkptCheckpoint *checkpoint) { struct list_head *list; struct saCkptCheckpointSection *section; - int *buf = (struct saCkptCheckpoint *)checkpoint; poll_timer_delete (*gmi_poll_handle, checkpoint->retention_timer); - assert (*buf != 0xdeadbeef); /* * Release all checkpoint sections for this checkpoint @@ -390,13 +401,10 @@ void checkpoint_release (struct saCkptCheckpoint *checkpoint) section = list_entry (list, struct saCkptCheckpointSection, list); - free (section->sectionDescriptor.sectionId.id); - free (section->sectionData); list = list->next; - free (section); + checkpoint_section_release (section); } list_del (&checkpoint->list); - *buf = 0xdeadbeef; free (checkpoint); } @@ -404,6 +412,9 @@ int sendCkptCheckpointClose (struct saCkptCheckpoint *checkpoint) { struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose; struct iovec iovecs[2]; + if (checkpoint->expired == 1) { + return (0); + } req_exec_ckpt_checkpointclose.header.size = sizeof (struct req_exec_ckpt_checkpointclose); req_exec_ckpt_checkpointclose.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE; @@ -523,13 +534,13 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i list_add (&ckptCheckpoint->list, &checkpointListHead); ckptCheckpoint->referenceCount = 0; ckptCheckpoint->retention_timer = 0; + ckptCheckpoint->expired = 0; /* * Add in default checkpoint section */ list_init (&ckptCheckpointSection->list); list_add (&ckptCheckpointSection->list, &ckptCheckpoint->checkpointSectionsListHead); - ckptCheckpointSection->sectionDescriptor.expirationTime = 0xFFFFFFFF; //SA_END_TIME; /* * Default section id @@ -537,10 +548,11 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i ckptCheckpointSection->sectionDescriptor.sectionId.id = 0; ckptCheckpointSection->sectionDescriptor.sectionId.idLen = 0; ckptCheckpointSection->sectionDescriptor.sectionSize = 0; - ckptCheckpointSection->sectionDescriptor.expirationTime = 0xffffffff; /* SA_END_TIME */ + ckptCheckpointSection->sectionDescriptor.expirationTime = SA_TIME_END; ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID; ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // current time ckptCheckpointSection->sectionData = 0; + ckptCheckpointSection->expiration_timer = 0; } /* @@ -585,28 +597,49 @@ error_exit: return (0); } +unsigned int abstime_to_msec (SaTimeT time) +{ + struct timeval tv; + unsigned long long curr_time; + unsigned long long msec_time; + + gettimeofday (&tv, NULL); + curr_time = ((((unsigned long long)tv.tv_sec) * ((unsigned long)1000)) + + (((unsigned long long)tv.tv_usec) / ((unsigned long long)1000))); + msec_time = (((unsigned long long)time) / 1000000) - + (unsigned long long)curr_time; + return ((unsigned int)(msec_time)); +} + +void timer_function_section_expire (void *data) +{ + struct saCkptCheckpointSection *section = (struct saCkptCheckpointSection *)data; + if (section->sectionDescriptor.sectionId.id) { + log_printf (LOG_LEVEL_NOTICE, "CKPT: Expiring section %s\n", section->sectionDescriptor.sectionId.id); + } + checkpoint_section_release (section); +} + void timer_function_retention (void *data) { - struct req_exec_ckpt_checkpointunlink req_exec_ckpt_checkpointunlink; - struct iovec iovecs[2]; + struct saCkptCheckpoint *checkpoint = (struct saCkptCheckpoint *)data; + struct req_exec_ckpt_checkpointretentiondurationexpire req_exec_ckpt_checkpointretentiondurationexpire; int result; + struct iovec iovec; - req_exec_ckpt_checkpointunlink.header.size = - sizeof (struct req_exec_ckpt_checkpointunlink); - req_exec_ckpt_checkpointunlink.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK; + checkpoint->retention_timer = 0; + req_exec_ckpt_checkpointretentiondurationexpire.header.size = + sizeof (struct req_exec_ckpt_checkpointretentiondurationexpire); + req_exec_ckpt_checkpointretentiondurationexpire.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONEXPIRE; - req_exec_ckpt_checkpointunlink.source.conn_info = 0; - req_exec_ckpt_checkpointunlink.source.in_addr.s_addr = 0; + memcpy (&req_exec_ckpt_checkpointretentiondurationexpire.checkpointName, + &checkpoint->name, + sizeof (SaNameT)); - printf ("Retention timer expired\n"); - memcpy (&req_exec_ckpt_checkpointunlink.req_lib_ckpt_checkpointunlink.checkpointName, - data, - sizeof (SaNameT)); + iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationexpire; + iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationexpire); - iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink; - iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink); - - result = gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_MED); + result = gmi_mcast (&aisexec_groupname, &iovec, 1, GMI_PRIO_MED); } extern int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr) @@ -633,10 +666,10 @@ extern int message_handler_req_exec_ckpt_checkpointclose (void *message, struct checkpoint_release (checkpoint); } else if (checkpoint->referenceCount == 0) { - poll_timer_add (*gmi_poll_handle, - checkpoint->checkpointCreationAttributes.retentionDuration / 100000, - &checkpoint->name, - timer_function_retention, + poll_timer_add (*gmi_poll_handle, + checkpoint->checkpointCreationAttributes.retentionDuration / 1000000, + checkpoint, + timer_function_retention, &checkpoint->retention_timer); } @@ -691,16 +724,69 @@ error_exit: static int message_handler_req_exec_ckpt_checkpointretentiondurationset (void *message, struct in_addr source_addr) { struct req_exec_ckpt_checkpointretentiondurationset *req_exec_ckpt_checkpointretentiondurationset = (struct req_exec_ckpt_checkpointretentiondurationset *)message; + struct res_lib_ckpt_checkpointretentiondurationset res_lib_ckpt_checkpointretentiondurationset; struct saCkptCheckpoint *checkpoint; - log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to set retention duratione checkpoint %p\n", req_exec_ckpt_checkpointretentiondurationset); - checkpoint = findCheckpoint (&req_exec_ckpt_checkpointretentiondurationset->checkpointName); if (checkpoint) { - log_printf (LOG_LEVEL_DEBUG, "setting retention duration\n"); + log_printf (LOG_LEVEL_NOTICE, "CKPT: Setting retention duration for checkpoint %s\n", + getSaNameT (&req_exec_ckpt_checkpointretentiondurationset->checkpointName)); checkpoint->checkpointCreationAttributes.retentionDuration = req_exec_ckpt_checkpointretentiondurationset->retentionDuration; + + if (checkpoint->expired == 0 && checkpoint->referenceCount == 0) { + poll_timer_delete (*gmi_poll_handle, checkpoint->retention_timer); + + poll_timer_add (*gmi_poll_handle, + checkpoint->checkpointCreationAttributes.retentionDuration / 1000000, + checkpoint, + timer_function_retention, + &checkpoint->retention_timer); + } } + /* + * Respond to library if this processor sent the duration set request + */ + if (req_exec_ckpt_checkpointretentiondurationset->source.in_addr.s_addr == this_ip.sin_addr.s_addr) { + res_lib_ckpt_checkpointretentiondurationset.header.size = sizeof (struct res_lib_ckpt_checkpointretentiondurationset); + res_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET; + res_lib_ckpt_checkpointretentiondurationset.header.error = SA_OK; + + libais_send_response (req_exec_ckpt_checkpointretentiondurationset->source.conn_info, + &res_lib_ckpt_checkpointretentiondurationset, + sizeof (struct res_lib_ckpt_checkpointretentiondurationset)); + } + return (0); +} + +static int message_handler_req_exec_ckpt_checkpointretentiondurationexpire (void *message, struct in_addr source_addr) +{ + struct req_exec_ckpt_checkpointretentiondurationexpire *req_exec_ckpt_checkpointretentiondurationexpire = (struct req_exec_ckpt_checkpointretentiondurationexpire *)message; + struct req_exec_ckpt_checkpointunlink req_exec_ckpt_checkpointunlink; + struct saCkptCheckpoint *checkpoint; + struct iovec iovecs[2]; + + checkpoint = findCheckpoint (&req_exec_ckpt_checkpointretentiondurationexpire->checkpointName); + if (checkpoint && checkpoint->expired == 0) { + log_printf (LOG_LEVEL_NOTICE, "CKPT: Expiring checkpoint %s\n", getSaNameT (&req_exec_ckpt_checkpointretentiondurationexpire->checkpointName)); + checkpoint->expired = 1; + + req_exec_ckpt_checkpointunlink.header.size = + sizeof (struct req_exec_ckpt_checkpointunlink); + req_exec_ckpt_checkpointunlink.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK; + + req_exec_ckpt_checkpointunlink.source.conn_info = 0; + req_exec_ckpt_checkpointunlink.source.in_addr.s_addr = 0; + + memcpy (&req_exec_ckpt_checkpointunlink.req_lib_ckpt_checkpointunlink.checkpointName, + &req_exec_ckpt_checkpointretentiondurationexpire->checkpointName, + sizeof (SaNameT)); + + iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink; + iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink); + + gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_MED); + } return (0); } @@ -775,13 +861,22 @@ static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in /* * Configure checkpoint section */ - ckptCheckpointSection->sectionDescriptor.expirationTime = req_lib_ckpt_sectioncreate->expirationTime; ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionId; ckptCheckpointSection->sectionDescriptor.sectionId.idLen = req_lib_ckpt_sectioncreate->idLen; ckptCheckpointSection->sectionDescriptor.sectionSize = req_lib_ckpt_sectioncreate->initialDataSize; + ckptCheckpointSection->sectionDescriptor.expirationTime = req_lib_ckpt_sectioncreate->expirationTime; ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID; ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // TODO current time ckptCheckpointSection->sectionData = initialData; + ckptCheckpointSection->expiration_timer = 0; + + if (req_lib_ckpt_sectioncreate->expirationTime != SA_TIME_END) { + poll_timer_add (*gmi_poll_handle, + abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime), + ckptCheckpointSection, + timer_function_section_expire, + &ckptCheckpointSection->expiration_timer); + } /* * Add checkpoint section to checkpoint @@ -839,10 +934,7 @@ printf ("section not found\n"); /* * Delete checkpoint section */ - list_del (&ckptCheckpointSection->list); - free (ckptCheckpointSection->sectionDescriptor.sectionId.id); - free (ckptCheckpointSection->sectionData); - free (ckptCheckpointSection); + checkpoint_section_release (ckptCheckpointSection); /* * return result to CKPT library @@ -898,6 +990,17 @@ static int message_handler_req_exec_ckpt_sectionexpirationtimeset (void *message ckptCheckpointSection->sectionDescriptor.expirationTime = req_lib_ckpt_sectionexpirationtimeset->expirationTime; + poll_timer_delete (*gmi_poll_handle, ckptCheckpointSection->expiration_timer); + ckptCheckpointSection->expiration_timer = 0; + + if (req_lib_ckpt_sectionexpirationtimeset->expirationTime != SA_TIME_END) { + poll_timer_add (*gmi_poll_handle, + abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime), + ckptCheckpointSection, + timer_function_section_expire, + &ckptCheckpointSection->expiration_timer); + } + error_exit: if (req_exec_ckpt_sectionexpirationtimeset->source.in_addr.s_addr == this_ip.sin_addr.s_addr) { res_lib_ckpt_sectionexpirationtimeset.header.size = sizeof (struct res_lib_ckpt_sectionexpirationtimeset); @@ -947,7 +1050,6 @@ printf ("CANT FIND SECTION '%s'\n", */ sizeRequired = req_lib_ckpt_sectionwrite->dataOffset + req_lib_ckpt_sectionwrite->dataSize; if (sizeRequired > ckptCheckpointSection->sectionDescriptor.sectionSize) { -printf ("reallocating data\n"); sectionData = realloc (ckptCheckpointSection->sectionData, sizeRequired); if (sectionData == 0) { error = SA_ERR_NO_MEMORY; @@ -1289,6 +1391,10 @@ static int message_handler_req_lib_ckpt_checkpointretentiondurationset (struct c log_printf (LOG_LEVEL_DEBUG, "DURATION SET FROM API fd %d\n", conn_info); req_exec_ckpt_checkpointretentiondurationset.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONSET; req_exec_ckpt_checkpointretentiondurationset.header.size = sizeof (struct req_exec_ckpt_checkpointretentiondurationset); + + req_exec_ckpt_checkpointretentiondurationset.source.conn_info = conn_info; + req_exec_ckpt_checkpointretentiondurationset.source.in_addr.s_addr = this_ip.sin_addr.s_addr; + memcpy (&req_exec_ckpt_checkpointretentiondurationset.checkpointName, &conn_info->ais_ci.u.libckpt_ci.checkpoint->name, sizeof (SaNameT)); @@ -1455,7 +1561,6 @@ static int message_handler_req_lib_ckpt_sectiondelete (struct conn_info *conn_in iovecs[1].iov_len = req_lib_ckpt_sectiondelete->header.size - sizeof (struct req_lib_ckpt_sectiondelete); if (iovecs[1].iov_len > 0) { - log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %s\n", iovecs[1].iov_base); gmi_mcast (&aisexec_groupname, iovecs, 2, GMI_PRIO_MED); } else { gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_MED); diff --git a/exec/ckpt.h b/exec/ckpt.h index 8b61d75c..3c3be642 100644 --- a/exec/ckpt.h +++ b/exec/ckpt.h @@ -43,6 +43,7 @@ struct saCkptCheckpointSection { struct list_head list; SaCkptSectionDescriptorT sectionDescriptor; void *sectionData; + poll_timer_handle expiration_timer; }; struct saCkptCheckpoint { @@ -53,6 +54,7 @@ struct saCkptCheckpoint { int referenceCount; int unlinked; poll_timer_handle retention_timer; + int expired; }; struct saCkptSectionIteratorEntry {