defect 323 - defect 410 - CheckpointOpenAsync and Dispatch not working

(Logical change 1.187)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@620 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2005-04-21 18:32:17 +00:00
parent 3d8dbf1d32
commit 8b7d9fbc4d
4 changed files with 184 additions and 130 deletions

View File

@ -1009,6 +1009,7 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i
struct req_exec_ckpt_checkpointopen *req_exec_ckpt_checkpointopen = (struct req_exec_ckpt_checkpointopen *)message;
struct req_lib_ckpt_checkpointopen *req_lib_ckpt_checkpointopen = (struct req_lib_ckpt_checkpointopen *)&req_exec_ckpt_checkpointopen->req_lib_ckpt_checkpointopen;
struct res_lib_ckpt_checkpointopen res_lib_ckpt_checkpointopen;
struct res_lib_ckpt_checkpointopenasync res_lib_ckpt_checkpointopenasync;
struct saCkptCheckpoint *ckptCheckpoint = 0;
struct saCkptCheckpointSection *ckptCheckpointSection = 0;
@ -1120,10 +1121,23 @@ error_exit:
* If this node was the source of the message, respond to this node
*/
if (message_source_is_local(&req_exec_ckpt_checkpointopen->source)) {
res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen);
res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN;
res_lib_ckpt_checkpointopen.header.error = error;
if (req_exec_ckpt_checkpointopen->invocation) {
res_lib_ckpt_checkpointopenasync.header.size = sizeof (struct res_lib_ckpt_checkpointopenasync);
res_lib_ckpt_checkpointopenasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
res_lib_ckpt_checkpointopenasync.header.error = error;
res_lib_ckpt_checkpointopenasync.checkpointHandle = req_exec_ckpt_checkpointopen->checkpointHandle;
res_lib_ckpt_checkpointopenasync.invocation = req_exec_ckpt_checkpointopen->invocation;
libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info->conn_info_partner, &res_lib_ckpt_checkpointopenasync,
sizeof (struct res_lib_ckpt_checkpointopenasync));
} else {
res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen);
res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN;
res_lib_ckpt_checkpointopen.header.error = error;
libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info, &res_lib_ckpt_checkpointopen,
sizeof (struct res_lib_ckpt_checkpointopen));
}
if (error == SA_AIS_OK) {
checkpoint_cleanup = malloc (sizeof (struct checkpoint_cleanup));
if (checkpoint_cleanup == 0) {
@ -1135,9 +1149,6 @@ error_exit:
&req_exec_ckpt_checkpointopen->source.conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
}
}
libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info, &res_lib_ckpt_checkpointopen,
sizeof (struct res_lib_ckpt_checkpointopen));
}
/* return (error == SA_AIS_OK ? 0 : -1); */
@ -2182,6 +2193,9 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i
memcpy (&req_exec_ckpt_checkpointopen.req_lib_ckpt_checkpointopen,
req_lib_ckpt_checkpointopen,
sizeof (struct req_lib_ckpt_checkpointopen));
req_exec_ckpt_checkpointopen.invocation = 0;
req_exec_ckpt_checkpointopen.checkpointHandle = 0;
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen);
@ -2193,6 +2207,29 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i
static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *conn_info, void *message)
{
struct req_lib_ckpt_checkpointopenasync *req_lib_ckpt_checkpointopenasync = (struct req_lib_ckpt_checkpointopenasync *)message;
struct req_exec_ckpt_checkpointopen req_exec_ckpt_checkpointopen;
struct iovec iovecs[2];
log_printf (LOG_LEVEL_DEBUG, "Library request to open checkpoint async.\n");
req_exec_ckpt_checkpointopen.header.size =
sizeof (struct req_exec_ckpt_checkpointopen);
req_exec_ckpt_checkpointopen.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN;
message_source_set (&req_exec_ckpt_checkpointopen.source, conn_info);
memcpy (&req_exec_ckpt_checkpointopen.req_lib_ckpt_checkpointopen,
req_lib_ckpt_checkpointopenasync,
sizeof (struct req_lib_ckpt_checkpointopen));
req_exec_ckpt_checkpointopen.invocation = req_lib_ckpt_checkpointopenasync->invocation;
req_exec_ckpt_checkpointopen.checkpointHandle = req_lib_ckpt_checkpointopenasync->checkpointHandle;
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}

View File

