enhancement 989

improve process group membership interface in totem_pg


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@869 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Steven Dake 2005-12-27 18:36:38 +00:00
parent 1020c951b3
commit 90c88404cd
11 changed files with 561 additions and 129 deletions

View File

@ -532,7 +532,7 @@ static void component_unregister (
iovecs[0].iov_base = (char *)&req_exec_amf_componentunregister;
iovecs[0].iov_len = sizeof (req_exec_amf_componentunregister);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
static void component_register (
@ -569,7 +569,7 @@ static void component_register (
iovecs[0].iov_base = (char *)&req_exec_amf_componentregister;
iovecs[0].iov_len = sizeof (req_exec_amf_componentregister);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
/***
@ -770,7 +770,7 @@ static void ha_state_group_set (
iovecs[0].iov_base = (char *)&req_exec_amf_hastateset;
iovecs[0].iov_len = sizeof (req_exec_amf_hastateset);
totempg_mcast (iovecs, 1, TOTEMPG_AGREED);
totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED);
}
void readiness_state_api_set (struct saAmfComponent *component,
@ -839,7 +839,7 @@ static void readiness_state_group_set (
iovecs[0].iov_base = (char *)&req_exec_amf_readinessstateset;
iovecs[0].iov_len = sizeof (req_exec_amf_readinessstateset);
totempg_mcast (iovecs, 1, TOTEMPG_AGREED);
totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED);
}
static void dsmDisabledUnlockedRegisteredOrErrorCancel (
@ -1463,7 +1463,7 @@ void error_report (
iovecs[0].iov_base = (char *)&req_exec_amf_errorreport;
iovecs[0].iov_len = sizeof (req_exec_amf_errorreport);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
int healthcheck_instance = 0;
@ -2410,7 +2410,7 @@ static int message_handler_req_amf_componentregister (struct conn_info *conn_inf
iovecs[0].iov_base = (char *)&req_exec_amf_componentregister;
iovecs[0].iov_len = sizeof (req_exec_amf_componentregister);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2439,7 +2439,7 @@ static int message_handler_req_amf_componentunregister (struct conn_info *conn_i
iovecs[0].iov_base = (char *)&req_exec_amf_componentunregister;
iovecs[0].iov_len = sizeof (req_exec_amf_componentunregister);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2620,7 +2620,7 @@ static int message_handler_req_amf_errorreport (struct conn_info *conn_info, voi
// iovecs[1].iov_base = (char *)&req_lib_amf_errorreport;
// iovecs[1].iov_len = sizeof (req_lib_amf_errorreport);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2650,7 +2650,7 @@ static int message_handler_req_amf_errorcancelall (struct conn_info *conn_info,
// iovecs[1].iov_base = (char *)&req_lib_amf_errorcancelall;
// iovecs[1].iov_len = sizeof (req_lib_amf_errorcancelall);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}

View File

@ -755,8 +755,8 @@ static int ckpt_recovery_process (void)
* Check to see if we can queue the new message and if you can
* then mcast the message else break and create callback.
*/
if (totempg_send_ok(iovecs[0].iov_len + iovecs[1].iov_len)){
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 2)) {
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync State Message.\n");
}
else {
@ -829,8 +829,8 @@ static int ckpt_recovery_process (void)
* Check to see if we can queue the new message and if you can
* then mcast the message else break and create callback.
*/
if (totempg_send_ok(iovecs[0].iov_len + iovecs[1].iov_len + iovecs[2].iov_len)){
assert (totempg_mcast (iovecs, 3, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 2)) {
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 3, TOTEMPG_AGREED) == 0);
log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync Section Message.\n");
}
else {
@ -1284,8 +1284,8 @@ int ckpt_checkpoint_close (struct saCkptCheckpoint *checkpoint) {
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose);
if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) {
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1833,7 +1833,7 @@ void timer_function_retention (void *data)
iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationexpire;
iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationexpire);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
}
extern int message_handler_req_exec_ckpt_checkpointclose (void *message, struct totem_ip_address *source_addr, int endian_conversion_required)
@ -2021,7 +2021,7 @@ static int message_handler_req_exec_ckpt_checkpointretentiondurationexpire (void
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
return (0);
}
@ -2802,7 +2802,7 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2832,7 +2832,7 @@ static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *c
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2859,8 +2859,8 @@ static int message_handler_req_lib_ckpt_checkpointclose (struct conn_info *conn_
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose);
if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) {
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
}
else {
@ -2896,7 +2896,7 @@ static int message_handler_req_lib_ckpt_checkpointunlink (struct conn_info *conn
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -2921,7 +2921,7 @@ static int message_handler_req_lib_ckpt_checkpointretentiondurationset (struct c
iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationset;
iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationset);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -3070,9 +3070,9 @@ printf ("|\n");
#endif
if (iovecs[1].iov_len > 0) {
log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base);
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
}
@ -3123,9 +3123,9 @@ static int message_handler_req_lib_ckpt_sectiondelete (struct conn_info *conn_in
req_exec_ckpt_sectiondelete.header.size += iovecs[1].iov_len;
if (iovecs[1].iov_len > 0) {
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
return (0);
@ -3163,9 +3163,9 @@ static int message_handler_req_lib_ckpt_sectionexpirationtimeset (struct conn_in
if (iovecs[1].iov_len > 0) {
log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base);
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
return (0);
@ -3218,9 +3218,9 @@ static int message_handler_req_lib_ckpt_sectionwrite (struct conn_info *conn_inf
req_exec_ckpt_sectionwrite.header.size += iovecs[1].iov_len;
if (iovecs[1].iov_len > 0) {
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
}
else {
@ -3276,9 +3276,9 @@ static int message_handler_req_lib_ckpt_sectionoverwrite (struct conn_info *conn
req_exec_ckpt_sectionoverwrite.header.size += iovecs[1].iov_len;
if (iovecs[1].iov_len > 0) {
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
}
else {
@ -3340,9 +3340,9 @@ static int message_handler_req_lib_ckpt_sectionread (struct conn_info *conn_info
req_exec_ckpt_sectionread.header.size += iovecs[1].iov_len;
if (iovecs[1].iov_len > 0) {
assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0);
} else {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
return (0); /* DO NOT REMOVE */
}

View File

@ -386,7 +386,7 @@ static int clm_nodejoin_send (void)
req_exec_clm_iovec.iov_base = (char *)&req_exec_clm_nodejoin;
req_exec_clm_iovec.iov_len = sizeof (req_exec_clm_nodejoin);
result = totempg_mcast (&req_exec_clm_iovec, 1, TOTEMPG_AGREED);
result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_clm_iovec, 1, TOTEMPG_AGREED);
return (result);
}

View File

@ -340,9 +340,9 @@ static int message_handler_req_evs_mcast_joined (struct conn_info *conn_info, vo
req_exec_evs_mcast_iovec[2].iov_base = &req_lib_evs_mcast_joined->msg;
req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len;
// TODO this doesn't seem to work for some reason
send_ok = totempg_send_ok (req_lib_evs_mcast_joined->msg_len);
send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3);
res = totempg_mcast (req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
// TODO
if (res == 0) {
error = EVS_OK;
@ -389,8 +389,8 @@ static int message_handler_req_evs_mcast_groups (struct conn_info *conn_info, vo
req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len;
// TODO this is wacky
send_ok = totempg_send_ok (req_lib_evs_mcast_groups->msg_len);
res = totempg_mcast (req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3);
res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
if (res == 0) {
error = EVS_OK;
}

View File

@ -1115,7 +1115,7 @@ static SaErrorT evt_open_channel(SaNameT *cn, SaUint8T flgs)
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Send open mcast\n");
res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED);
log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Open mcast result: %d\n",
res);
if (res != 0) {
@ -1151,7 +1151,7 @@ static SaErrorT evt_close_channel(SaNameT *cn, uint64_t unlink_id)
cpkt.u.chcu.chcu_unlink_id = unlink_id;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED);
if (res != 0) {
ret = SA_AIS_ERR_LIBRARY;
}
@ -2338,7 +2338,7 @@ static int lib_evt_unlink_channel(struct conn_info *conn_info, void *message)
cpkt.u.chcu.chcu_unlink_id = ucp->ucp_unlink_id;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
if (totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED) == 0) {
if (totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED) == 0) {
return 0;
}
@ -2609,7 +2609,7 @@ static int lib_evt_event_publish(struct conn_info *conn_info, void *message)
*/
pub_iovec.iov_base = req;
pub_iovec.iov_len = req->led_head.size;
result = totempg_mcast (&pub_iovec, 1, TOTEMPG_AGREED);
result = totempg_groups_mcast_joined (openais_group_handle, &pub_iovec, 1, TOTEMPG_AGREED);
if (result != 0) {
error = SA_AIS_ERR_LIBRARY;
}
@ -2670,7 +2670,7 @@ static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info,
cpkt.u.chc_event_id = req->iec_event_id;
rtn_iovec.iov_base = &cpkt;
rtn_iovec.iov_len = cpkt.chc_head.size;
ret = totempg_mcast (&rtn_iovec, 1, TOTEMPG_AGREED);
ret = totempg_groups_mcast_joined (openais_group_handle, &rtn_iovec, 1, TOTEMPG_AGREED);
if (ret == 0) {
return 0;
}
@ -3983,7 +3983,7 @@ static int evt_sync_process(void)
cpkt.u.chc_set_id.chc_last_id = md->mn_last_msg_id;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED);
if (res != 0) {
log_printf(RECOVERY_DEBUG,
"Unable to send event id to %s\n",
@ -4048,7 +4048,7 @@ static int evt_sync_process(void)
cpkt.u.chc_set_opens.chc_open_count = eci->esc_local_opens;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast(&chn_iovec, 1,TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED);
if (res != 0) {
/*
@ -4063,7 +4063,7 @@ static int evt_sync_process(void)
cpkt.chc_op = EVT_OPEN_COUNT_DONE;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED);
if (res != 0) {
/*
* Try again later.
@ -4110,7 +4110,7 @@ static int evt_sync_process(void)
evt->ed_event.led_head.id = MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA;
chn_iovec.iov_base = &evt->ed_event;
chn_iovec.iov_len = evt->ed_event.led_head.size;
res = totempg_mcast(&chn_iovec, 1, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED);
if (res != 0) {
/*
@ -4137,7 +4137,7 @@ static int evt_sync_process(void)
cpkt.chc_op = EVT_CONF_DONE;
chn_iovec.iov_base = &cpkt;
chn_iovec.iov_len = cpkt.chc_head.size;
res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED);
recovery_phase = evt_wait_send_retained_events;
return 1;

View File

@ -441,8 +441,8 @@ int lck_resource_close (struct resource *resource)
iovec.iov_base = (char *)&req_exec_lck_resourceclose;
iovec.iov_len = sizeof (req_exec_lck_resourceclose);
if (totempg_send_ok (sizeof (struct req_exec_lck_resourceclose))) {
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, &iovec, 1)) {
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -471,7 +471,7 @@ void resource_lock_orphan (struct resource_lock *resource_lock)
iovec.iov_base = (char *)&req_exec_lck_resourcelockorphan;
iovec.iov_len = sizeof (req_exec_lck_resourcelockorphan);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
// AAA
}
@ -1235,7 +1235,7 @@ static int message_handler_req_lib_lck_resourceopen (struct conn_info *conn_info
iovec.iov_base = (char *)&req_exec_lck_resourceopen;
iovec.iov_len = sizeof (req_exec_lck_resourceopen);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1268,7 +1268,7 @@ static int message_handler_req_lib_lck_resourceopenasync (struct conn_info *conn
iovec.iov_base = (char *)&req_exec_lck_resourceopen;
iovec.iov_len = sizeof (req_exec_lck_resourceopen);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1298,8 +1298,8 @@ static int message_handler_req_lib_lck_resourceclose (struct conn_info *conn_inf
iovecs[0].iov_base = (char *)&req_exec_lck_resourceclose;
iovecs[0].iov_len = sizeof (req_exec_lck_resourceclose);
if (totempg_send_ok (sizeof (struct req_exec_lck_resourceclose))) {
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) {
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
}
}
else {
@ -1343,7 +1343,7 @@ static int message_handler_req_lib_lck_resourcelock (struct conn_info *conn_info
iovecs[0].iov_base = (char *)&req_exec_lck_resourcelock;
iovecs[0].iov_len = sizeof (req_exec_lck_resourcelock);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1374,7 +1374,7 @@ static int message_handler_req_lib_lck_resourcelockasync (struct conn_info *conn
iovecs[0].iov_base = (char *)&req_exec_lck_resourcelock;
iovecs[0].iov_len = sizeof (req_exec_lck_resourcelock);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1405,7 +1405,7 @@ static int message_handler_req_lib_lck_resourceunlock (struct conn_info *conn_in
iovec.iov_base = (char *)&req_exec_lck_resourceunlock;
iovec.iov_len = sizeof (req_exec_lck_resourceunlock);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1436,7 +1436,7 @@ static int message_handler_req_lib_lck_resourceunlockasync (struct conn_info *co
iovec.iov_base = (char *)&req_exec_lck_resourceunlock;
iovec.iov_len = sizeof (req_exec_lck_resourceunlock);
assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
return (0);
}
@ -1463,7 +1463,7 @@ static int message_handler_req_lib_lck_lockpurge (struct conn_info *conn_info, v
iovecs[0].iov_base = (char *)&req_exec_lck_lockpurge;
iovecs[0].iov_len = sizeof (req_exec_lck_lockpurge);
assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0);
assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
return (0);
}

View File

@ -187,6 +187,14 @@ struct totem_ip_address this_non_loopback_ip;
char *socketname = "libais.socket";
totempg_groups_handle openais_group_handle;
struct totempg_group openais_group = {
.group = "a",
.group_len = 1
};
static int libais_connection_active (struct conn_info *conn_info)
{
return (conn_info->state == CONN_STATE_ACTIVE);
@ -592,6 +600,8 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent,
struct ucred *cred;
int on = 0;
int send_ok = 0;
int send_ok_joined = 0;
struct iovec send_ok_joined_iovec;
struct res_overlay res_overlay;
if (revent & (POLLERR|POLLHUP)) {
@ -705,10 +715,15 @@ retry_recv:
* to queue a message, otherwise tell the library we are busy and to
* try again later
*/
send_ok_joined_iovec.iov_base = header;
send_ok_joined_iovec.iov_len = header->size;
send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
&send_ok_joined_iovec, 1);
send_ok =
(ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
((ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
(totempg_send_ok (header->size)) &&
(send_ok_joined) &&
(sync_in_process() == 0));
if (send_ok) {
@ -1139,10 +1154,18 @@ int main (int argc, char **argv)
totempg_initialize (
aisexec_poll_handle,
&openais_config.totem_config,
&openais_config.totem_config);
totempg_groups_initialize (
&openais_group_handle,
deliver_fn,
confchg_fn);
totempg_groups_join (
openais_group_handle,
&openais_group,
1);
this_ip = &openais_config.totem_config.interfaces[0].boundto;
/*

View File

@ -148,9 +148,11 @@ enum nodeexec_message_types {
extern struct totem_ip_address *this_ip;
poll_handle aisexec_poll_handle;
extern struct totempg_group openais_group;
extern struct gmi_groupname aisexec_groupname;
extern totempg_groups_handle openais_group_handle;
poll_handle aisexec_poll_handle;
extern int libais_send_response (struct conn_info *conn_info, void *msg, int mlen);
@ -158,4 +160,5 @@ extern int message_source_is_local(struct message_source *source);
extern void message_source_set(struct message_source *source, struct conn_info *conn_info);
#endif /* AIS_EXEC_H_DEFINED */

View File

@ -114,7 +114,7 @@ static int sync_barrier_send (struct memb_ring_id *ring_id)
iovec.iov_base = (char *)&req_exec_sync_barrier_start;
iovec.iov_len = sizeof (req_exec_sync_barrier_start);
res = totempg_mcast (&iovec, 1, TOTEMPG_AGREED);
res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED);
return (res);
}

View File

@ -82,16 +82,16 @@
*/
#include <netinet/in.h>
#include "totempg.h"
#include "totemsrp.h"
#include "totemmrp.h"
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "totempg.h"
#include "totemmrp.h"
#include "totemsrp.h"
#include "hdb.h"
#include "swab.h"
#define min(a,b) ((a) < (b)) ? a : b
@ -140,19 +140,6 @@ static int mcast_packed_msg_count = 0;
struct totem_config *totempg_totem_config;
static void (*app_deliver_fn) (
struct totem_ip_address *source_addr,
struct iovec *iovec,
int iov_len,
int endian_conversion_required) = 0;
static void (*app_confchg_fn) (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
struct totem_ip_address *left_list, int left_list_entries,
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id) = 0;
struct assembly {
struct totem_ip_address addr;
unsigned char data[MESSAGE_SIZE_MAX];
@ -174,11 +161,41 @@ int assembly_list_entries = 0;
* the buffer is a continuation of a previously packed fragment.
*/
static unsigned char *fragmentation_data;
int fragment_size = 0;
int fragment_continuation = 0;
static int fragment_size = 0;
static int fragment_continuation = 0;
static struct iovec iov_delv;
static unsigned int totempg_max_handle = 0;
struct totempg_group_instance {
void (*deliver_fn) (
struct totem_ip_address *source_addr,
struct iovec *iovec,
int iov_len,
int endian_conversion_required);
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
struct totem_ip_address *left_list, int left_list_entries,
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id);
struct totempg_group *groups;
int groups_cnt;
};
static struct saHandleDatabase totempg_groups_instance_database = {
.handleCount = 0,
.handles = 0,
.handleInstanceDestructor = 0
};
static int send_ok (int msg_size);
static struct assembly *find_assembly (struct totem_ip_address *addr)
{
int i;
@ -191,6 +208,110 @@ static struct assembly *find_assembly (struct totem_ip_address *addr)
return (0);
}
static inline void app_confchg_fn (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
struct totem_ip_address *left_list, int left_list_entries,
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id)
{
int i;
SaAisErrorT error;
struct totempg_group_instance *instance;
for (i = 0; i <= totempg_max_handle; i++) {
error = saHandleInstanceGet (&totempg_groups_instance_database,
i, (void *)&instance);
if (error == SA_OK) {
instance->confchg_fn (
configuration_type,
member_list,
member_list_entries,
left_list,
left_list_entries,
joined_list,
joined_list_entries,
ring_id);
saHandleInstancePut (&totempg_groups_instance_database, i);
}
}
}
static inline int group_matches (
struct iovec *iovec,
unsigned int iov_len,
struct totempg_group *groups_b,
unsigned int group_b_cnt,
unsigned int *adjust_iovec)
{
unsigned short *group_len;
char *group_name;
int i;
int j;
assert (iov_len == 1);
group_len = (unsigned short *)iovec->iov_base;
group_name = ((char *)iovec->iov_base) +
sizeof (unsigned short) * (group_len[0] + 1);
/*
* Calculate amount to adjust the iovec by before delivering to app
*/
*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
for (i = 1; i < group_len[0] + 1; i++) {
*adjust_iovec += group_len[i];
}
/*
* Determine if this message should be delivered to this instance
*/
for (i = 1; i < group_len[0] + 1; i++) {
for (j = 0; j < group_b_cnt; j++) {
if ((group_len[i] == groups_b[j].group_len) &&
(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
return (1);
}
}
group_name += group_len[i];
}
return (0);
}
static inline void app_deliver_fn (
struct totem_ip_address *source_addr,
struct iovec *iovec,
unsigned int iov_len,
int endian_conversion_required)
{
int i;
SaAisErrorT error;
struct totempg_group_instance *instance;
struct iovec stripped_iovec;
unsigned int adjust_iovec;
for (i = 0; i <= totempg_max_handle; i++) {
error = saHandleInstanceGet (&totempg_groups_instance_database,
i, (void *)&instance);
if (error == SA_OK) {
assert (iov_len == 1);
if (group_matches (iovec, iov_len, instance->groups, instance->groups_cnt, &adjust_iovec)) {
stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
instance->deliver_fn (
source_addr,
&stripped_iovec,
iov_len,
endian_conversion_required);
}
saHandleInstancePut (&totempg_groups_instance_database, i);
}
}
}
static void totempg_confchg_fn (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
@ -459,26 +580,10 @@ int callback_token_received_fn (enum totem_callback_token_type type,
*/
int totempg_initialize (
poll_handle poll_handle,
struct totem_config *totem_config,
void (*deliver_fn) (
struct totem_ip_address *source_addr,
struct iovec *iovec,
int iov_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
struct totem_ip_address *left_list, int left_list_entries,
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id))
struct totem_config *totem_config)
{
int res;
app_deliver_fn = deliver_fn;
app_confchg_fn = confchg_fn;
totempg_totem_config = totem_config;
fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
@ -514,7 +619,7 @@ static unsigned char next_fragment = 1;
/*
* Multicast a message
*/
int totempg_mcast (
static int mcast_msg (
struct iovec *iovec,
int iov_len,
int guarantee)
@ -542,7 +647,7 @@ int totempg_mcast (
total_size += iovec[i].iov_len;
}
if( totempg_send_ok (total_size + sizeof(unsigned short) *
if (send_ok (total_size + sizeof(unsigned short) *
(mcast_packed_msg_count+1)) == 0) {
return(-1);
@ -616,7 +721,7 @@ int totempg_mcast (
iovecs[0].iov_len = sizeof(struct totempg_mcast);
iovecs[1].iov_base = mcast_packed_msg_lens;
iovecs[1].iov_len = mcast_packed_msg_count *
sizeof(unsigned short);
sizeof(unsigned short);
iovecs[2].iov_base = data_ptr;
iovecs[2].iov_len = max_packet_size;
assert (totemmrp_avail() > 0);
@ -662,7 +767,7 @@ int totempg_mcast (
/*
* Determine if a message of msg_size could be queued
*/
int totempg_send_ok (
static int send_ok (
int msg_size)
{
int avail = 0;
@ -675,12 +780,11 @@ int totempg_send_ok (
* a full message, so add +1
* totempg_totem_config->net_mtu - 25 is for the totempg_mcast header
*/
total = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1;
return (avail >= total);
}
int totempg_callback_token_create (
void **handle_out,
enum totem_callback_token_type type,
@ -700,3 +804,266 @@ void totempg_callback_token_destroy (
/*
* vi: set autoindent tabstop=4 shiftwidth=4 :
*/
int totempg_groups_initialize (
totempg_groups_handle *handle,
void (*deliver_fn) (
struct totem_ip_address *source_addr,
struct iovec *iovec,
int iov_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
struct totem_ip_address *member_list, int member_list_entries,
struct totem_ip_address *left_list, int left_list_entries,
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id))
{
SaAisErrorT error;
struct totempg_group_instance *instance;
error = saHandleCreate (&totempg_groups_instance_database,
sizeof (struct totempg_group_instance), handle);
if (error != SA_OK) {
goto error_exit;
}
if (*handle > totempg_max_handle) {
totempg_max_handle = *handle;
}
error = saHandleInstanceGet (&totempg_groups_instance_database, *handle,
(void *)&instance);
if (error != SA_OK) {
goto error_destroy;
}
instance->deliver_fn = deliver_fn;
instance->confchg_fn = confchg_fn;
instance->groups = 0;
instance->groups_cnt = 0;
saHandleInstancePut (&totempg_groups_instance_database, *handle);
return (0);
error_destroy:
saHandleDestroy (&totempg_groups_instance_database, *handle);
error_exit:
return (-1);
}
int totempg_groups_join (
totempg_groups_handle handle,
struct totempg_group *groups,
int group_cnt)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
struct totempg_group *new_groups;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
new_groups = realloc (instance->groups,
sizeof (struct totempg_group) *
(instance->groups_cnt + group_cnt));
if (new_groups == 0) {
error = ENOMEM;
goto error_exit;
}
memcpy (&new_groups[instance->groups_cnt],
groups, group_cnt * sizeof (struct totempg_group));
instance->groups = new_groups;
instance->groups_cnt = instance->groups_cnt = group_cnt;
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (0);
error_exit:
return (error);
}
int totempg_groups_leave (
totempg_groups_handle handle,
struct totempg_group *groups,
int group_cnt)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (0);
error_exit:
return (error);
}
#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32
int totempg_groups_mcast_joined (
totempg_groups_handle handle,
struct iovec *iovec,
int iov_len,
int guarantee)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
int i;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
/*
* Build group_len structure and the iovec_mcast structure
*/
group_len[0] = instance->groups_cnt;
for (i = 0; i < instance->groups_cnt; i++) {
group_len[i + 1] = instance->groups[i].group_len;
iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
iovec_mcast[i + 1].iov_base = instance->groups[i].group;
}
iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
iovec_mcast[0].iov_base = group_len;
for (i = 0; i < iov_len; i++) {
iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
}
mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (0);
error_exit:
return (error);
}
int totempg_groups_send_ok_joined (
totempg_groups_handle handle,
struct iovec *iovec,
int iov_len)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
unsigned int size = 0;
unsigned int i;
unsigned int res;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
for (i = 0; i < instance->groups_cnt; i++) {
size += instance->groups[i].group_len;
}
for (i = 0; i < iov_len; i++) {
size += iovec[i].iov_len;
}
res = send_ok (size);
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (res);
error_exit:
return (error);
}
int totempg_groups_mcast_groups (
totempg_groups_handle handle,
int guarantee,
struct totempg_group *groups,
int groups_cnt,
struct iovec *iovec,
int iov_len)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
int i;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
/*
* Build group_len structure and the iovec_mcast structure
*/
group_len[0] = groups_cnt;
for (i = 0; i < groups_cnt; i++) {
group_len[i + 1] = groups[i].group_len;
iovec_mcast[i + 1].iov_len = groups[i].group_len;
iovec_mcast[i + 1].iov_base = groups[i].group;
}
iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
iovec_mcast[0].iov_base = group_len;
for (i = 0; i < iov_len; i++) {
iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
}
mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (0);
error_exit:
return (error);
}
int totempg_groups_send_ok_groups (
totempg_groups_handle handle,
struct totempg_group *groups,
int groups_cnt,
struct iovec *iovec,
int iov_len)
{
SaAisErrorT error;
struct totempg_group_instance *instance;
unsigned int size = 0;
unsigned int i;
unsigned int res;
error = saHandleInstanceGet (&totempg_groups_instance_database, handle,
(void *)&instance);
if (error != SA_OK) {
goto error_exit;
}
for (i = 0; i < groups_cnt; i++) {
size += groups[i].group_len;
}
for (i = 0; i < iov_len; i++) {
size += iovec[i].iov_len;
}
res = send_ok (size);
saHandleInstancePut (&totempg_groups_instance_database, handle);
return (0);
error_exit:
return (error);
}

View File

@ -40,6 +40,13 @@
#include "totemsrp.h"
#include "totem.h"
typedef unsigned int totempg_groups_handle;
struct totempg_group {
void *group;
int group_len;
};
#define TOTEMPG_AGREED 0
#define TOTEMPG_SAFE 1
@ -49,11 +56,28 @@
*/
/*
* Initialize the totem process group abstraction
* Initialize the totem process groups abstraction
*/
int totempg_initialize (
poll_handle poll_handle,
struct totem_config *totem_config,
struct totem_config *totem_config
);
void totempg_finalize (void);
int totempg_callback_token_create (void **handle_out,
enum totem_callback_token_type type,
int delete,
int (*callback_fn) (enum totem_callback_token_type type, void *),
void *data);
void totempg_callback_token_destroy (void *handle);
/*
* Initialize a groups instance
*/
int totempg_groups_initialize (
totempg_groups_handle *handle,
void (*deliver_fn) (
struct totem_ip_address *source_addr,
@ -68,28 +92,43 @@ int totempg_initialize (
struct totem_ip_address *joined_list, int joined_list_entries,
struct memb_ring_id *ring_id));
void totempg_finalize (void);
int totempg_groups_finalize (
totempg_groups_handle handle);
/*
* Multicast a message
*/
int totempg_mcast (
int totempg_groups_join (
totempg_groups_handle handle,
struct totempg_group *groups,
int gruop_cnt);
int totempg_groups_leave (
totempg_groups_handle handle,
struct totempg_group *groups,
int gruop_cnt);
int totempg_groups_mcast_joined (
totempg_groups_handle handle,
struct iovec *iovec,
int iov_len,
int guarantee);
/*
* Determine if a message of msg_size could be queued
*/
int totempg_send_ok (
int msg_size);
void totempg_callback_token_destroy (void *handle);
int totempg_callback_token_create (void **handle_out,
enum totem_callback_token_type type,
int delete,
int (*callback_fn) (enum totem_callback_token_type type, void *),
void *data);
int totempg_groups_send_ok_joined (
totempg_groups_handle handle,
struct iovec *iovec,
int iov_len);
int totempg_groups_mcast_groups (
totempg_groups_handle handle,
int guarantee,
struct totempg_group *groups,
int groups_cnt,
struct iovec *iovec,
int iov_len);
int totempg_groups_send_ok_groups (
totempg_groups_handle handle,
struct totempg_group *groups,
int groups_cnt,
struct iovec *iovec,
int iov_len);
#endif /* TOTEMPG_H_DEFINED */