diff --git a/exec/ckpt.c b/exec/ckpt.c index db0cb3bb..5c3a9c6f 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -1009,6 +1009,7 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i struct req_exec_ckpt_checkpointopen *req_exec_ckpt_checkpointopen = (struct req_exec_ckpt_checkpointopen *)message; struct req_lib_ckpt_checkpointopen *req_lib_ckpt_checkpointopen = (struct req_lib_ckpt_checkpointopen *)&req_exec_ckpt_checkpointopen->req_lib_ckpt_checkpointopen; struct res_lib_ckpt_checkpointopen res_lib_ckpt_checkpointopen; + struct res_lib_ckpt_checkpointopenasync res_lib_ckpt_checkpointopenasync; struct saCkptCheckpoint *ckptCheckpoint = 0; struct saCkptCheckpointSection *ckptCheckpointSection = 0; @@ -1120,10 +1121,23 @@ error_exit: * If this node was the source of the message, respond to this node */ if (message_source_is_local(&req_exec_ckpt_checkpointopen->source)) { - res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen); - res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN; - res_lib_ckpt_checkpointopen.header.error = error; + if (req_exec_ckpt_checkpointopen->invocation) { + res_lib_ckpt_checkpointopenasync.header.size = sizeof (struct res_lib_ckpt_checkpointopenasync); + res_lib_ckpt_checkpointopenasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC; + res_lib_ckpt_checkpointopenasync.header.error = error; + res_lib_ckpt_checkpointopenasync.checkpointHandle = req_exec_ckpt_checkpointopen->checkpointHandle; + res_lib_ckpt_checkpointopenasync.invocation = req_exec_ckpt_checkpointopen->invocation; + libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info->conn_info_partner, &res_lib_ckpt_checkpointopenasync, + sizeof (struct res_lib_ckpt_checkpointopenasync)); + } else { + res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen); + res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN; + res_lib_ckpt_checkpointopen.header.error = error; + + libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info, &res_lib_ckpt_checkpointopen, + sizeof (struct res_lib_ckpt_checkpointopen)); + } if (error == SA_AIS_OK) { checkpoint_cleanup = malloc (sizeof (struct checkpoint_cleanup)); if (checkpoint_cleanup == 0) { @@ -1135,9 +1149,6 @@ error_exit: &req_exec_ckpt_checkpointopen->source.conn_info->ais_ci.u.libckpt_ci.checkpoint_list); } } - - libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info, &res_lib_ckpt_checkpointopen, - sizeof (struct res_lib_ckpt_checkpointopen)); } /* return (error == SA_AIS_OK ? 0 : -1); */ @@ -2182,6 +2193,9 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i memcpy (&req_exec_ckpt_checkpointopen.req_lib_ckpt_checkpointopen, req_lib_ckpt_checkpointopen, sizeof (struct req_lib_ckpt_checkpointopen)); + + req_exec_ckpt_checkpointopen.invocation = 0; + req_exec_ckpt_checkpointopen.checkpointHandle = 0; iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen); @@ -2193,6 +2207,29 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *conn_info, void *message) { + struct req_lib_ckpt_checkpointopenasync *req_lib_ckpt_checkpointopenasync = (struct req_lib_ckpt_checkpointopenasync *)message; + struct req_exec_ckpt_checkpointopen req_exec_ckpt_checkpointopen; + struct iovec iovecs[2]; + + log_printf (LOG_LEVEL_DEBUG, "Library request to open checkpoint async.\n"); + req_exec_ckpt_checkpointopen.header.size = + sizeof (struct req_exec_ckpt_checkpointopen); + req_exec_ckpt_checkpointopen.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN; + + message_source_set (&req_exec_ckpt_checkpointopen.source, conn_info); + + memcpy (&req_exec_ckpt_checkpointopen.req_lib_ckpt_checkpointopen, + req_lib_ckpt_checkpointopenasync, + sizeof (struct req_lib_ckpt_checkpointopen)); + + req_exec_ckpt_checkpointopen.invocation = req_lib_ckpt_checkpointopenasync->invocation; + req_exec_ckpt_checkpointopen.checkpointHandle = req_lib_ckpt_checkpointopenasync->checkpointHandle; + + iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen; + iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen); + + assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + return (0); } diff --git a/include/ipc_ckpt.h b/include/ipc_ckpt.h index e8d1c96b..d603f5f2 100644 --- a/include/ipc_ckpt.h +++ b/include/ipc_ckpt.h @@ -113,6 +113,8 @@ struct req_exec_ckpt_checkpointopen { struct req_header header; struct message_source source; struct req_lib_ckpt_checkpointopen req_lib_ckpt_checkpointopen; + SaCkptCheckpointHandleT checkpointHandle; + SaInvocationT invocation; }; @@ -121,6 +123,7 @@ struct req_lib_ckpt_checkpointopenasync { SaNameT checkpointName; SaCkptCheckpointCreationAttributesT checkpointCreationAttributes; SaCkptCheckpointOpenFlagsT checkpointOpenFlags; + SaCkptCheckpointHandleT checkpointHandle; SaInvocationT invocation; }; diff --git a/lib/ckpt.c b/lib/ckpt.c index 0d16b1f2..ed7220bd 100644 --- a/lib/ckpt.c +++ b/lib/ckpt.c @@ -52,7 +52,7 @@ #include "../include/ipc_ckpt.h" struct message_overlay { - struct req_header header; + struct res_header header; char data[4096]; }; @@ -232,151 +232,107 @@ saCkptSelectionObjectGet ( return (SA_AIS_OK); } -#ifdef COMPILE_OUT SaAisErrorT saCkptDispatch ( const SaCkptHandleT ckptHandle, SaDispatchFlagsT dispatchFlags) { - fd_set read_fds; + struct pollfd ufds; + int poll_fd; + int timeout = 1; + SaCkptCallbacksT callbacks; SaAisErrorT error; int dispatch_avail; - struct timeval *timeout = 0; struct ckptInstance *ckptInstance; - struct req_header **queue_msg; - struct req_header *msg; - int empty; - int ignore_dispatch = 0; int cont = 1; /* always continue do loop except when set to 0 */ + struct message_overlay dispatch_data; + struct res_lib_ckpt_checkpointopenasync *res_lib_ckpt_checkpointopenasync; + error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance); if (error != SA_AIS_OK) { - return (error); + goto error_put; } /* * Timeout instantly for SA_DISPATCH_ALL */ - if (dispatchFlags & SA_DISPATCH_ALL) { - timeout = &zerousec; + if (dispatchFlags == SA_DISPATCH_ALL) { + timeout = 0; } do { /* * Read data directly from socket */ - FD_ZERO (&read_fds); - FD_SET (ckptInstance->response_fd, &read_fds); + poll_fd = ckptInstance->dispatch_fd; + ufds.fd = poll_fd; + ufds.events = POLLIN; + ufds.revents = 0; - error = saSelectRetry (ckptInstance->response_fd + 1, &read_fds, 0, 0, timeout); + error = saPollRetry(&ufds, 1, timeout); if (error != SA_AIS_OK) { - goto error_exit; + goto error_put; + } + pthread_mutex_lock(&ckptInstance->dispatch_mutex); + + if (ckptInstance->finalize == 1) { +#ifdef DBGLOGS + fprintf(stderr,"CKPT: This Ckpt has been finalised in a separate call so exit the dispatch process\n"); +#endif + error = SA_AIS_OK; + goto error_unlock; } - dispatch_avail = FD_ISSET (ckptInstance->response_fd, &read_fds); + if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) { + error = SA_AIS_ERR_BAD_HANDLE; + goto error_unlock; + } + + dispatch_avail = (ufds.revents & POLLIN); + if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) { + pthread_mutex_unlock(&ckptInstance->dispatch_mutex); break; /* exit do while cont is 1 loop */ } else if (dispatch_avail == 0) { - continue; /* next select */ + pthread_mutex_unlock(&ckptInstance->dispatch_mutex); + continue; } - - saQueueIsEmpty(&ckptInstance->inq, &empty); - if (empty == 0) { - /* - * Queue is not empty, read data from queue - */ - saQueueItemGet (&ckptInstance->inq, (void **)&queue_msg); - msg = *queue_msg; - memcpy (&ckptInstance->message, msg, msg->size); - saQueueItemRemove (&ckptInstance->inq); - free (msg); - } else { - /* - * Queue empty, read response from socket - */ - error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL); + + memset(&dispatch_data,0, sizeof(struct message_overlay)); + error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.header, sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL); + if (error != SA_AIS_OK) { + goto error_unlock; + } + if (dispatch_data.header.size > sizeof (struct res_header)) { + error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.data, + dispatch_data.header.size - sizeof (struct res_header), + MSG_WAITALL | MSG_NOSIGNAL); if (error != SA_AIS_OK) { - goto error_exit; - } - if (ckptInstance->message.header.size > sizeof (struct req_header)) { - error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.data, - ckptInstance->message.header.size - sizeof (struct req_header), - MSG_WAITALL | MSG_NOSIGNAL); - if (error != SA_AIS_OK) { - goto error_exit; - } + goto error_unlock; } } - + /* + * Make copy of callbacks, message data, unlock instance, + * and call callback. A risk of this dispatch method is that + * the callback routines may operate at the same time that + * CkptFinalize has been called in another thread. + */ + memcpy(&callbacks,&ckptInstance->callbacks, sizeof(ckptInstance->callbacks)); + pthread_mutex_unlock(&ckptInstance->dispatch_mutex); /* * Dispatch incoming response */ - switch (ckptInstance->message.header.id) { -#ifdef COMPILE_OUT - case MESSAGE_RES_CKPT_CHECKPOINT_ACTIVATEPOLL: - /* - * This is a do nothing message which the node executive sends - * to activate the file handle in poll when the library has - * queued a message into amfHandle->inq - * The dispatch is ignored for the following two cases: - * 1) setting of timeout to zero for the DISPATCH_ALL case - * 2) expiration of the do loop for the DISPATCH_ONE case - */ - ignore_dispatch = 1; - break; + switch (dispatch_data.header.id) { + case MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC: + res_lib_ckpt_checkpointopenasync = (struct res_lib_ckpt_checkpointopenasync *) &dispatch_data; - case MESSAGE_RES_CKPT_CHECKPOINT_HEALTHCHECKCALLBACK: - res_amf_healthcheckcallback = (struct res_amf_healthcheckcallback *)&ckptInstance->message; - amfInstance->callbacks.saAmfHealthcheckCallback ( - res_amf_healthcheckcallback->invocation, - &res_amf_healthcheckcallback->compName, - res_amf_healthcheckcallback->checkType); + callbacks.saCkptCheckpointOpenCallback(res_lib_ckpt_checkpointopenasync->invocation, + res_lib_ckpt_checkpointopenasync->checkpointHandle, + res_lib_ckpt_checkpointopenasync->header.error); + break; - - case MESSAGE_RES_CKPT_CHECKPOINT_READINESSSTATESETCALLBACK: - res_amf_readinessstatesetcallback = (struct res_amf_readinessstatesetcallback *)&ckptInstance->message; - amfInstance->callbacks.saAmfReadinessStateSetCallback ( - res_amf_readinessstatesetcallback->invocation, - &res_amf_readinessstatesetcallback->compName, - res_amf_readinessstatesetcallback->readinessState); - break; - - case MESSAGE_RES_CKPT_CHECKPOINT_CSISETCALLBACK: - res_amf_csisetcallback = (struct res_amf_csisetcallback *)&ckptInstance->message; - amfInstance->callbacks.saAmfCSISetCallback ( - res_amf_csisetcallback->invocation, - &res_amf_csisetcallback->compName, - &res_amf_csisetcallback->csiName, - res_amf_csisetcallback->csiFlags, - &res_amf_csisetcallback->haState, - &res_amf_csisetcallback->activeCompName, - res_amf_csisetcallback->transitionDescriptor); - break; - - case MESSAGE_RES_CKPT_CHECKPOINT_CSIREMOVECALLBACK: - res_amf_csiremovecallback = (struct res_amf_csiremovecallback *)&ckptInstance->message; - amfInstance->callbacks.saAmfCSIRemoveCallback ( - res_amf_csiremovecallback->invocation, - &res_amf_csiremovecallback->compName, - &res_amf_csiremovecallback->csiName, - &res_amf_csiremovecallback->csiFlags); - break; - - case MESSAGE_RES_CKPT_CHECKPOINT_PROTECTIONGROUPTRACKCALLBACK: - res_amf_protectiongrouptrackcallback = (struct res_amf_protectiongrouptrackcallback *)&ckptInstance->message; - memcpy (res_amf_protectiongrouptrackcallback->notificationBufferAddress, - res_amf_protectiongrouptrackcallback->notificationBuffer, - res_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT)); - amfInstance->callbacks.saAmfProtectionGroupTrackCallback( - &res_amf_protectiongrouptrackcallback->csiName, - res_amf_protectiongrouptrackcallback->notificationBufferAddress, - res_amf_protectiongrouptrackcallback->numberOfItems, - res_amf_protectiongrouptrackcallback->numberOfMembers, - res_amf_protectiongrouptrackcallback->error); - break; - -#endif default: /* TODO */ break; @@ -386,26 +342,20 @@ saCkptDispatch ( */ switch (dispatchFlags) { case SA_DISPATCH_ONE: - if (ignore_dispatch) { - ignore_dispatch = 0; - } else { cont = 0; - } break; case SA_DISPATCH_ALL: - if (ignore_dispatch) { - ignore_dispatch = 0; - } break; case SA_DISPATCH_BLOCKING: break; } } while (cont); - -error_exit: +error_unlock: + pthread_mutex_unlock(&ckptInstance->dispatch_mutex); +error_put: + saHandleInstancePut(&ckptHandleDatabase, ckptHandle); return (error); } -#endif SaAisErrorT saCkptFinalize ( @@ -535,15 +485,36 @@ saCkptCheckpointOpenAsync ( const SaCkptCheckpointCreationAttributesT *checkpointCreationAttributes, SaCkptCheckpointOpenFlagsT checkpointOpenFlags) { + struct ckptCheckpointInstance *ckptCheckpointInstance; struct ckptInstance *ckptInstance; + SaCkptCheckpointHandleT checkpointHandle; SaAisErrorT error; struct req_lib_ckpt_checkpointopenasync req_lib_ckpt_checkpointopenasync; + error = saHandleCreate (&checkpointHandleDatabase, + sizeof (struct ckptCheckpointInstance), &checkpointHandle); + if (error != SA_AIS_OK) { + goto error_no_destroy; + } + + error = saHandleInstanceGet (&checkpointHandleDatabase, checkpointHandle, + (void *)&ckptCheckpointInstance); + if (error != SA_AIS_OK) { + goto error_destroy; + } + error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance); if (error != SA_AIS_OK) { - return (error); + goto error_put_destroy; } + ckptCheckpointInstance->response_fd = ckptInstance->response_fd; + ckptCheckpointInstance->maxSectionIdSize = + checkpointCreationAttributes->maxSectionIdSize; + ckptCheckpointInstance->ckptHandle = ckptHandle; + ckptCheckpointInstance->checkpointOpenFlags = checkpointOpenFlags; + memcpy (&ckptCheckpointInstance->checkpointName, checkpointName, sizeof (SaNameT)); + req_lib_ckpt_checkpointopenasync.header.size = sizeof (struct req_lib_ckpt_checkpointopenasync); req_lib_ckpt_checkpointopenasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC; req_lib_ckpt_checkpointopenasync.invocation = invocation; @@ -553,16 +524,27 @@ saCkptCheckpointOpenAsync ( sizeof (SaCkptCheckpointCreationAttributesT)); req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags; + req_lib_ckpt_checkpointopenasync.checkpointHandle = checkpointHandle; - pthread_mutex_lock (&ckptInstance->response_mutex); - - error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointopenasync, + error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointopenasync, sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL); + + if (error != SA_AIS_OK) { + goto error_put_ckpt_destroy; + } + + pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL); - pthread_mutex_unlock (&ckptInstance->response_mutex); + saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle); + return (error); +error_put_ckpt_destroy: saHandleInstancePut (&ckptHandleDatabase, ckptHandle); - +error_put_destroy: + saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle); +error_destroy: + saHandleDestroy (&checkpointHandleDatabase, checkpointHandle); +error_no_destroy: return (error); } diff --git a/test/testckpt.c b/test/testckpt.c index 49661409..f6ec43a2 100644 --- a/test/testckpt.c +++ b/test/testckpt.c @@ -50,6 +50,7 @@ #define SECONDS_TO_EXPIRE 4 int ckptinv; +SaInvocationT open_invocation = 16; void printSaNameT (SaNameT *name) { int i; @@ -169,14 +170,27 @@ SaCkptIOVectorElementT default_write_vector[] = { } }; +SaCkptCheckpointHandleT checkpointHandle; + +void OpenCallBack ( + SaInvocationT invocation, + const SaCkptCheckpointHandleT chckpointHandle, + SaAisErrorT error) { + + printf ("%s: This is a call back for open for invocation = %d\n", + get_test_output (error, SA_AIS_OK), (int)invocation); + + memcpy(&checkpointHandle, &chckpointHandle, sizeof(SaCkptCheckpointHandleT)); + +} + SaCkptCallbacksT callbacks = { - 0, + &OpenCallBack, 0 }; int main (void) { SaCkptHandleT ckptHandle; - SaCkptCheckpointHandleT checkpointHandle; SaCkptCheckpointHandleT checkpointHandle2; SaCkptCheckpointHandleT checkpointHandleRead; SaCkptCheckpointDescriptorT checkpointStatus; @@ -190,6 +204,24 @@ int main (void) { int sel_fd; error = saCkptInitialize (&ckptHandle, &callbacks, &version); + + error = saCkptCheckpointOpenAsync (ckptHandle, + open_invocation, + &checkpointName, + &checkpointCreationAttributes, + SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE); + printf ("%s: initial asynchronous open of checkpoint\n", + get_test_output (error, SA_AIS_OK)); + + + error = saCkptDispatch (ckptHandle, SA_DISPATCH_ONE); + + printf ("%s: Dispatch response for open async of checkpoint\n", + get_test_output (error, SA_AIS_OK)); + + error = saCkptCheckpointClose (checkpointHandle); + printf ("%s: Closing checkpoint\n", + get_test_output (error, SA_AIS_OK)); error = saCkptCheckpointOpen (ckptHandle, &checkpointName,