Add flow control to amf.

(Logical change 1.55)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@178 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2004-08-20 22:18:34 +00:00
parent 52a1621c74
commit f3cf1df94c

147
lib/amf.c
View File

@ -51,7 +51,7 @@
#include "util.h"
struct message_overlay {
struct message_header header;
struct req_header header;
char data[4096];
};
@ -197,13 +197,13 @@ saAmfDispatch (
int dispatch_avail;
struct amfInstance *amfInstance;
SaAmfCallbacksT callbacks;
struct res_amf_healthcheckcallback *res_amf_healthcheckcallback;
struct res_amf_readinessstatesetcallback *res_amf_readinessstatesetcallback;
struct res_amf_csisetcallback *res_amf_csisetcallback;
struct res_amf_csiremovecallback *res_amf_csiremovecallback;
struct res_amf_protectiongrouptrackcallback *res_amf_protectiongrouptrackcallback;
struct message_header **queue_msg;
struct message_header *msg;
struct res_lib_amf_healthcheckcallback *res_lib_amf_healthcheckcallback;
struct res_lib_amf_readinessstatesetcallback *res_lib_amf_readinessstatesetcallback;
struct res_lib_amf_csisetcallback *res_lib_amf_csisetcallback;
struct res_lib_amf_csiremovecallback *res_lib_amf_csiremovecallback;
struct res_lib_amf_protectiongrouptrackcallback *res_lib_amf_protectiongrouptrackcallback;
struct req_header **queue_msg;
struct req_header *msg;
int empty;
int ignore_dispatch = 0;
int cont = 1; /* always continue do loop except when set to 0 */
@ -274,13 +274,13 @@ saAmfDispatch (
* Queue empty, read response from socket
*/
error = saRecvRetry (amfInstance->fd, &dispatch_data.header,
sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
if (dispatch_data.header.size > sizeof (struct message_header)) {
if (dispatch_data.header.size > sizeof (struct req_header)) {
error = saRecvRetry (amfInstance->fd, &dispatch_data.data,
dispatch_data.header.size - sizeof (struct message_header),
dispatch_data.header.size - sizeof (struct req_header),
MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
@ -313,54 +313,54 @@ saAmfDispatch (
break;
case MESSAGE_RES_AMF_HEALTHCHECKCALLBACK:
res_amf_healthcheckcallback = (struct res_amf_healthcheckcallback *)&dispatch_data;
res_lib_amf_healthcheckcallback = (struct res_lib_amf_healthcheckcallback *)&dispatch_data;
callbacks.saAmfHealthcheckCallback (
res_amf_healthcheckcallback->invocation,
&res_amf_healthcheckcallback->compName,
res_amf_healthcheckcallback->checkType);
res_lib_amf_healthcheckcallback->invocation,
&res_lib_amf_healthcheckcallback->compName,
res_lib_amf_healthcheckcallback->checkType);
break;
case MESSAGE_RES_AMF_READINESSSTATESETCALLBACK:
res_amf_readinessstatesetcallback = (struct res_amf_readinessstatesetcallback *)&dispatch_data;
res_lib_amf_readinessstatesetcallback = (struct res_lib_amf_readinessstatesetcallback *)&dispatch_data;
callbacks.saAmfReadinessStateSetCallback (
res_amf_readinessstatesetcallback->invocation,
&res_amf_readinessstatesetcallback->compName,
res_amf_readinessstatesetcallback->readinessState);
res_lib_amf_readinessstatesetcallback->invocation,
&res_lib_amf_readinessstatesetcallback->compName,
res_lib_amf_readinessstatesetcallback->readinessState);
break;
case MESSAGE_RES_AMF_CSISETCALLBACK:
res_amf_csisetcallback = (struct res_amf_csisetcallback *)&dispatch_data;
res_lib_amf_csisetcallback = (struct res_lib_amf_csisetcallback *)&dispatch_data;
callbacks.saAmfCSISetCallback (
res_amf_csisetcallback->invocation,
&res_amf_csisetcallback->compName,
&res_amf_csisetcallback->csiName,
res_amf_csisetcallback->csiFlags,
&res_amf_csisetcallback->haState,
&res_amf_csisetcallback->activeCompName,
res_amf_csisetcallback->transitionDescriptor);
res_lib_amf_csisetcallback->invocation,
&res_lib_amf_csisetcallback->compName,
&res_lib_amf_csisetcallback->csiName,
res_lib_amf_csisetcallback->csiFlags,
&res_lib_amf_csisetcallback->haState,
&res_lib_amf_csisetcallback->activeCompName,
res_lib_amf_csisetcallback->transitionDescriptor);
break;
case MESSAGE_RES_AMF_CSIREMOVECALLBACK:
res_amf_csiremovecallback = (struct res_amf_csiremovecallback *)&dispatch_data;
res_lib_amf_csiremovecallback = (struct res_lib_amf_csiremovecallback *)&dispatch_data;
callbacks.saAmfCSIRemoveCallback (
res_amf_csiremovecallback->invocation,
&res_amf_csiremovecallback->compName,
&res_amf_csiremovecallback->csiName,
&res_amf_csiremovecallback->csiFlags);
res_lib_amf_csiremovecallback->invocation,
&res_lib_amf_csiremovecallback->compName,
&res_lib_amf_csiremovecallback->csiName,
&res_lib_amf_csiremovecallback->csiFlags);
break;
case MESSAGE_RES_AMF_PROTECTIONGROUPTRACKCALLBACK:
res_amf_protectiongrouptrackcallback = (struct res_amf_protectiongrouptrackcallback *)&dispatch_data;
memcpy (res_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_amf_protectiongrouptrackcallback->notificationBuffer,
res_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT));
res_lib_amf_protectiongrouptrackcallback = (struct res_lib_amf_protectiongrouptrackcallback *)&dispatch_data;
memcpy (res_lib_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_lib_amf_protectiongrouptrackcallback->notificationBuffer,
res_lib_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT));
callbacks.saAmfProtectionGroupTrackCallback(
&res_amf_protectiongrouptrackcallback->csiName,
res_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_amf_protectiongrouptrackcallback->numberOfItems,
res_amf_protectiongrouptrackcallback->numberOfMembers,
res_amf_protectiongrouptrackcallback->error);
&res_lib_amf_protectiongrouptrackcallback->csiName,
res_lib_amf_protectiongrouptrackcallback->notificationBufferAddress,
res_lib_amf_protectiongrouptrackcallback->numberOfItems,
res_lib_amf_protectiongrouptrackcallback->numberOfMembers,
res_lib_amf_protectiongrouptrackcallback->error);
break;
default:
@ -443,7 +443,6 @@ saAmfComponentRegister (
struct req_lib_amf_componentregister req_lib_amf_componentregister;
struct res_lib_amf_componentregister res_lib_amf_componentregister;
req_lib_amf_componentregister.header.magic = MESSAGE_MAGIC;
req_lib_amf_componentregister.header.size = sizeof (struct req_lib_amf_componentregister);
req_lib_amf_componentregister.header.id = MESSAGE_REQ_AMF_COMPONENTREGISTER;
memcpy (&req_lib_amf_componentregister.compName, compName, sizeof (SaNameT));
@ -477,12 +476,12 @@ saAmfComponentRegister (
goto error_unlock;
}
if (res_lib_amf_componentregister.error == SA_OK) {
if (res_lib_amf_componentregister.header.error == SA_OK) {
amfInstance->compRegistered = 1;
memcpy (&amfInstance->compName, compName, sizeof (SaNameT));
}
error = res_lib_amf_componentregister.error;
error = res_lib_amf_componentregister.header.error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
@ -501,7 +500,6 @@ saAmfComponentUnregister (
struct amfInstance *amfInstance;
SaErrorT error;
req_lib_amf_componentunregister.header.magic = MESSAGE_MAGIC;
req_lib_amf_componentunregister.header.size = sizeof (struct req_lib_amf_componentunregister);
req_lib_amf_componentunregister.header.id = MESSAGE_REQ_AMF_COMPONENTUNREGISTER;
memcpy (&req_lib_amf_componentunregister.compName, compName, sizeof (SaNameT));
@ -536,10 +534,10 @@ saAmfComponentUnregister (
goto error_unlock;
}
if (res_lib_amf_componentunregister.error == SA_OK) {
if (res_lib_amf_componentunregister.header.error == SA_OK) {
amfInstance->compRegistered = 0;
}
error = res_lib_amf_componentunregister.error;
error = res_lib_amf_componentunregister.header.error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
@ -583,13 +581,12 @@ saAmfReadinessStateGet (
int fd;
SaErrorT error;
struct req_amf_readinessstateget req_amf_readinessstateget;
struct res_amf_readinessstateget res_amf_readinessstateget;
struct res_lib_amf_readinessstateget res_lib_amf_readinessstateget;
error = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT);
if (error != SA_OK) {
goto exit_noclose;
}
req_amf_readinessstateget.header.magic = MESSAGE_MAGIC;
req_amf_readinessstateget.header.id = MESSAGE_RES_AMF_READINESSSTATEGET;
req_amf_readinessstateget.header.size = sizeof (struct req_amf_readinessstateget);
memcpy (&req_amf_readinessstateget.compName, compName, sizeof (SaNameT));
@ -600,12 +597,12 @@ saAmfReadinessStateGet (
goto exit_close;
}
error = saRecvRetry (fd, &res_amf_readinessstateget,
sizeof (struct res_amf_readinessstateget), MSG_WAITALL | MSG_NOSIGNAL);
error = saRecvRetry (fd, &res_lib_amf_readinessstateget,
sizeof (struct res_lib_amf_readinessstateget), MSG_WAITALL | MSG_NOSIGNAL);
if (error == SA_OK) {
memcpy (readinessState, &res_amf_readinessstateget.readinessState,
memcpy (readinessState, &res_lib_amf_readinessstateget.readinessState,
sizeof (SaAmfReadinessStateT));
error = res_amf_readinessstateget.error;
error = res_lib_amf_readinessstateget.header.error;
}
exit_close:
@ -627,7 +624,6 @@ saAmfStoppingComplete (
if (errorResult != SA_OK) {
goto exit_noclose;
}
req_amf_stoppingcomplete.header.magic = MESSAGE_MAGIC;
req_amf_stoppingcomplete.header.id = MESSAGE_REQ_AMF_STOPPINGCOMPLETE;
req_amf_stoppingcomplete.header.size = sizeof (struct req_amf_stoppingcomplete);
req_amf_stoppingcomplete.invocation = invocation;
@ -648,7 +644,7 @@ saAmfHAStateGet (
SaAmfHAStateT *haState) {
struct req_amf_hastateget req_amf_hastateget;
struct res_amf_hastateget res_amf_hastateget;
struct res_lib_amf_hastateget res_lib_amf_hastateget;
int fd;
SaErrorT error;
@ -656,7 +652,6 @@ saAmfHAStateGet (
if (error != SA_OK) {
goto exit_noclose;
}
req_amf_hastateget.header.magic = MESSAGE_MAGIC;
req_amf_hastateget.header.id = MESSAGE_REQ_AMF_HASTATEGET;
req_amf_hastateget.header.size = sizeof (struct req_amf_hastateget);
memcpy (&req_amf_hastateget.compName, compName, sizeof (SaNameT));
@ -668,11 +663,11 @@ saAmfHAStateGet (
goto exit_close;
}
error = saRecvRetry (fd, &res_amf_hastateget,
sizeof (struct res_amf_hastateget), MSG_WAITALL | MSG_NOSIGNAL);
error = saRecvRetry (fd, &res_lib_amf_hastateget,
sizeof (struct res_lib_amf_hastateget), MSG_WAITALL | MSG_NOSIGNAL);
if (error == SA_OK) {
memcpy (haState, &res_amf_hastateget.haState, sizeof (SaAmfHAStateT));
error = res_amf_hastateget.error;
memcpy (haState, &res_lib_amf_hastateget.haState, sizeof (SaAmfHAStateT));
error = res_lib_amf_hastateget.header.error;
}
exit_close:
@ -691,10 +686,9 @@ saAmfProtectionGroupTrackStart (
struct amfInstance *amfInstance;
struct req_amf_protectiongrouptrackstart req_amf_protectiongrouptrackstart;
struct res_amf_protectiongrouptrackstart res_amf_protectiongrouptrackstart;
struct res_lib_amf_protectiongrouptrackstart res_lib_amf_protectiongrouptrackstart;
SaErrorT error;
req_amf_protectiongrouptrackstart.header.magic = MESSAGE_MAGIC;
req_amf_protectiongrouptrackstart.header.size = sizeof (struct req_amf_protectiongrouptrackstart);
req_amf_protectiongrouptrackstart.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTART;
memcpy (&req_amf_protectiongrouptrackstart.csiName, csiName, sizeof (SaNameT));
@ -715,10 +709,10 @@ saAmfProtectionGroupTrackStart (
goto error_unlock;
}
error = saRecvQueue (amfInstance->fd, &res_amf_protectiongrouptrackstart,
error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstart,
&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART);
error = res_amf_protectiongrouptrackstart.error;
error = res_lib_amf_protectiongrouptrackstart.header.error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
@ -733,10 +727,9 @@ saAmfProtectionGroupTrackStop (
struct amfInstance *amfInstance;
struct req_amf_protectiongrouptrackstop req_amf_protectiongrouptrackstop;
struct res_amf_protectiongrouptrackstop res_amf_protectiongrouptrackstop;
struct res_lib_amf_protectiongrouptrackstop res_lib_amf_protectiongrouptrackstop;
SaErrorT error;
req_amf_protectiongrouptrackstop.header.magic = MESSAGE_MAGIC;
req_amf_protectiongrouptrackstop.header.size = sizeof (struct req_amf_protectiongrouptrackstop);
req_amf_protectiongrouptrackstop.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP;
memcpy (&req_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT));
@ -754,10 +747,10 @@ saAmfProtectionGroupTrackStop (
goto error_unlock;
}
error = saRecvQueue (amfInstance->fd, &res_amf_protectiongrouptrackstop,
error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstop,
&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP);
error = res_amf_protectiongrouptrackstop.error;
error = res_lib_amf_protectiongrouptrackstop.header.error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
@ -782,7 +775,6 @@ saAmfErrorReport (
if (error != SA_OK) {
goto exit_noclose;
}
req_lib_amf_errorreport.header.magic = MESSAGE_MAGIC;
req_lib_amf_errorreport.header.id = MESSAGE_REQ_AMF_ERRORREPORT;
req_lib_amf_errorreport.header.size = sizeof (struct req_lib_amf_errorreport);
memcpy (&req_lib_amf_errorreport.reportingComponent, reportingComponent, sizeof (SaNameT));
@ -807,7 +799,7 @@ saAmfErrorReport (
goto exit_close;
}
error = res_lib_amf_errorreport.error;
error = res_lib_amf_errorreport.header.error;
exit_close:
close (fd);
@ -828,7 +820,6 @@ saAmfErrorCancelAll (
if (error != SA_OK) {
goto exit_noclose;
}
req_lib_amf_errorcancelall.header.magic = MESSAGE_MAGIC;
req_lib_amf_errorcancelall.header.id = MESSAGE_REQ_AMF_ERRORCANCELALL;
req_lib_amf_errorcancelall.header.size = sizeof (struct req_lib_amf_errorcancelall);
memcpy (&req_lib_amf_errorcancelall.compName, compName, sizeof (SaNameT));
@ -845,7 +836,7 @@ saAmfErrorCancelAll (
goto exit_close;
}
error = res_lib_amf_errorcancelall.error;
error = res_lib_amf_errorcancelall.header.error;
exit_close:
close (fd);
@ -862,13 +853,12 @@ saAmfComponentCapabilityModelGet (
int fd;
SaErrorT error;
struct req_amf_componentcapabilitymodelget req_amf_componentcapabilitymodelget;
struct res_amf_componentcapabilitymodelget res_amf_componentcapabilitymodelget;
struct res_lib_amf_componentcapabilitymodelget res_lib_amf_componentcapabilitymodelget;
error = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT);
if (error != SA_OK) {
goto exit_noclose;
}
req_amf_componentcapabilitymodelget.header.magic = MESSAGE_MAGIC;
req_amf_componentcapabilitymodelget.header.id = MESSAGE_REQ_AMF_COMPONENTCAPABILITYMODELGET;
req_amf_componentcapabilitymodelget.header.size = sizeof (struct req_amf_componentcapabilitymodelget);
memcpy (&req_amf_componentcapabilitymodelget.compName, compName, sizeof (SaNameT));
@ -879,13 +869,13 @@ saAmfComponentCapabilityModelGet (
goto exit_close;
}
error = saRecvRetry (fd, &res_amf_componentcapabilitymodelget,
sizeof (struct res_amf_componentcapabilitymodelget), MSG_WAITALL | MSG_NOSIGNAL);
error = saRecvRetry (fd, &res_lib_amf_componentcapabilitymodelget,
sizeof (struct res_lib_amf_componentcapabilitymodelget), MSG_WAITALL | MSG_NOSIGNAL);
if (error == SA_OK) {
memcpy (componentCapabilityModel,
&res_amf_componentcapabilitymodelget.componentCapabilityModel,
&res_lib_amf_componentcapabilitymodelget.componentCapabilityModel,
sizeof (SaAmfComponentCapabilityModelT));
error = res_amf_componentcapabilitymodelget.error;
error = res_lib_amf_componentcapabilitymodelget.header.error;
}
exit_close:
@ -916,7 +906,6 @@ saAmfResponse (
if (errorResult != SA_OK) {
goto exit_noclose;
}
req_amf_response.header.magic = MESSAGE_MAGIC;
req_amf_response.header.id = MESSAGE_REQ_AMF_RESPONSE;
req_amf_response.header.size = sizeof (struct req_amf_response);
req_amf_response.invocation = invocation;