@ -113,6 +113,8 @@ struct req_exec_ckpt_checkpointopen {
struct req_header header;
struct message_source source;
struct req_lib_ckpt_checkpointopen req_lib_ckpt_checkpointopen;
SaCkptCheckpointHandleT checkpointHandle;
SaInvocationT invocation;
};
@ -121,6 +123,7 @@ struct req_lib_ckpt_checkpointopenasync {
SaNameT checkpointName;
SaCkptCheckpointCreationAttributesT checkpointCreationAttributes;
SaCkptCheckpointOpenFlagsT checkpointOpenFlags;
SaCkptCheckpointHandleT checkpointHandle;
SaInvocationT invocation;
};

View File

@ -52,7 +52,7 @@
#include "../include/ipc_ckpt.h"
struct message_overlay {
struct req_header header;
struct res_header header;
char data[4096];
};
@ -232,151 +232,107 @@ saCkptSelectionObjectGet (
return (SA_AIS_OK);
}
#ifdef COMPILE_OUT
SaAisErrorT
saCkptDispatch (
const SaCkptHandleT ckptHandle,
SaDispatchFlagsT dispatchFlags)
{
fd_set read_fds;
struct pollfd ufds;
int poll_fd;
int timeout = 1;
SaCkptCallbacksT callbacks;
SaAisErrorT error;
int dispatch_avail;
struct timeval *timeout = 0;
struct ckptInstance *ckptInstance;
struct req_header **queue_msg;
struct req_header *msg;
int empty;
int ignore_dispatch = 0;
int cont = 1; /* always continue do loop except when set to 0 */
struct message_overlay dispatch_data;
struct res_lib_ckpt_checkpointopenasync *res_lib_ckpt_checkpointopenasync;
error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance);
if (error != SA_AIS_OK) {
return (error);
goto error_put;
}
/*
* Timeout instantly for SA_DISPATCH_ALL
*/
if (dispatchFlags & SA_DISPATCH_ALL) {
timeout = &zerousec;
if (dispatchFlags == SA_DISPATCH_ALL) {
timeout = 0;
}
do {
/*
* Read data directly from socket
*/
FD_ZERO (&read_fds);
FD_SET (ckptInstance->response_fd, &read_fds);
poll_fd = ckptInstance->dispatch_fd;
ufds.fd = poll_fd;
ufds.events = POLLIN;
ufds.revents = 0;
error = saSelectRetry (ckptInstance->response_fd + 1, &read_fds, 0, 0, timeout);
error = saPollRetry(&ufds, 1, timeout);
if (error != SA_AIS_OK) {
goto error_exit;
goto error_put;
}
pthread_mutex_lock(&ckptInstance->dispatch_mutex);
if (ckptInstance->finalize == 1) {
#ifdef DBGLOGS
fprintf(stderr,"CKPT: This Ckpt has been finalised in a separate call so exit the dispatch process\n");
#endif
error = SA_AIS_OK;
goto error_unlock;
}
dispatch_avail = FD_ISSET (ckptInstance->response_fd, &read_fds);
if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
error = SA_AIS_ERR_BAD_HANDLE;
goto error_unlock;
}
dispatch_avail = (ufds.revents & POLLIN);
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
continue; /* next select */
pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
continue;
}
saQueueIsEmpty(&ckptInstance->inq, &empty);
if (empty == 0) {
/*
* Queue is not empty, read data from queue
*/
saQueueItemGet (&ckptInstance->inq, (void **)&queue_msg);
msg = *queue_msg;
memcpy (&ckptInstance->message, msg, msg->size);
saQueueItemRemove (&ckptInstance->inq);
free (msg);
} else {
/*
* Queue empty, read response from socket
*/
error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
memset(&dispatch_data,0, sizeof(struct message_overlay));
error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.header, sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_AIS_OK) {
goto error_unlock;
}
if (dispatch_data.header.size > sizeof (struct res_header)) {
error = saRecvRetry (ckptInstance->dispatch_fd, &dispatch_data.data,
dispatch_data.header.size - sizeof (struct res_header),
MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_AIS_OK) {
goto error_exit;
}
if (ckptInstance->message.header.size > sizeof (struct req_header)) {
error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.data,
ckptInstance->message.header.size - sizeof (struct req_header),
MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_AIS_OK) {
goto error_exit;
}
goto error_unlock;
}
}
/*
* Make copy of callbacks, message data, unlock instance,
* and call callback. A risk of this dispatch method is that
* the callback routines may operate at the same time that
* CkptFinalize has been called in another thread.
*/
memcpy(&callbacks,&ckptInstance->callbacks, sizeof(ckptInstance->callbacks));
pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
/*
* Dispatch incoming response
*/
switch (ckptInstance->message.header.id) {
#ifdef COMPILE_OUT
case MESSAGE_RES_CKPT_CHECKPOINT_ACTIVATEPOLL:
/*
* This is a do nothing message which the node executive sends
* to activate the file handle in poll when the library has
* queued a message into amfHandle->inq
* The dispatch is ignored for the following two cases:
* 1) setting of timeout to zero for the DISPATCH_ALL case
* 2) expiration of the do loop for the DISPATCH_ONE case
*/
ignore_dispatch = 1;
break;
switch (dispatch_data.header.id) {
case MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC:
res_lib_ckpt_checkpointopenasync = (struct res_lib_ckpt_checkpointopenasync *) &dispatch_data;
case MESSAGE_RES_CKPT_CHECKPOINT_HEALTHCHECKCALLBACK:
res_amf_healthcheckcallback = (struct res_amf_healthcheckcallback *)&ckptInstance->message;
amfInstance->callbacks.saAmfHealthcheckCallback (
res_amf_healthcheckcallback->invocation,
&res_amf_healthcheckcallback->compName,
res_amf_healthcheckcallback->checkType);
callbacks.saCkptCheckpointOpenCallback(res_lib_ckpt_checkpointopenasync->invocation,
res_lib_ckpt_checkpointopenasync->checkpointHandle,
res_lib_ckpt_checkpointopenasync->header.error);
break;
case MESSAGE_RES_CKPT_CHECKPOINT_READINESSSTATESETCALLBACK:
res_amf_readinessstatesetcallback = (struct res_amf_readinessstatesetcallback *)&ckptInstance->message;
amfInstance->callbacks.saAmfReadinessStateSetCallback (
res_amf_readinessstatesetcallback->invocation,
&res_amf_readinessstatesetcallback->compName,
res_amf_readinessstatesetcallback->readinessState);
break;
case MESSAGE_RES_CKPT_CHECKPOINT_CSISETCALLBACK:
res_amf_csisetcallback = (struct res_amf_csisetcallback *)&ckptInstance->message;
amfInstance->callbacks.saAmfCSISetCallback (
res_amf_csisetcallback->invocation,
&res_amf_csisetcallback->compName,
&res_amf_csisetcallback->csiName,
res_amf_csisetcallback->csiFlags,
&res_amf_csisetcallback->haState,
&res_amf_csisetcallback->activeCompName,
res_amf_csisetcallback->transitionDescriptor);
break;
case MESSAGE_RES_CKPT_CHECKPOINT_CSIREMOVECALLBACK:
res_amf_csiremovecallback = (struct res_amf_csiremovecallback *)&ckptInstance->message;
amfInstance->callbacks.saAmfCSIRemoveCallback (
res_amf_csiremovecallback->invocation,
&res_amf_csiremovecallback->compName,
&res_amf_csiremovecallback->csiName,
&res_amf_csiremovecallback->csiFlags);
break;
case MESSAGE_RES_CKPT_CHECKPOINT_PROTECTIONGROUPTRACKCALLBACK:
res_amf_protectiongrouptrackcallback = (struct res_amf_protectiongrouptrackcallback *)&ckptInstance->message;
memcpy (res_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_amf_protectiongrouptrackcallback->notificationBuffer,
res_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT));
amfInstance->callbacks.saAmfProtectionGroupTrackCallback(
&res_amf_protectiongrouptrackcallback->csiName,
res_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_amf_protectiongrouptrackcallback->numberOfItems,
res_amf_protectiongrouptrackcallback->numberOfMembers,
res_amf_protectiongrouptrackcallback->error);
break;
#endif
default:
/* TODO */
break;
@ -386,26 +342,20 @@ saCkptDispatch (
*/
switch (dispatchFlags) {
case SA_DISPATCH_ONE:
if (ignore_dispatch) {
ignore_dispatch = 0;
} else {
cont = 0;
}
break;
case SA_DISPATCH_ALL:
if (ignore_dispatch) {
ignore_dispatch = 0;
}
break;
case SA_DISPATCH_BLOCKING:
break;
}
} while (cont);
error_exit:
error_unlock:
pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
error_put:
saHandleInstancePut(&ckptHandleDatabase, ckptHandle);
return (error);
}
#endif
SaAisErrorT
saCkptFinalize (
@ -535,15 +485,36 @@ saCkptCheckpointOpenAsync (
const SaCkptCheckpointCreationAttributesT *checkpointCreationAttributes,
SaCkptCheckpointOpenFlagsT checkpointOpenFlags)
{
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct ckptInstance *ckptInstance;
SaCkptCheckpointHandleT checkpointHandle;
SaAisErrorT error;
struct req_lib_ckpt_checkpointopenasync req_lib_ckpt_checkpointopenasync;
error = saHandleCreate (&checkpointHandleDatabase,
sizeof (struct ckptCheckpointInstance), &checkpointHandle);
if (error != SA_AIS_OK) {
goto error_no_destroy;
}
error = saHandleInstanceGet (&checkpointHandleDatabase, checkpointHandle,
(void *)&ckptCheckpointInstance);
if (error != SA_AIS_OK) {
goto error_destroy;
}
error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance);
if (error != SA_AIS_OK) {
return (error);
goto error_put_destroy;
}
ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
ckptCheckpointInstance->maxSectionIdSize =
checkpointCreationAttributes->maxSectionIdSize;
ckptCheckpointInstance->ckptHandle = ckptHandle;
ckptCheckpointInstance->checkpointOpenFlags = checkpointOpenFlags;
memcpy (&ckptCheckpointInstance->checkpointName, checkpointName, sizeof (SaNameT));
req_lib_ckpt_checkpointopenasync.header.size = sizeof (struct req_lib_ckpt_checkpointopenasync);
req_lib_ckpt_checkpointopenasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
req_lib_ckpt_checkpointopenasync.invocation = invocation;
@ -553,16 +524,27 @@ saCkptCheckpointOpenAsync (
sizeof (SaCkptCheckpointCreationAttributesT));
req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags;
req_lib_ckpt_checkpointopenasync.checkpointHandle = checkpointHandle;
pthread_mutex_lock (&ckptInstance->response_mutex);
error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointopenasync,
error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointopenasync,
sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL);
if (error != SA_AIS_OK) {
goto error_put_ckpt_destroy;
}
pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL);
pthread_mutex_unlock (&ckptInstance->response_mutex);
saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
return (error);
error_put_ckpt_destroy:
saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
error_put_destroy:
saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
error_destroy:
saHandleDestroy (&checkpointHandleDatabase, checkpointHandle);
error_no_destroy:
return (error);
}

