From f3cf1df94c6d20ef858777ff0124f3dc23a30765 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Fri, 20 Aug 2004 22:18:34 +0000 Subject: [PATCH] Add flow control to amf. (Logical change 1.55) git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@178 fd59a12c-fef9-0310-b244-a6a79926bd2f --- lib/amf.c | 147 +++++++++++++++++++++++++----------------------------- 1 file changed, 68 insertions(+), 79 deletions(-) diff --git a/lib/amf.c b/lib/amf.c index 9840c7fd..f7cfff4d 100644 --- a/lib/amf.c +++ b/lib/amf.c @@ -51,7 +51,7 @@ #include "util.h" struct message_overlay { - struct message_header header; + struct req_header header; char data[4096]; }; @@ -197,13 +197,13 @@ saAmfDispatch ( int dispatch_avail; struct amfInstance *amfInstance; SaAmfCallbacksT callbacks; - struct res_amf_healthcheckcallback *res_amf_healthcheckcallback; - struct res_amf_readinessstatesetcallback *res_amf_readinessstatesetcallback; - struct res_amf_csisetcallback *res_amf_csisetcallback; - struct res_amf_csiremovecallback *res_amf_csiremovecallback; - struct res_amf_protectiongrouptrackcallback *res_amf_protectiongrouptrackcallback; - struct message_header **queue_msg; - struct message_header *msg; + struct res_lib_amf_healthcheckcallback *res_lib_amf_healthcheckcallback; + struct res_lib_amf_readinessstatesetcallback *res_lib_amf_readinessstatesetcallback; + struct res_lib_amf_csisetcallback *res_lib_amf_csisetcallback; + struct res_lib_amf_csiremovecallback *res_lib_amf_csiremovecallback; + struct res_lib_amf_protectiongrouptrackcallback *res_lib_amf_protectiongrouptrackcallback; + struct req_header **queue_msg; + struct req_header *msg; int empty; int ignore_dispatch = 0; int cont = 1; /* always continue do loop except when set to 0 */ @@ -274,13 +274,13 @@ saAmfDispatch ( * Queue empty, read response from socket */ error = saRecvRetry (amfInstance->fd, &dispatch_data.header, - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL); + sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL); if (error != SA_OK) { goto error_unlock; } - if (dispatch_data.header.size > sizeof (struct message_header)) { + if (dispatch_data.header.size > sizeof (struct req_header)) { error = saRecvRetry (amfInstance->fd, &dispatch_data.data, - dispatch_data.header.size - sizeof (struct message_header), + dispatch_data.header.size - sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL); if (error != SA_OK) { goto error_unlock; @@ -313,54 +313,54 @@ saAmfDispatch ( break; case MESSAGE_RES_AMF_HEALTHCHECKCALLBACK: - res_amf_healthcheckcallback = (struct res_amf_healthcheckcallback *)&dispatch_data; + res_lib_amf_healthcheckcallback = (struct res_lib_amf_healthcheckcallback *)&dispatch_data; callbacks.saAmfHealthcheckCallback ( - res_amf_healthcheckcallback->invocation, - &res_amf_healthcheckcallback->compName, - res_amf_healthcheckcallback->checkType); + res_lib_amf_healthcheckcallback->invocation, + &res_lib_amf_healthcheckcallback->compName, + res_lib_amf_healthcheckcallback->checkType); break; case MESSAGE_RES_AMF_READINESSSTATESETCALLBACK: - res_amf_readinessstatesetcallback = (struct res_amf_readinessstatesetcallback *)&dispatch_data; + res_lib_amf_readinessstatesetcallback = (struct res_lib_amf_readinessstatesetcallback *)&dispatch_data; callbacks.saAmfReadinessStateSetCallback ( - res_amf_readinessstatesetcallback->invocation, - &res_amf_readinessstatesetcallback->compName, - res_amf_readinessstatesetcallback->readinessState); + res_lib_amf_readinessstatesetcallback->invocation, + &res_lib_amf_readinessstatesetcallback->compName, + res_lib_amf_readinessstatesetcallback->readinessState); break; case MESSAGE_RES_AMF_CSISETCALLBACK: - res_amf_csisetcallback = (struct res_amf_csisetcallback *)&dispatch_data; + res_lib_amf_csisetcallback = (struct res_lib_amf_csisetcallback *)&dispatch_data; callbacks.saAmfCSISetCallback ( - res_amf_csisetcallback->invocation, - &res_amf_csisetcallback->compName, - &res_amf_csisetcallback->csiName, - res_amf_csisetcallback->csiFlags, - &res_amf_csisetcallback->haState, - &res_amf_csisetcallback->activeCompName, - res_amf_csisetcallback->transitionDescriptor); + res_lib_amf_csisetcallback->invocation, + &res_lib_amf_csisetcallback->compName, + &res_lib_amf_csisetcallback->csiName, + res_lib_amf_csisetcallback->csiFlags, + &res_lib_amf_csisetcallback->haState, + &res_lib_amf_csisetcallback->activeCompName, + res_lib_amf_csisetcallback->transitionDescriptor); break; case MESSAGE_RES_AMF_CSIREMOVECALLBACK: - res_amf_csiremovecallback = (struct res_amf_csiremovecallback *)&dispatch_data; + res_lib_amf_csiremovecallback = (struct res_lib_amf_csiremovecallback *)&dispatch_data; callbacks.saAmfCSIRemoveCallback ( - res_amf_csiremovecallback->invocation, - &res_amf_csiremovecallback->compName, - &res_amf_csiremovecallback->csiName, - &res_amf_csiremovecallback->csiFlags); + res_lib_amf_csiremovecallback->invocation, + &res_lib_amf_csiremovecallback->compName, + &res_lib_amf_csiremovecallback->csiName, + &res_lib_amf_csiremovecallback->csiFlags); break; case MESSAGE_RES_AMF_PROTECTIONGROUPTRACKCALLBACK: - res_amf_protectiongrouptrackcallback = (struct res_amf_protectiongrouptrackcallback *)&dispatch_data; - memcpy (res_amf_protectiongrouptrackcallback->notificationBufferAddress, - res_amf_protectiongrouptrackcallback->notificationBuffer, - res_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT)); + res_lib_amf_protectiongrouptrackcallback = (struct res_lib_amf_protectiongrouptrackcallback *)&dispatch_data; + memcpy (res_lib_amf_protectiongrouptrackcallback->notificationBufferAddress, + res_lib_amf_protectiongrouptrackcallback->notificationBuffer, + res_lib_amf_protectiongrouptrackcallback->numberOfItems * sizeof (SaAmfProtectionGroupNotificationT)); callbacks.saAmfProtectionGroupTrackCallback( - &res_amf_protectiongrouptrackcallback->csiName, - res_amf_protectiongrouptrackcallback->notificationBufferAddress, - res_amf_protectiongrouptrackcallback->numberOfItems, - res_amf_protectiongrouptrackcallback->numberOfMembers, - res_amf_protectiongrouptrackcallback->error); + &res_lib_amf_protectiongrouptrackcallback->csiName, + res_lib_amf_protectiongrouptrackcallback->notificationBufferAddress, + res_lib_amf_protectiongrouptrackcallback->numberOfItems, + res_lib_amf_protectiongrouptrackcallback->numberOfMembers, + res_lib_amf_protectiongrouptrackcallback->error); break; default: @@ -443,7 +443,6 @@ saAmfComponentRegister ( struct req_lib_amf_componentregister req_lib_amf_componentregister; struct res_lib_amf_componentregister res_lib_amf_componentregister; - req_lib_amf_componentregister.header.magic = MESSAGE_MAGIC; req_lib_amf_componentregister.header.size = sizeof (struct req_lib_amf_componentregister); req_lib_amf_componentregister.header.id = MESSAGE_REQ_AMF_COMPONENTREGISTER; memcpy (&req_lib_amf_componentregister.compName, compName, sizeof (SaNameT)); @@ -477,12 +476,12 @@ saAmfComponentRegister ( goto error_unlock; } - if (res_lib_amf_componentregister.error == SA_OK) { + if (res_lib_amf_componentregister.header.error == SA_OK) { amfInstance->compRegistered = 1; memcpy (&amfInstance->compName, compName, sizeof (SaNameT)); } - error = res_lib_amf_componentregister.error; + error = res_lib_amf_componentregister.header.error; error_unlock: pthread_mutex_unlock (&amfInstance->mutex); @@ -501,7 +500,6 @@ saAmfComponentUnregister ( struct amfInstance *amfInstance; SaErrorT error; - req_lib_amf_componentunregister.header.magic = MESSAGE_MAGIC; req_lib_amf_componentunregister.header.size = sizeof (struct req_lib_amf_componentunregister); req_lib_amf_componentunregister.header.id = MESSAGE_REQ_AMF_COMPONENTUNREGISTER; memcpy (&req_lib_amf_componentunregister.compName, compName, sizeof (SaNameT)); @@ -536,10 +534,10 @@ saAmfComponentUnregister ( goto error_unlock; } - if (res_lib_amf_componentunregister.error == SA_OK) { + if (res_lib_amf_componentunregister.header.error == SA_OK) { amfInstance->compRegistered = 0; } - error = res_lib_amf_componentunregister.error; + error = res_lib_amf_componentunregister.header.error; error_unlock: pthread_mutex_unlock (&amfInstance->mutex); @@ -583,13 +581,12 @@ saAmfReadinessStateGet ( int fd; SaErrorT error; struct req_amf_readinessstateget req_amf_readinessstateget; - struct res_amf_readinessstateget res_amf_readinessstateget; + struct res_lib_amf_readinessstateget res_lib_amf_readinessstateget; error = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT); if (error != SA_OK) { goto exit_noclose; } - req_amf_readinessstateget.header.magic = MESSAGE_MAGIC; req_amf_readinessstateget.header.id = MESSAGE_RES_AMF_READINESSSTATEGET; req_amf_readinessstateget.header.size = sizeof (struct req_amf_readinessstateget); memcpy (&req_amf_readinessstateget.compName, compName, sizeof (SaNameT)); @@ -600,12 +597,12 @@ saAmfReadinessStateGet ( goto exit_close; } - error = saRecvRetry (fd, &res_amf_readinessstateget, - sizeof (struct res_amf_readinessstateget), MSG_WAITALL | MSG_NOSIGNAL); + error = saRecvRetry (fd, &res_lib_amf_readinessstateget, + sizeof (struct res_lib_amf_readinessstateget), MSG_WAITALL | MSG_NOSIGNAL); if (error == SA_OK) { - memcpy (readinessState, &res_amf_readinessstateget.readinessState, + memcpy (readinessState, &res_lib_amf_readinessstateget.readinessState, sizeof (SaAmfReadinessStateT)); - error = res_amf_readinessstateget.error; + error = res_lib_amf_readinessstateget.header.error; } exit_close: @@ -627,7 +624,6 @@ saAmfStoppingComplete ( if (errorResult != SA_OK) { goto exit_noclose; } - req_amf_stoppingcomplete.header.magic = MESSAGE_MAGIC; req_amf_stoppingcomplete.header.id = MESSAGE_REQ_AMF_STOPPINGCOMPLETE; req_amf_stoppingcomplete.header.size = sizeof (struct req_amf_stoppingcomplete); req_amf_stoppingcomplete.invocation = invocation; @@ -648,7 +644,7 @@ saAmfHAStateGet ( SaAmfHAStateT *haState) { struct req_amf_hastateget req_amf_hastateget; - struct res_amf_hastateget res_amf_hastateget; + struct res_lib_amf_hastateget res_lib_amf_hastateget; int fd; SaErrorT error; @@ -656,7 +652,6 @@ saAmfHAStateGet ( if (error != SA_OK) { goto exit_noclose; } - req_amf_hastateget.header.magic = MESSAGE_MAGIC; req_amf_hastateget.header.id = MESSAGE_REQ_AMF_HASTATEGET; req_amf_hastateget.header.size = sizeof (struct req_amf_hastateget); memcpy (&req_amf_hastateget.compName, compName, sizeof (SaNameT)); @@ -668,11 +663,11 @@ saAmfHAStateGet ( goto exit_close; } - error = saRecvRetry (fd, &res_amf_hastateget, - sizeof (struct res_amf_hastateget), MSG_WAITALL | MSG_NOSIGNAL); + error = saRecvRetry (fd, &res_lib_amf_hastateget, + sizeof (struct res_lib_amf_hastateget), MSG_WAITALL | MSG_NOSIGNAL); if (error == SA_OK) { - memcpy (haState, &res_amf_hastateget.haState, sizeof (SaAmfHAStateT)); - error = res_amf_hastateget.error; + memcpy (haState, &res_lib_amf_hastateget.haState, sizeof (SaAmfHAStateT)); + error = res_lib_amf_hastateget.header.error; } exit_close: @@ -691,10 +686,9 @@ saAmfProtectionGroupTrackStart ( struct amfInstance *amfInstance; struct req_amf_protectiongrouptrackstart req_amf_protectiongrouptrackstart; - struct res_amf_protectiongrouptrackstart res_amf_protectiongrouptrackstart; + struct res_lib_amf_protectiongrouptrackstart res_lib_amf_protectiongrouptrackstart; SaErrorT error; - req_amf_protectiongrouptrackstart.header.magic = MESSAGE_MAGIC; req_amf_protectiongrouptrackstart.header.size = sizeof (struct req_amf_protectiongrouptrackstart); req_amf_protectiongrouptrackstart.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTART; memcpy (&req_amf_protectiongrouptrackstart.csiName, csiName, sizeof (SaNameT)); @@ -715,10 +709,10 @@ saAmfProtectionGroupTrackStart ( goto error_unlock; } - error = saRecvQueue (amfInstance->fd, &res_amf_protectiongrouptrackstart, + error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstart, &amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART); - error = res_amf_protectiongrouptrackstart.error; + error = res_lib_amf_protectiongrouptrackstart.header.error; error_unlock: pthread_mutex_unlock (&amfInstance->mutex); @@ -733,10 +727,9 @@ saAmfProtectionGroupTrackStop ( struct amfInstance *amfInstance; struct req_amf_protectiongrouptrackstop req_amf_protectiongrouptrackstop; - struct res_amf_protectiongrouptrackstop res_amf_protectiongrouptrackstop; + struct res_lib_amf_protectiongrouptrackstop res_lib_amf_protectiongrouptrackstop; SaErrorT error; - req_amf_protectiongrouptrackstop.header.magic = MESSAGE_MAGIC; req_amf_protectiongrouptrackstop.header.size = sizeof (struct req_amf_protectiongrouptrackstop); req_amf_protectiongrouptrackstop.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP; memcpy (&req_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT)); @@ -754,10 +747,10 @@ saAmfProtectionGroupTrackStop ( goto error_unlock; } - error = saRecvQueue (amfInstance->fd, &res_amf_protectiongrouptrackstop, + error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstop, &amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP); - error = res_amf_protectiongrouptrackstop.error; + error = res_lib_amf_protectiongrouptrackstop.header.error; error_unlock: pthread_mutex_unlock (&amfInstance->mutex); @@ -782,7 +775,6 @@ saAmfErrorReport ( if (error != SA_OK) { goto exit_noclose; } - req_lib_amf_errorreport.header.magic = MESSAGE_MAGIC; req_lib_amf_errorreport.header.id = MESSAGE_REQ_AMF_ERRORREPORT; req_lib_amf_errorreport.header.size = sizeof (struct req_lib_amf_errorreport); memcpy (&req_lib_amf_errorreport.reportingComponent, reportingComponent, sizeof (SaNameT)); @@ -807,7 +799,7 @@ saAmfErrorReport ( goto exit_close; } - error = res_lib_amf_errorreport.error; + error = res_lib_amf_errorreport.header.error; exit_close: close (fd); @@ -828,7 +820,6 @@ saAmfErrorCancelAll ( if (error != SA_OK) { goto exit_noclose; } - req_lib_amf_errorcancelall.header.magic = MESSAGE_MAGIC; req_lib_amf_errorcancelall.header.id = MESSAGE_REQ_AMF_ERRORCANCELALL; req_lib_amf_errorcancelall.header.size = sizeof (struct req_lib_amf_errorcancelall); memcpy (&req_lib_amf_errorcancelall.compName, compName, sizeof (SaNameT)); @@ -845,7 +836,7 @@ saAmfErrorCancelAll ( goto exit_close; } - error = res_lib_amf_errorcancelall.error; + error = res_lib_amf_errorcancelall.header.error; exit_close: close (fd); @@ -862,13 +853,12 @@ saAmfComponentCapabilityModelGet ( int fd; SaErrorT error; struct req_amf_componentcapabilitymodelget req_amf_componentcapabilitymodelget; - struct res_amf_componentcapabilitymodelget res_amf_componentcapabilitymodelget; + struct res_lib_amf_componentcapabilitymodelget res_lib_amf_componentcapabilitymodelget; error = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT); if (error != SA_OK) { goto exit_noclose; } - req_amf_componentcapabilitymodelget.header.magic = MESSAGE_MAGIC; req_amf_componentcapabilitymodelget.header.id = MESSAGE_REQ_AMF_COMPONENTCAPABILITYMODELGET; req_amf_componentcapabilitymodelget.header.size = sizeof (struct req_amf_componentcapabilitymodelget); memcpy (&req_amf_componentcapabilitymodelget.compName, compName, sizeof (SaNameT)); @@ -879,13 +869,13 @@ saAmfComponentCapabilityModelGet ( goto exit_close; } - error = saRecvRetry (fd, &res_amf_componentcapabilitymodelget, - sizeof (struct res_amf_componentcapabilitymodelget), MSG_WAITALL | MSG_NOSIGNAL); + error = saRecvRetry (fd, &res_lib_amf_componentcapabilitymodelget, + sizeof (struct res_lib_amf_componentcapabilitymodelget), MSG_WAITALL | MSG_NOSIGNAL); if (error == SA_OK) { memcpy (componentCapabilityModel, - &res_amf_componentcapabilitymodelget.componentCapabilityModel, + &res_lib_amf_componentcapabilitymodelget.componentCapabilityModel, sizeof (SaAmfComponentCapabilityModelT)); - error = res_amf_componentcapabilitymodelget.error; + error = res_lib_amf_componentcapabilitymodelget.header.error; } exit_close: @@ -916,7 +906,6 @@ saAmfResponse ( if (errorResult != SA_OK) { goto exit_noclose; } - req_amf_response.header.magic = MESSAGE_MAGIC; req_amf_response.header.id = MESSAGE_REQ_AMF_RESPONSE; req_amf_response.header.size = sizeof (struct req_amf_response); req_amf_response.invocation = invocation;