Make checkpoint service work according to specifications with regards to the

unlink operation.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1139 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2006-07-20 04:50:15 +00:00
parent 24cc0d7f15
commit 24777c184f
3 changed files with 293 additions and 113 deletions

View File

@ -125,6 +125,7 @@ static inline void swab_mar_ckpt_refcnt_t (mar_ckpt_refcnt_t *to_swab)
struct checkpoint {
struct list_head list;
mar_name_t name;
mar_uint32_t ckpt_id;
mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes;
struct list_head sections_list_head;
int referenceCount;
@ -155,11 +156,10 @@ struct ckpt_pd {
struct ckpt_identifier {
mar_name_t ckpt_name;
mar_uint32_t ckpt_id;
mar_ckpt_section_id_t ckpt_section_id;
};
/* TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle; */
static int ckpt_exec_init_fn (struct objdb_iface_ver0 *);
static int ckpt_lib_exit_fn (void *conn);
@ -318,17 +318,24 @@ void clean_checkpoint_list(struct list_head* head);
static int recovery_checkpoint_open(
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
mar_ckpt_checkpoint_creation_attributes_t *ckptAttributes,
struct ckpt_refcnt *ref_cnt);
static int recovery_section_create (
mar_ckpt_section_descriptor_t *section_descriptor,
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
char* section_id);
static int recovery_section_write(
int section_id_len, char *section_id, mar_name_t *checkpoint_name,
void *new_data, mar_uint32_t data_offset, mar_uint32_t data_size);
int section_id_len,
char *section_id,
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
void *new_data,
mar_uint32_t data_offset,
mar_uint32_t data_size);
static int process_localhost_transition = 0;
@ -338,9 +345,11 @@ DECLARE_LIST_INIT(checkpoint_iteration_list_head);
DECLARE_LIST_INIT(checkpoint_recovery_list_head);
static mar_uint32_t global_ckpt_id = 0;
struct checkpoint_cleanup {
struct list_head list;
struct checkpoint checkpoint;
struct list_head list;
struct checkpoint checkpoint;
};
typedef enum {
@ -597,6 +606,7 @@ struct req_exec_ckpt_checkpointopen {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes __attribute__((aligned(8)));
mar_uint32_t checkpoint_creation_attributes_set __attribute__((aligned(8)));
mar_ckpt_checkpoint_open_flags_t checkpoint_open_flags __attribute__((aligned(8)));
@ -610,18 +620,21 @@ struct req_exec_ckpt_checkpointclose {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
};
struct req_exec_ckpt_checkpointretentiondurationset {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_time_t retention_duration __attribute__((aligned(8)));
};
struct req_exec_ckpt_checkpointretentiondurationexpire {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
};
struct req_exec_ckpt_checkpointunlink {
@ -634,6 +647,7 @@ struct req_exec_ckpt_sectioncreate {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_time_t expiration_time __attribute__((aligned(8)));
mar_uint32_t initial_data_size __attribute__((aligned(8)));
@ -643,6 +657,7 @@ struct req_exec_ckpt_sectiondelete {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
};
@ -650,6 +665,7 @@ struct req_exec_ckpt_sectionexpirationtimeset {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_time_t expiration_time __attribute__((aligned(8)));
};
@ -658,6 +674,7 @@ struct req_exec_ckpt_sectionwrite {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_offset __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
@ -667,6 +684,7 @@ struct req_exec_ckpt_sectionoverwrite {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
};
@ -675,6 +693,7 @@ struct req_exec_ckpt_sectionread {
mar_req_header_t header __attribute__((aligned(8)));
mar_message_source_t source __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_offset __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
@ -684,6 +703,7 @@ struct req_exec_ckpt_synchronize_state {
mar_req_header_t header __attribute__((aligned(8)));
struct memb_ring_id previous_ring_id __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_ckpt_checkpoint_creation_attributes_t checkpoint_creation_attributes __attribute__((aligned(8)));
mar_ckpt_section_descriptor_t section_descriptor __attribute__((aligned(8)));
mar_uint32_t nodeid __attribute__((aligned(8)));
@ -694,6 +714,7 @@ struct req_exec_ckpt_synchronize_section {
mar_req_header_t header __attribute__((aligned(8)));
struct memb_ring_id previous_ring_id __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_offset __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
@ -1387,7 +1408,9 @@ static void ckpt_confchg_fn (
}
}
static struct checkpoint *checkpoint_find (mar_name_t *name)
static struct checkpoint *checkpoint_find (
mar_name_t *name,
mar_uint32_t ckpt_id)
{
struct list_head *checkpoint_list;
struct checkpoint *checkpoint;
@ -1399,13 +1422,34 @@ static struct checkpoint *checkpoint_find (mar_name_t *name)
checkpoint = list_entry (checkpoint_list,
struct checkpoint, list);
if (mar_name_match (name, &checkpoint->name)) {
if (mar_name_match (name, &checkpoint->name) &&
ckpt_id == checkpoint->ckpt_id) {
return (checkpoint);
}
}
return (0);
}
static struct checkpoint *checkpoint_find_linked (
mar_name_t *name)
{
struct list_head *checkpoint_list;
struct checkpoint *checkpoint;
for (checkpoint_list = checkpoint_list_head.next;
checkpoint_list != &checkpoint_list_head;
checkpoint_list = checkpoint_list->next) {
checkpoint = list_entry (checkpoint_list,
struct checkpoint, list);
if (mar_name_match (name, &checkpoint->name) &&
checkpoint->unlinked == 0) {
return (checkpoint);
}
}
return (0);
}
static void ckpt_checkpoint_remove_cleanup (
void *conn,
struct checkpoint *checkpoint)
@ -1420,7 +1464,7 @@ static void ckpt_checkpoint_remove_cleanup (
checkpoint_cleanup = list_entry (list, struct checkpoint_cleanup, list);
if (mar_name_match (&checkpoint_cleanup->checkpoint.name, &checkpoint->name)
|| (checkpoint_cleanup->checkpoint.name.length == 0)) {
|| (checkpoint_cleanup->checkpoint.name.length == 0)) {
list_del (&checkpoint_cleanup->list);
free (checkpoint_cleanup);
return;
@ -1554,6 +1598,7 @@ int ckpt_checkpoint_close (struct checkpoint *checkpoint) {
memcpy (&req_exec_ckpt_checkpointclose.checkpoint_name,
&checkpoint->name, sizeof (mar_name_t));
req_exec_ckpt_checkpointclose.ckpt_id = checkpoint->ckpt_id;
memset (&req_exec_ckpt_checkpointclose.source, 0,
sizeof (mar_message_source_t));
@ -1592,6 +1637,7 @@ static void exec_ckpt_checkpointopen_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_checkpointopen->header);
swab_mar_message_source_t (&req_exec_ckpt_checkpointopen->source);
swab_mar_name_t (&req_exec_ckpt_checkpointopen->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->ckpt_id);
swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_checkpointopen->checkpoint_creation_attributes);
swab_mar_uint32_t (&req_exec_ckpt_checkpointopen->checkpoint_creation_attributes_set);
swab_mar_ckpt_checkpoint_open_flags_t (&req_exec_ckpt_checkpointopen->checkpoint_open_flags);
@ -1608,6 +1654,7 @@ static void exec_ckpt_checkpointclose_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_checkpointclose->header);
swab_mar_message_source_t (&req_exec_ckpt_checkpointclose->source);
swab_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_checkpointclose->ckpt_id);
}
static void exec_ckpt_checkpointunlink_endian_convert (void *msg)
{
@ -1625,6 +1672,7 @@ static void exec_ckpt_checkpointretentiondurationset_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_checkpointretentiondurationset->header);
swab_mar_message_source_t (&req_exec_ckpt_checkpointretentiondurationset->source);
swab_mar_name_t (&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_checkpointretentiondurationset->ckpt_id);
swab_mar_time_t (&req_exec_ckpt_checkpointretentiondurationset->retention_duration);
}
@ -1634,6 +1682,7 @@ static void exec_ckpt_checkpointretentiondurationexpire_endian_convert (void *ms
swab_mar_req_header_t (&req_exec_ckpt_checkpointretentiondurationexpire->header);
swab_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_checkpointretentiondurationexpire->ckpt_id);
}
static void exec_ckpt_sectioncreate_endian_convert (void *msg)
@ -1643,6 +1692,7 @@ static void exec_ckpt_sectioncreate_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectioncreate->header);
swab_mar_message_source_t (&req_exec_ckpt_sectioncreate->source);
swab_mar_name_t (&req_exec_ckpt_sectioncreate->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->id_len);
swab_mar_time_t (&req_exec_ckpt_sectioncreate->expiration_time);
swab_mar_uint32_t (&req_exec_ckpt_sectioncreate->initial_data_size);
@ -1655,6 +1705,7 @@ static void exec_ckpt_sectiondelete_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectiondelete->header);
swab_mar_message_source_t (&req_exec_ckpt_sectiondelete->source);
swab_mar_name_t (&req_exec_ckpt_sectiondelete->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectiondelete->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectiondelete->id_len);
}
@ -1665,6 +1716,7 @@ static void exec_ckpt_sectrionexpirationtimeset_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectionexpirationtimeset->header);
swab_mar_message_source_t (&req_exec_ckpt_sectionexpirationtimeset->source);
swab_mar_name_t (&req_exec_ckpt_sectionexpirationtimeset->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectionexpirationtimeset->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectionexpirationtimeset->id_len);
swab_mar_time_t (&req_exec_ckpt_sectionexpirationtimeset->expiration_time);
}
@ -1676,6 +1728,7 @@ static void exec_ckpt_sectionwrite_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectionwrite->header);
swab_mar_message_source_t (&req_exec_ckpt_sectionwrite->source);
swab_mar_name_t (&req_exec_ckpt_sectionwrite->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectionwrite->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectionwrite->id_len);
swab_mar_offset_t (&req_exec_ckpt_sectionwrite->data_size);
}
@ -1687,6 +1740,7 @@ static void exec_ckpt_sectionoverwrite_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectionoverwrite->header);
swab_mar_message_source_t (&req_exec_ckpt_sectionoverwrite->source);
swab_mar_name_t (&req_exec_ckpt_sectionoverwrite->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectionoverwrite->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectionoverwrite->id_len);
swab_mar_offset_t (&req_exec_ckpt_sectionoverwrite->data_size);
}
@ -1698,6 +1752,7 @@ static void exec_ckpt_sectionread_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_sectionread->header);
swab_mar_message_source_t (&req_exec_ckpt_sectionread->source);
swab_mar_name_t (&req_exec_ckpt_sectionread->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_sectionread->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_sectionread->id_len);
swab_mar_offset_t (&req_exec_ckpt_sectionread->data_offset);
swab_mar_offset_t (&req_exec_ckpt_sectionread->data_size);
@ -1711,6 +1766,7 @@ static void exec_ckpt_synchronize_state_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_synchronize_state->header);
// swab_mar_memb_ring_id_t (&req_exec_ckpt_synchronize_state->memb_ring_id);
swab_mar_name_t (&req_exec_ckpt_synchronize_state->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_synchronize_state->ckpt_id);
swab_mar_ckpt_checkpoint_creation_attributes_t (&req_exec_ckpt_synchronize_state->checkpoint_creation_attributes);
swab_mar_ckpt_section_descriptor_t (&req_exec_ckpt_synchronize_state->section_descriptor);
swab_mar_uint32_t (&req_exec_ckpt_synchronize_state->nodeid);
@ -1725,6 +1781,7 @@ static void exec_ckpt_synchronize_section_endian_convert (void *msg)
swab_mar_req_header_t (&req_exec_ckpt_synchronize_section->header);
// swab_mar_memb_ring_id_t (&req_exec_ckpt_synchronize_section->memb_ring_id);
swab_mar_name_t (&req_exec_ckpt_synchronize_section->checkpoint_name);
swab_mar_uint32_t (&req_exec_ckpt_synchronize_section->ckpt_id);
swab_mar_uint32_t (&req_exec_ckpt_synchronize_section->id_len);
swab_mar_offset_t (&req_exec_ckpt_synchronize_section->data_offset);
swab_mar_offset_t (&req_exec_ckpt_synchronize_section->data_size);
@ -1763,7 +1820,7 @@ static void message_handler_req_exec_ckpt_checkpointopen (
}
}
checkpoint = checkpoint_find (&req_exec_ckpt_checkpointopen->checkpoint_name);
checkpoint = checkpoint_find_linked (&req_exec_ckpt_checkpointopen->checkpoint_name);
/*
* If checkpoint doesn't exist, create one
@ -1779,13 +1836,6 @@ static void message_handler_req_exec_ckpt_checkpointopen (
goto error_exit;
}
checkpoint_section = malloc (sizeof (struct checkpoint_section));
if (checkpoint_section == 0) {
free (checkpoint);
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
memcpy (&checkpoint->name,
&req_exec_ckpt_checkpointopen->checkpoint_name,
sizeof (mar_name_t));
@ -1796,10 +1846,11 @@ static void message_handler_req_exec_ckpt_checkpointopen (
list_init (&checkpoint->list);
list_init (&checkpoint->sections_list_head);
list_add (&checkpoint->list, &checkpoint_list_head);
checkpoint->referenceCount = 0;
checkpoint->referenceCount = 1;
checkpoint->retention_timer = 0;
checkpoint->expired = 0;
checkpoint->sectionCount = 0;
checkpoint->ckpt_id = global_ckpt_id++;
if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) &&
(checkpoint->checkpoint_creation_attributes.creation_flags & SA_CKPT_CHECKPOINT_COLLOCATED) == 0) {
@ -1814,26 +1865,32 @@ static void message_handler_req_exec_ckpt_checkpointopen (
initialize_ckpt_refcnt_array(checkpoint->ckpt_refcnt);
/*
* Add in default checkpoint section
* Create default section id if max_sections is 1
*/
list_init (&checkpoint_section->list);
list_add (&checkpoint_section->list, &checkpoint->sections_list_head);
if (checkpoint->checkpoint_creation_attributes.max_sections == 1) {
/*
* Add in default checkpoint section
*/
checkpoint_section = malloc (sizeof (struct checkpoint_section));
if (checkpoint_section == 0) {
free (checkpoint);
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
}
list_init (&checkpoint_section->list);
list_add (&checkpoint_section->list, &checkpoint->sections_list_head);
/*
* Default section id
*/
checkpoint_section->section_descriptor.section_id.id = 0;
checkpoint_section->section_descriptor.section_id.id_len = 0;
checkpoint_section->section_descriptor.expiration_time = SA_TIME_END;
checkpoint_section->section_descriptor.section_state = SA_CKPT_SECTION_VALID;
checkpoint_section->section_descriptor.last_update = 0; /*current time*/
checkpoint_section->section_descriptor.section_size = strlen("Factory installed data\0")+1;
checkpoint_section->section_data = malloc(strlen("Factory installed data\0")+1);
assert(checkpoint_section->section_data);
memcpy(checkpoint_section->section_data, "Factory installed data\0", strlen("Factory installed data\0")+1);
checkpoint_section->expiration_timer = 0;
checkpoint->referenceCount += 1;
checkpoint_section->section_descriptor.section_id.id = 0;
checkpoint_section->section_descriptor.section_id.id_len = 0;
checkpoint_section->section_descriptor.expiration_time = SA_TIME_END;
checkpoint_section->section_descriptor.section_state = SA_CKPT_SECTION_VALID;
checkpoint_section->section_descriptor.last_update = 0; /*current time*/
checkpoint_section->section_descriptor.section_size = 0;
checkpoint_section->section_data = NULL;
checkpoint_section->expiration_timer = 0;
}
} else {
if (req_exec_ckpt_checkpointopen->checkpoint_creation_attributes_set &&
memcmp (&checkpoint->checkpoint_creation_attributes,
@ -1845,13 +1902,7 @@ static void message_handler_req_exec_ckpt_checkpointopen (
}
}
/*
* If the checkpoint has been unlinked, it is an invalid name
*/
if (checkpoint->unlinked) {
error = SA_AIS_ERR_NOT_EXIST; /* TODO this is wrong */
goto error_exit;
}
assert (checkpoint->unlinked == 0);
/*
* Setup connection information and mark checkpoint as referenced
@ -1864,7 +1915,6 @@ static void message_handler_req_exec_ckpt_checkpointopen (
* sent out later as a part of the sync process.
*
*/
proc_index = processor_index_find(nodeid,checkpoint->ckpt_refcnt);
if (proc_index == -1) {/* Could not find, lets set the processor to an index.*/
proc_index = processor_index_set(nodeid,checkpoint->ckpt_refcnt);
@ -1901,6 +1951,9 @@ error_exit:
res_lib_ckpt_checkpointopenasync.header.error = error;
res_lib_ckpt_checkpointopenasync.checkpoint_handle = req_exec_ckpt_checkpointopen->checkpoint_handle;
res_lib_ckpt_checkpointopenasync.invocation = req_exec_ckpt_checkpointopen->invocation;
if (error == SA_AIS_OK) {
res_lib_ckpt_checkpointopenasync.ckpt_id = checkpoint->ckpt_id;
}
openais_conn_send_response (
req_exec_ckpt_checkpointopen->source.conn,
@ -1916,6 +1969,9 @@ error_exit:
*/
res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen);
res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN;
if (error == SA_AIS_OK) {
res_lib_ckpt_checkpointopen.ckpt_id = checkpoint->ckpt_id;
}
res_lib_ckpt_checkpointopen.header.error = error;
openais_conn_send_response (
@ -1943,8 +1999,9 @@ error_exit:
}
}
static int recovery_checkpoint_open(
static int recovery_checkpoint_open (
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
mar_ckpt_checkpoint_creation_attributes_t *ckptAttributes,
struct ckpt_refcnt *ref_cnt)
{
@ -1963,7 +2020,7 @@ static int recovery_checkpoint_open(
}
checkpoint = checkpoint_find (checkpoint_name);
checkpoint = checkpoint_find (checkpoint_name, ckpt_id);
/*
* If checkpoint doesn't exist, create one
@ -1989,6 +2046,10 @@ static int recovery_checkpoint_open(
memcpy (&checkpoint->checkpoint_creation_attributes,
ckptAttributes,
sizeof (mar_ckpt_checkpoint_creation_attributes_t));
checkpoint->ckpt_id = ckpt_id;
/*
* TODO this is wrong - worry about unlinked value of checkpoint sync
*/
checkpoint->unlinked = 0;
list_init (&checkpoint->list);
list_init (&checkpoint->sections_list_head);
@ -2036,14 +2097,6 @@ static int recovery_checkpoint_open(
log_printf (LOG_LEVEL_DEBUG, "recovery CHECKPOINT reopened is %p\n", checkpoint);
}
/*
* If the checkpoint has been unlinked, it is an invalid name
*/
if (checkpoint->unlinked) {
error = SA_AIS_ERR_BAD_OPERATION; /* Is this the correct return ? */
goto error_exit;
}
/*CHECK to see if there are any existing ckpts*/
if ((checkpoint->ckpt_refcnt) && (ckpt_refcnt_total(checkpoint->ckpt_refcnt) > 0)) {
log_printf (LOG_LEVEL_DEBUG,"calling merge_ckpt_refcnts\n");
@ -2061,7 +2114,9 @@ static int recovery_checkpoint_open(
*/
checkpoint->referenceCount = ckpt_refcnt_total(ref_cnt) + 1; /*defect 1192*/
log_printf (LOG_LEVEL_DEBUG, "OPEN checkpoint->referenceCount %d\n",checkpoint->referenceCount);
memcpy(checkpoint->ckpt_refcnt,ref_cnt,sizeof(struct ckpt_refcnt)*PROCESSOR_COUNT_MAX);
memcpy (checkpoint->ckpt_refcnt,
ref_cnt,
sizeof(struct ckpt_refcnt) * PROCESSOR_COUNT_MAX);
/*
* Reset retention duration since this checkpoint was just opened
@ -2099,7 +2154,9 @@ static void message_handler_req_exec_ckpt_synchronize_state (
marshall_from_mar_ckpt_refcnt_t (&local_ckpt_refcnt[i],
&req_exec_ckpt_sync_state->ckpt_refcnt[i]);
}
retcode = recovery_checkpoint_open(&req_exec_ckpt_sync_state->checkpoint_name,
retcode = recovery_checkpoint_open (
&req_exec_ckpt_sync_state->checkpoint_name,
req_exec_ckpt_sync_state->ckpt_id,
&req_exec_ckpt_sync_state->checkpoint_creation_attributes,
local_ckpt_refcnt);
if (retcode != SA_AIS_OK) {
@ -2107,8 +2164,10 @@ static void message_handler_req_exec_ckpt_synchronize_state (
log_printf(LOG_LEVEL_DEBUG, "recovery_checkpoint_open returned %d\n",retcode);
}
retcode = recovery_section_create (&req_exec_ckpt_sync_state->section_descriptor,
retcode = recovery_section_create (
&req_exec_ckpt_sync_state->section_descriptor,
&req_exec_ckpt_sync_state->checkpoint_name,
req_exec_ckpt_sync_state->ckpt_id,
(char*)req_exec_ckpt_sync_state
+ sizeof (struct req_exec_ckpt_synchronize_state));
if (retcode != SA_AIS_OK) {
@ -2133,13 +2192,16 @@ static void message_handler_req_exec_ckpt_synchronize_section (
log_printf(LOG_LEVEL_DEBUG, "message_handler_req_exec_ckpt_synchronize_section ignoring ...\n");
return;
}
/*
* Write the contents of the section to the checkpoint section.
*/
retcode = recovery_section_write(req_exec_ckpt_sync_section->id_len,
retcode = recovery_section_write(
req_exec_ckpt_sync_section->id_len,
(char*)req_exec_ckpt_sync_section +
sizeof (struct req_exec_ckpt_synchronize_section),
&req_exec_ckpt_sync_section->checkpoint_name,
req_exec_ckpt_sync_section->ckpt_id,
(char*)req_exec_ckpt_sync_section
+ sizeof (struct req_exec_ckpt_synchronize_section)
+ req_exec_ckpt_sync_section->id_len,
@ -2163,6 +2225,7 @@ unsigned int abstime_to_msec (mar_time_t time)
(((unsigned long long)tv.tv_usec) / ((unsigned long long)1000)));
msec_time = (((unsigned long long)time) / 1000000) -
(unsigned long long)curr_time;
return ((unsigned int)(msec_time));
}
@ -2175,7 +2238,7 @@ void timer_function_section_expire (void *data)
ckpt_id = (struct ckpt_identifier *)data;
log_printf (LOG_LEVEL_DEBUG, "timer_function_section_expire data = 0x%x \n",data);
if (ckpt_id->ckpt_section_id.id_len && ckpt_id->ckpt_section_id.id) {
log_printf (LOG_LEVEL_DEBUG, "Attempting to Expire section %s in ckpt %s\n",
log_printf (LOG_LEVEL_DEBUG, "Attempting to expire section %s in ckpt %s\n",
ckpt_id->ckpt_section_id.id,
(char *)&ckpt_id->ckpt_name.value);
}
@ -2184,7 +2247,7 @@ void timer_function_section_expire (void *data)
goto free_mem;
}
checkpoint = checkpoint_find (&ckpt_id->ckpt_name);
checkpoint = checkpoint_find (&ckpt_id->ckpt_name, ckpt_id->ckpt_id);
if (checkpoint == 0) {
log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire could not find ckpt %s\n",
(char *)&ckpt_id->ckpt_name.value);
@ -2192,8 +2255,8 @@ void timer_function_section_expire (void *data)
}
checkpoint_section = checkpoint_section_find (checkpoint,
(char *)ckpt_id->ckpt_section_id.id,
(int)ckpt_id->ckpt_section_id.id_len);
(char *)ckpt_id->ckpt_section_id.id,
(int)ckpt_id->ckpt_section_id.id_len);
if (checkpoint_section == 0) {
log_printf (LOG_LEVEL_ERROR, "timer_function_section_expire could not find section %s in ckpt %s\n",
ckpt_id->ckpt_section_id.id,
@ -2207,8 +2270,9 @@ void timer_function_section_expire (void *data)
checkpoint->sectionCount -= 1;
/*
* defect id 1112 "memory leak in checkpoint service" Dont try to delete the timer as the timer
* mechanism takes care of that. Just delete the data after this call
* defect id 1112 "memory leak in checkpoint service" Dont try
* to delete the timer as the timer mechanism takes care of that.
* Just delete the data after this call
*/
checkpoint_section_and_associate_timer_cleanup (checkpoint_section, 0);
free_mem :
@ -2232,6 +2296,8 @@ void timer_function_retention (void *data)
memcpy (&req_exec_ckpt_checkpointretentiondurationexpire.checkpoint_name,
&checkpoint->name,
sizeof (mar_name_t));
req_exec_ckpt_checkpointretentiondurationexpire.ckpt_id =
checkpoint->ckpt_id;
iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationexpire;
iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationexpire);
@ -2250,9 +2316,12 @@ static void message_handler_req_exec_ckpt_checkpointclose (
int proc_index;
int release_checkpoint = 0;
log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to close checkpoint %s\n", get_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name));
log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to close checkpoint %s\n",
get_mar_name_t (&req_exec_ckpt_checkpointclose->checkpoint_name));
checkpoint = checkpoint_find (&req_exec_ckpt_checkpointclose->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_checkpointclose->checkpoint_name,
req_exec_ckpt_checkpointclose->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@ -2271,8 +2340,8 @@ static void message_handler_req_exec_ckpt_checkpointclose (
}
else {
log_printf (LOG_LEVEL_ERROR,
"Could Not find Processor Info %p info.\n",
checkpoint);
"Could Not find Processor Info %p info.\n",
checkpoint);
}
assert (checkpoint->referenceCount > 0); /*defect 1192*/
log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n",
@ -2281,11 +2350,11 @@ static void message_handler_req_exec_ckpt_checkpointclose (
/*
* If checkpoint has been unlinked and this is the last reference, delete it
*/
if (checkpoint->unlinked && checkpoint->referenceCount == 1 ) { /*defect 1192*/
if (checkpoint->unlinked && checkpoint->referenceCount == 1) { /*defect 1192*/
log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
release_checkpoint = 1;
} else
if (checkpoint->referenceCount == 1 ) { /*defect 1192*/
if (checkpoint->referenceCount == 1) { /*defect 1192*/
openais_timer_add (
checkpoint->checkpoint_creation_attributes.retention_duration / 1000000,
checkpoint,
@ -2294,7 +2363,9 @@ static void message_handler_req_exec_ckpt_checkpointclose (
}
error_exit:
/*Remove the checkpoint from my connections checkpoint list*/
/*
* Remove the checkpoint from my connections checkpoint list
*/
if (message_source_is_local(&req_exec_ckpt_checkpointclose->source)) {
res_lib_ckpt_checkpointclose.header.size = sizeof (struct res_lib_ckpt_checkpointclose);
@ -2328,20 +2399,20 @@ static void message_handler_req_exec_ckpt_checkpointunlink (
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_DEBUG, "Got EXEC request to unlink checkpoint %p\n", req_exec_ckpt_checkpointunlink);
checkpoint = checkpoint_find (&req_exec_ckpt_checkpointunlink->checkpoint_name);
checkpoint = checkpoint_find_linked (
&req_exec_ckpt_checkpointunlink->checkpoint_name);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
if (checkpoint->unlinked) {
error = SA_AIS_ERR_INVALID_PARAM;
goto error_exit;
}
assert (checkpoint->unlinked == 0);
checkpoint->unlinked = 1;
/*
* Immediately delete entry if reference count is zero
*/
if (checkpoint->referenceCount == 1) { /*defect 1192*/
if (checkpoint->referenceCount == 1) {
/*
* Remove retention timer since this checkpoint was unlinked and is no
* longer referenced
@ -2373,7 +2444,9 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationset (
struct checkpoint *checkpoint;
SaAisErrorT error = SA_AIS_ERR_BAD_OPERATION;
checkpoint = checkpoint_find (&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name,
req_exec_ckpt_checkpointretentiondurationset->ckpt_id);
if (checkpoint) {
log_printf (LOG_LEVEL_DEBUG, "Setting retention duration for checkpoint %s\n",
get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationset->checkpoint_name));
@ -2418,9 +2491,12 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire (
struct checkpoint *checkpoint;
struct iovec iovec;
checkpoint = checkpoint_find (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name);
if (checkpoint && (checkpoint->expired == 0) && (checkpoint->referenceCount == 1)) { /*defect 1192*/
log_printf (LOG_LEVEL_NOTICE, "Expiring checkpoint %s\n", get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name));
checkpoint = checkpoint_find (
&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name,
req_exec_ckpt_checkpointretentiondurationexpire->ckpt_id);
if (checkpoint && (checkpoint->expired == 0) && (checkpoint->referenceCount == 1)) {
log_printf (LOG_LEVEL_DEBUG, "Expiring checkpoint %s\n",
get_mar_name_t (&req_exec_ckpt_checkpointretentiondurationexpire->checkpoint_name));
checkpoint->expired = 1;
req_exec_ckpt_checkpointunlink.header.size =
@ -2446,12 +2522,13 @@ static void message_handler_req_exec_ckpt_checkpointretentiondurationexpire (
static int recovery_section_create (
mar_ckpt_section_descriptor_t *section_descriptor,
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
char *section_id)
{
struct checkpoint *checkpoint;
struct checkpoint_section *checkpoint_section;
void *initial_data;
struct ckpt_identifier *ckpt_id = 0;
struct ckpt_identifier *ckpt_identifier = 0;
SaAisErrorT error = SA_AIS_OK;
void *section_id_new;
@ -2463,7 +2540,7 @@ static int recovery_section_create (
&checkpoint_name->value);
}
checkpoint = checkpoint_find (checkpoint_name);
checkpoint = checkpoint_find (checkpoint_name, ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@ -2561,19 +2638,20 @@ static int recovery_section_create (
checkpoint_section->section_descriptor.section_id.id = section_id_new;
if (section_descriptor->expiration_time != SA_TIME_END) {
ckpt_id = malloc (sizeof(struct ckpt_identifier));
assert(ckpt_id);
memcpy(&ckpt_id->ckpt_name, checkpoint_name,
ckpt_identifier = malloc (sizeof(struct ckpt_identifier));
assert(ckpt_identifier);
memcpy(&ckpt_identifier->ckpt_name, checkpoint_name,
sizeof(mar_name_t));
memcpy(&ckpt_id->ckpt_section_id,
ckpt_identifier->ckpt_id = ckpt_id;
memcpy(&ckpt_identifier->ckpt_section_id,
&checkpoint_section->section_descriptor.section_id,
sizeof(mar_ckpt_section_id_t));
log_printf (LOG_LEVEL_DEBUG, "recovery_section_create Enqueuing Timer to Expire section %s in ckpt %s\n",
ckpt_id->ckpt_section_id.id,
(char *)&ckpt_id->ckpt_name.value);
ckpt_identifier->ckpt_section_id.id,
(char *)&ckpt_identifier->ckpt_name.value);
openais_timer_add (
abstime_to_msec (checkpoint_section->section_descriptor.expiration_time),
ckpt_id,
ckpt_identifier,
timer_function_section_expire,
&checkpoint_section->expiration_timer);
log_printf (LOG_LEVEL_DEBUG, "recovery_section_create expiration timer = 0x%x\n",
@ -2605,7 +2683,9 @@ static void message_handler_req_exec_ckpt_sectioncreate (
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n");
checkpoint = checkpoint_find (&req_exec_ckpt_sectioncreate->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_sectioncreate->checkpoint_name,
req_exec_ckpt_sectioncreate->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@ -2707,6 +2787,7 @@ static void message_handler_req_exec_ckpt_sectioncreate (
memcpy(&ckpt_id->ckpt_name,
&req_exec_ckpt_sectioncreate->checkpoint_name,
sizeof(mar_name_t));
ckpt_id->ckpt_id = req_exec_ckpt_sectioncreate->ckpt_id;
memcpy(&ckpt_id->ckpt_section_id,
&checkpoint_section->section_descriptor.section_id,
sizeof(mar_ckpt_section_id_t));
@ -2756,7 +2837,9 @@ static void message_handler_req_exec_ckpt_sectiondelete (
struct checkpoint_section *checkpoint_section;
SaAisErrorT error = SA_AIS_OK;
checkpoint = checkpoint_find (&req_exec_ckpt_sectiondelete->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_sectiondelete->checkpoint_name,
req_exec_ckpt_sectiondelete->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@ -2820,15 +2903,17 @@ static void message_handler_req_exec_ckpt_sectionexpirationtimeset (
struct ckpt_identifier *ckpt_id = 0;
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_DEBUG, "Executive request to set section expiratoin time\n");
checkpoint = checkpoint_find (&req_exec_ckpt_sectionexpirationtimeset->checkpoint_name);
log_printf (LOG_LEVEL_DEBUG, "Executive request to set section expiration time\n");
checkpoint = checkpoint_find (
&req_exec_ckpt_sectionexpirationtimeset->checkpoint_name,
req_exec_ckpt_sectionexpirationtimeset->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
if (checkpoint->active_replica_set == 0) {
log_printf (LOG_LEVEL_NOTICE, "expirationset: no active replica, returning error.\n");
log_printf (LOG_LEVEL_DEBUG, "expirationset: no active replica, returning error.\n");
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
@ -2866,6 +2951,8 @@ static void message_handler_req_exec_ckpt_sectionexpirationtimeset (
memcpy(&ckpt_id->ckpt_name,
&req_exec_ckpt_sectionexpirationtimeset->checkpoint_name,
sizeof(mar_name_t));
ckpt_id->ckpt_id =
req_exec_ckpt_sectionexpirationtimeset->ckpt_id;
memcpy(&ckpt_id->ckpt_section_id,
&checkpoint_section->section_descriptor.section_id,
sizeof(mar_ckpt_section_id_t));
@ -2897,9 +2984,11 @@ error_exit:
}
}
static int recovery_section_write(int section_id_len,
static int recovery_section_write(
int section_id_len,
char* section_id,
mar_name_t *checkpoint_name,
mar_uint32_t ckpt_id,
void *new_data,
mar_uint32_t data_offset,
mar_uint32_t data_size)
@ -2911,7 +3000,7 @@ static int recovery_section_write(int section_id_len,
char *sd;
log_printf (LOG_LEVEL_DEBUG, "recovery_section_write.\n");
checkpoint = checkpoint_find (checkpoint_name);
checkpoint = checkpoint_find (checkpoint_name, ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@ -2965,7 +3054,9 @@ static void message_handler_req_exec_ckpt_sectionwrite (
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_DEBUG, "Executive request to section write.\n");
checkpoint = checkpoint_find (&req_exec_ckpt_sectionwrite->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_sectionwrite->checkpoint_name,
req_exec_ckpt_sectionwrite->ckpt_id);
if (checkpoint == 0) {
log_printf (LOG_LEVEL_ERROR, "checkpoint_find returned 0 Calling error_exit.\n");
error = SA_AIS_ERR_NOT_EXIST;
@ -2973,7 +3064,7 @@ static void message_handler_req_exec_ckpt_sectionwrite (
}
if (checkpoint->active_replica_set == 0) {
log_printf (LOG_LEVEL_NOTICE, "checkpointwrite: no active replica, returning error.\n");
log_printf (LOG_LEVEL_DEBUG, "checkpointwrite: no active replica, returning error.\n");
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
@ -3071,12 +3162,20 @@ static void message_handler_req_exec_ckpt_sectionoverwrite (
SaAisErrorT error = SA_AIS_OK;
log_printf (LOG_LEVEL_DEBUG, "Executive request to section overwrite.\n");
checkpoint = checkpoint_find (&req_exec_ckpt_sectionoverwrite->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_sectionoverwrite->checkpoint_name,
req_exec_ckpt_sectionoverwrite->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
if (checkpoint->active_replica_set == 0) {
log_printf (LOG_LEVEL_DEBUG, "sectionoverwrite: no active replica, returning error.\n");
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
if (checkpoint->checkpoint_creation_attributes.max_section_size <
req_exec_ckpt_sectionoverwrite->data_size) {
@ -3162,9 +3261,11 @@ static void message_handler_req_exec_ckpt_sectionread (
log_printf (LOG_LEVEL_DEBUG, "Executive request for section read.\n");
checkpoint = checkpoint_find (&req_exec_ckpt_sectionread->checkpoint_name);
checkpoint = checkpoint_find (
&req_exec_ckpt_sectionread->checkpoint_name,
req_exec_ckpt_sectionread->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_LIBRARY; /* TODO find the right error for this */
error = SA_AIS_ERR_LIBRARY;
goto error_exit;
}
@ -3283,7 +3384,8 @@ static int ckpt_lib_exit_fn (void *conn)
struct checkpoint_cleanup, list);
if (checkpoint_cleanup->checkpoint.name.length > 0) {
ckpt_checkpoint_close (&checkpoint_cleanup->checkpoint);
ckpt_checkpoint_close (
&checkpoint_cleanup->checkpoint);
}
list_del (&checkpoint_cleanup->list);
@ -3320,6 +3422,8 @@ static void message_handler_req_lib_ckpt_checkpointopen (
memcpy (&req_exec_ckpt_checkpointopen.checkpoint_name,
&req_lib_ckpt_checkpointopen->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_checkpointopen.ckpt_id =
req_lib_ckpt_checkpointopen->ckpt_id;
memcpy (&req_exec_ckpt_checkpointopen.checkpoint_creation_attributes,
&req_lib_ckpt_checkpointopen->checkpoint_creation_attributes,
sizeof (mar_ckpt_checkpoint_creation_attributes_t));
@ -3360,6 +3464,8 @@ static void message_handler_req_lib_ckpt_checkpointclose (
memcpy (&req_exec_ckpt_checkpointclose.checkpoint_name,
&req_lib_ckpt_checkpointclose->checkpoint_name, sizeof (mar_name_t));
req_exec_ckpt_checkpointclose.ckpt_id =
req_lib_ckpt_checkpointclose->ckpt_id;
iovec.iov_base = (char *)&req_exec_ckpt_checkpointclose;
iovec.iov_len = sizeof (req_exec_ckpt_checkpointclose);
@ -3411,6 +3517,8 @@ static void message_handler_req_lib_ckpt_checkpointretentiondurationset (
memcpy (&req_exec_ckpt_checkpointretentiondurationset.checkpoint_name,
&req_lib_ckpt_checkpointretentiondurationset->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_checkpointretentiondurationset.ckpt_id =
req_lib_ckpt_checkpointretentiondurationset->ckpt_id;
req_exec_ckpt_checkpointretentiondurationset.retention_duration =
req_lib_ckpt_checkpointretentiondurationset->retention_duration;
@ -3431,7 +3539,9 @@ static void message_handler_req_lib_ckpt_activereplicaset (
struct checkpoint *checkpoint;
SaAisErrorT error = SA_AIS_OK;
checkpoint = checkpoint_find (&req_lib_ckpt_activereplicaset->checkpoint_name);
checkpoint = checkpoint_find (
&req_lib_ckpt_activereplicaset->checkpoint_name,
req_lib_ckpt_activereplicaset->ckpt_id);
/*
* Make sure checkpoint is collocated and async update option
@ -3466,7 +3576,9 @@ static void message_handler_req_lib_ckpt_checkpointstatusget (
/*
* Count memory used by checkpoint sections
*/
checkpoint = checkpoint_find (&req_lib_ckpt_checkpointstatusget->checkpoint_name);
checkpoint = checkpoint_find (
&req_lib_ckpt_checkpointstatusget->checkpoint_name,
req_lib_ckpt_checkpointstatusget->ckpt_id);
if (checkpoint && (checkpoint->expired == 0)) {
@ -3531,6 +3643,8 @@ static void message_handler_req_lib_ckpt_sectioncreate (
memcpy (&req_exec_ckpt_sectioncreate.checkpoint_name,
&req_lib_ckpt_sectioncreate->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectioncreate.ckpt_id =
req_lib_ckpt_sectioncreate->ckpt_id;
req_exec_ckpt_sectioncreate.id_len = req_lib_ckpt_sectioncreate->id_len;
req_exec_ckpt_sectioncreate.expiration_time =
req_lib_ckpt_sectioncreate->expiration_time;
@ -3581,6 +3695,8 @@ static void message_handler_req_lib_ckpt_sectiondelete (
memcpy (&req_exec_ckpt_sectiondelete.checkpoint_name,
&req_lib_ckpt_sectiondelete->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectiondelete.ckpt_id =
req_lib_ckpt_sectiondelete->ckpt_id;
req_exec_ckpt_sectiondelete.id_len = req_lib_ckpt_sectiondelete->id_len;
iovecs[0].iov_base = (char *)&req_exec_ckpt_sectiondelete;
@ -3621,6 +3737,8 @@ static void message_handler_req_lib_ckpt_sectionexpirationtimeset (
memcpy (&req_exec_ckpt_sectionexpirationtimeset.checkpoint_name,
&req_lib_ckpt_sectionexpirationtimeset->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectionexpirationtimeset.ckpt_id =
req_lib_ckpt_sectionexpirationtimeset->ckpt_id;
req_exec_ckpt_sectionexpirationtimeset.id_len =
req_lib_ckpt_sectionexpirationtimeset->id_len;
req_exec_ckpt_sectionexpirationtimeset.expiration_time =
@ -3678,7 +3796,8 @@ static void message_handler_req_lib_ckpt_sectionwrite (
memcpy (&req_exec_ckpt_sectionwrite.checkpoint_name,
&req_lib_ckpt_sectionwrite->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectionwrite.ckpt_id =
req_lib_ckpt_sectionwrite->ckpt_id;
req_exec_ckpt_sectionwrite.id_len =
req_lib_ckpt_sectionwrite->id_len;
req_exec_ckpt_sectionwrite.data_offset =
@ -3724,6 +3843,8 @@ static void message_handler_req_lib_ckpt_sectionoverwrite (
memcpy (&req_exec_ckpt_sectionoverwrite.checkpoint_name,
&req_lib_ckpt_sectionoverwrite->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectionoverwrite.ckpt_id =
req_lib_ckpt_sectionoverwrite->ckpt_id;
req_exec_ckpt_sectionoverwrite.id_len =
req_lib_ckpt_sectionoverwrite->id_len;
req_exec_ckpt_sectionoverwrite.data_size =
@ -3770,6 +3891,8 @@ static void message_handler_req_lib_ckpt_sectionread (
memcpy (&req_exec_ckpt_sectionread.checkpoint_name,
&req_lib_ckpt_sectionread->checkpoint_name,
sizeof (mar_name_t));
req_exec_ckpt_sectionread.ckpt_id =
req_lib_ckpt_sectionread->ckpt_id;
req_exec_ckpt_sectionread.id_len =
req_lib_ckpt_sectionread->id_len;
req_exec_ckpt_sectionread.data_offset =
@ -3804,7 +3927,9 @@ static void message_handler_req_lib_ckpt_checkpointsynchronize (
struct res_lib_ckpt_checkpointsynchronize res_lib_ckpt_checkpointsynchronize;
struct checkpoint *checkpoint;
checkpoint = checkpoint_find (&req_lib_ckpt_checkpointsynchronize->checkpoint_name);
checkpoint = checkpoint_find (
&req_lib_ckpt_checkpointsynchronize->checkpoint_name,
req_lib_ckpt_checkpointsynchronize->ckpt_id);
if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) {
res_lib_ckpt_checkpointsynchronize.header.error = SA_AIS_ERR_BAD_OPERATION;
} else
@ -3831,7 +3956,9 @@ static void message_handler_req_lib_ckpt_checkpointsynchronizeasync (
struct res_lib_ckpt_checkpointsynchronizeasync res_lib_ckpt_checkpointsynchronizeasync;
struct checkpoint *checkpoint;
checkpoint = checkpoint_find (&req_lib_ckpt_checkpointsynchronizeasync->checkpoint_name);
checkpoint = checkpoint_find (
&req_lib_ckpt_checkpointsynchronizeasync->checkpoint_name,
req_lib_ckpt_checkpointsynchronizeasync->ckpt_id);
if ((checkpoint->checkpoint_creation_attributes.creation_flags & (SA_CKPT_WR_ACTIVE_REPLICA | SA_CKPT_WR_ACTIVE_REPLICA_WEAK)) == 0) {
res_lib_ckpt_checkpointsynchronizeasync.header.error = SA_AIS_ERR_BAD_OPERATION;
} else
@ -3876,14 +4003,16 @@ static void message_handler_req_lib_ckpt_sectioniterationinitialize (
log_printf (LOG_LEVEL_DEBUG, "section iteration initialize\n");
checkpoint = checkpoint_find (&req_lib_ckpt_sectioniterationinitialize->checkpoint_name);
checkpoint = checkpoint_find (
&req_lib_ckpt_sectioniterationinitialize->checkpoint_name,
req_lib_ckpt_sectioniterationinitialize->ckpt_id);
if (checkpoint == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
if (checkpoint->active_replica_set == 0) {
log_printf (LOG_LEVEL_NOTICE, "iterationinitialize: no active replica, returning error.\n");
log_printf (LOG_LEVEL_DEBUG, "iterationinitialize: no active replica, returning error.\n");
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
@ -4018,7 +4147,7 @@ static void message_handler_req_lib_ckpt_sectioniterationnext (
SaAisErrorT error = SA_AIS_OK;
int section_id_size = 0;
unsigned int res;
struct iteration_instance *iteration_instance = NULL; // this assignment to null makes no sense and is only needed with -O2 or greater TODO
struct iteration_instance *iteration_instance = NULL;
void *iteration_instance_p;
struct ckpt_pd *ckpt_pd = (struct ckpt_pd *)openais_conn_private_data_get (conn);

View File

@ -91,21 +91,25 @@ struct req_lib_ckpt_checkpointopen {
SaAisErrorT fail_with_error __attribute__((aligned(8)));
mar_invocation_t invocation __attribute__((aligned(8)));
mar_uint32_t async_call __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointopen {
mar_res_header_t header __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointopenasync {
mar_res_header_t header __attribute__((aligned(8)));
mar_ckpt_checkpoint_handle_t checkpoint_handle __attribute__((aligned(8)));
mar_invocation_t invocation __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct req_lib_ckpt_checkpointclose {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointclose {
@ -125,7 +129,9 @@ struct req_lib_ckpt_checkpointretentiondurationset {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_time_t retention_duration __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointretentiondurationset {
mar_res_header_t header __attribute__((aligned(8)));
} __attribute__((aligned(8)));
@ -133,6 +139,7 @@ struct res_lib_ckpt_checkpointretentiondurationset {
struct req_lib_ckpt_activereplicaset {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_activereplicaset {
@ -142,11 +149,13 @@ struct res_lib_ckpt_activereplicaset {
struct req_lib_ckpt_checkpointstatusget {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointstatusget {
mar_res_header_t header __attribute__((aligned(8)));
mar_ckpt_checkpoint_descriptor_t checkpoint_descriptor __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct req_lib_ckpt_sectioncreate {
@ -155,6 +164,7 @@ struct req_lib_ckpt_sectioncreate {
mar_uint32_t id_len __attribute__((aligned(8)));
mar_time_t expiration_time __attribute__((aligned(8)));
mar_uint32_t initial_data_size __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_sectioncreate {
@ -164,6 +174,7 @@ struct res_lib_ckpt_sectioncreate {
struct req_lib_ckpt_sectiondelete {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
} __attribute__((aligned(8)));
@ -176,6 +187,7 @@ struct req_lib_ckpt_sectionexpirationtimeset {
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_time_t expiration_time __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_sectionexpirationtimeset {
@ -185,6 +197,7 @@ struct res_lib_ckpt_sectionexpirationtimeset {
struct req_lib_ckpt_sectioniterationinitialize {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_ckpt_sections_chosen_t sections_chosen __attribute__((aligned(8)));
mar_time_t expiration_time __attribute__((aligned(8)));
} __attribute__((aligned(8)));
@ -216,6 +229,7 @@ struct res_lib_ckpt_sectioniterationnext {
struct req_lib_ckpt_sectionwrite {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_offset __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
@ -228,6 +242,7 @@ struct res_lib_ckpt_sectionwrite {
struct req_lib_ckpt_sectionoverwrite {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_uint32_t data_size __attribute__((aligned(8)));
} __attribute__((aligned(8)));
@ -239,6 +254,7 @@ struct res_lib_ckpt_sectionoverwrite {
struct req_lib_ckpt_sectionread {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_uint32_t id_len __attribute__((aligned(8)));
mar_offset_t data_offset __attribute__((aligned(8)));
mar_offset_t data_size __attribute__((aligned(8)));
@ -252,6 +268,7 @@ struct res_lib_ckpt_sectionread {
struct req_lib_ckpt_checkpointsynchronize {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
} __attribute__((aligned(8)));
struct res_lib_ckpt_checkpointsynchronize {
@ -261,6 +278,7 @@ struct res_lib_ckpt_checkpointsynchronize {
struct req_lib_ckpt_checkpointsynchronizeasync {
mar_req_header_t header __attribute__((aligned(8)));
mar_name_t checkpoint_name __attribute__((aligned(8)));
mar_uint32_t ckpt_id __attribute__((aligned(8)));
mar_invocation_t invocation __attribute__((aligned(8)));
} __attribute__((aligned(8)));

View File

@ -81,6 +81,7 @@ struct ckptCheckpointInstance {
SaCkptCheckpointHandleT checkpointHandle;
SaCkptCheckpointOpenFlagsT checkpointOpenFlags;
SaNameT checkpointName;
unsigned int checkpointId;
pthread_mutex_t response_mutex;
struct list_head list;
struct list_head section_iteration_list_head;
@ -441,7 +442,10 @@ saCkptDispatch (
res_lib_ckpt_checkpointopenasync->checkpoint_handle,
(void *)&ckptCheckpointInstance);
assert (error == SA_AIS_OK); /* should only be valid handles here */
assert (error == SA_AIS_OK); /* should only be valid handles here */
ckptCheckpointInstance->checkpointId =
res_lib_ckpt_checkpointopenasync->ckpt_id;
/*
* open succeeded without error
*/
@ -652,6 +656,8 @@ saCkptCheckpointOpen (
error = res_lib_ckpt_checkpointopen.header.error;
goto error_put_destroy;
}
ckptCheckpointInstance->checkpointId =
res_lib_ckpt_checkpointopen.ckpt_id;
pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL);
@ -816,6 +822,8 @@ saCkptCheckpointClose (
req_lib_ckpt_checkpointclose.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTCLOSE;
marshall_to_mar_name_t (&req_lib_ckpt_checkpointclose.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_checkpointclose.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -901,6 +909,8 @@ saCkptCheckpointRetentionDurationSet (
req_lib_ckpt_checkpointretentiondurationset.retention_duration = retentionDuration;
marshall_to_mar_name_t (&req_lib_ckpt_checkpointretentiondurationset.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_checkpointretentiondurationset.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -940,6 +950,8 @@ saCkptActiveReplicaSet (
req_lib_ckpt_activereplicaset.header.id = MESSAGE_REQ_CKPT_ACTIVEREPLICASET;
marshall_to_mar_name_t (&req_lib_ckpt_activereplicaset.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_activereplicaset.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -981,6 +993,8 @@ saCkptCheckpointStatusGet (
marshall_to_mar_name_t (&req_lib_ckpt_checkpointstatusget.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_checkpointstatusget.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -1044,6 +1058,8 @@ saCkptSectionCreate (
marshall_to_mar_name_t (&req_lib_ckpt_sectioncreate.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectioncreate.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -1115,6 +1131,8 @@ saCkptSectionDelete (
marshall_to_mar_name_t (
&req_lib_ckpt_sectiondelete.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectiondelete.ckpt_id =
ckptCheckpointInstance->checkpointId;
error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_sectiondelete,
sizeof (struct req_lib_ckpt_sectiondelete));
@ -1176,6 +1194,8 @@ saCkptSectionExpirationTimeSet (
marshall_to_mar_name_t (&req_lib_ckpt_sectionexpirationtimeset.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectionexpirationtimeset.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -1277,6 +1297,8 @@ saCkptSectionIterationInitialize (
marshall_to_mar_name_t (
&req_lib_ckpt_sectioniterationinitialize.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectioniterationinitialize.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
@ -1483,6 +1505,8 @@ saCkptCheckpointWrite (
marshall_to_mar_name_t (&req_lib_ckpt_sectionwrite.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectionwrite.ckpt_id =
ckptCheckpointInstance->checkpointId;
iov_len = 0;
/* TODO check for zero length stuff */
@ -1569,6 +1593,8 @@ saCkptSectionOverwrite (
req_lib_ckpt_sectionoverwrite.data_size = dataSize;
marshall_to_mar_name_t (&req_lib_ckpt_sectionoverwrite.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectionoverwrite.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -1645,6 +1671,8 @@ saCkptCheckpointRead (
marshall_to_mar_name_t (&req_lib_ckpt_sectionread.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_sectionread.ckpt_id =
ckptCheckpointInstance->checkpointId;
iov[0].iov_base = (char *)&req_lib_ckpt_sectionread;
iov[0].iov_len = sizeof (struct req_lib_ckpt_sectionread);
@ -1735,6 +1763,8 @@ saCkptCheckpointSynchronize (
req_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
marshall_to_mar_name_t (&req_lib_ckpt_checkpointsynchronize.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_checkpointsynchronize.ckpt_id =
ckptCheckpointInstance->checkpointId;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
@ -1790,8 +1820,11 @@ saCkptCheckpointSynchronizeAsync (
req_lib_ckpt_checkpointsynchronizeasync.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronizeasync);
req_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC;
marshall_to_mar_name_t (&req_lib_ckpt_checkpointsynchronizeasync.checkpoint_name,
marshall_to_mar_name_t (
&req_lib_ckpt_checkpointsynchronizeasync.checkpoint_name,
&ckptCheckpointInstance->checkpointName);
req_lib_ckpt_checkpointsynchronizeasync.ckpt_id =
ckptCheckpointInstance->checkpointId;
req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;
pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);