View File

@ -50,6 +50,7 @@
#define SECONDS_TO_EXPIRE 4
int ckptinv;
SaInvocationT open_invocation = 16;
void printSaNameT (SaNameT *name)
{
int i;
@ -169,14 +170,27 @@ SaCkptIOVectorElementT default_write_vector[] = {
}
};
SaCkptCheckpointHandleT checkpointHandle;
void OpenCallBack (
SaInvocationT invocation,
const SaCkptCheckpointHandleT chckpointHandle,
SaAisErrorT error) {
printf ("%s: This is a call back for open for invocation = %d\n",
get_test_output (error, SA_AIS_OK), (int)invocation);
memcpy(&checkpointHandle, &chckpointHandle, sizeof(SaCkptCheckpointHandleT));
}
SaCkptCallbacksT callbacks = {
0,
&OpenCallBack,
0
};
int main (void) {
SaCkptHandleT ckptHandle;
SaCkptCheckpointHandleT checkpointHandle;
SaCkptCheckpointHandleT checkpointHandle2;
SaCkptCheckpointHandleT checkpointHandleRead;
SaCkptCheckpointDescriptorT checkpointStatus;
@ -190,6 +204,24 @@ int main (void) {
int sel_fd;
error = saCkptInitialize (&ckptHandle, &callbacks, &version);
error = saCkptCheckpointOpenAsync (ckptHandle,
open_invocation,
&checkpointName,
&checkpointCreationAttributes,
SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE);
printf ("%s: initial asynchronous open of checkpoint\n",
get_test_output (error, SA_AIS_OK));
error = saCkptDispatch (ckptHandle, SA_DISPATCH_ONE);
printf ("%s: Dispatch response for open async of checkpoint\n",
get_test_output (error, SA_AIS_OK));
error = saCkptCheckpointClose (checkpointHandle);
printf ("%s: Closing checkpoint\n",
get_test_output (error, SA_AIS_OK));
error = saCkptCheckpointOpen (ckptHandle,
&checkpointName,