diff --git a/exec/amf.c b/exec/amf.c index 68afe898..fcadedde 100644 --- a/exec/amf.c +++ b/exec/amf.c @@ -532,7 +532,7 @@ static void component_unregister ( iovecs[0].iov_base = (char *)&req_exec_amf_componentunregister; iovecs[0].iov_len = sizeof (req_exec_amf_componentunregister); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } static void component_register ( @@ -569,7 +569,7 @@ static void component_register ( iovecs[0].iov_base = (char *)&req_exec_amf_componentregister; iovecs[0].iov_len = sizeof (req_exec_amf_componentregister); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } /*** @@ -770,7 +770,7 @@ static void ha_state_group_set ( iovecs[0].iov_base = (char *)&req_exec_amf_hastateset; iovecs[0].iov_len = sizeof (req_exec_amf_hastateset); - totempg_mcast (iovecs, 1, TOTEMPG_AGREED); + totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED); } void readiness_state_api_set (struct saAmfComponent *component, @@ -839,7 +839,7 @@ static void readiness_state_group_set ( iovecs[0].iov_base = (char *)&req_exec_amf_readinessstateset; iovecs[0].iov_len = sizeof (req_exec_amf_readinessstateset); - totempg_mcast (iovecs, 1, TOTEMPG_AGREED); + totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED); } static void dsmDisabledUnlockedRegisteredOrErrorCancel ( @@ -1463,7 +1463,7 @@ void error_report ( iovecs[0].iov_base = (char *)&req_exec_amf_errorreport; iovecs[0].iov_len = sizeof (req_exec_amf_errorreport); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } int healthcheck_instance = 0; @@ -2410,7 +2410,7 @@ static int message_handler_req_amf_componentregister (struct conn_info *conn_inf iovecs[0].iov_base = (char *)&req_exec_amf_componentregister; iovecs[0].iov_len = sizeof (req_exec_amf_componentregister); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2439,7 +2439,7 @@ static int message_handler_req_amf_componentunregister (struct conn_info *conn_i iovecs[0].iov_base = (char *)&req_exec_amf_componentunregister; iovecs[0].iov_len = sizeof (req_exec_amf_componentunregister); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2620,7 +2620,7 @@ static int message_handler_req_amf_errorreport (struct conn_info *conn_info, voi // iovecs[1].iov_base = (char *)&req_lib_amf_errorreport; // iovecs[1].iov_len = sizeof (req_lib_amf_errorreport); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2650,7 +2650,7 @@ static int message_handler_req_amf_errorcancelall (struct conn_info *conn_info, // iovecs[1].iov_base = (char *)&req_lib_amf_errorcancelall; // iovecs[1].iov_len = sizeof (req_lib_amf_errorcancelall); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } diff --git a/exec/ckpt.c b/exec/ckpt.c index 46c0b4fc..21e214ae 100644 --- a/exec/ckpt.c +++ b/exec/ckpt.c @@ -755,8 +755,8 @@ static int ckpt_recovery_process (void) * Check to see if we can queue the new message and if you can * then mcast the message else break and create callback. */ - if (totempg_send_ok(iovecs[0].iov_len + iovecs[1].iov_len)){ - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 2)) { + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync State Message.\n"); } else { @@ -829,8 +829,8 @@ static int ckpt_recovery_process (void) * Check to see if we can queue the new message and if you can * then mcast the message else break and create callback. */ - if (totempg_send_ok(iovecs[0].iov_len + iovecs[1].iov_len + iovecs[2].iov_len)){ - assert (totempg_mcast (iovecs, 3, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 2)) { + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 3, TOTEMPG_AGREED) == 0); log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync Section Message.\n"); } else { @@ -1284,8 +1284,8 @@ int ckpt_checkpoint_close (struct saCkptCheckpoint *checkpoint) { iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose); - if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) { + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1833,7 +1833,7 @@ void timer_function_retention (void *data) iovec.iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationexpire; iovec.iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationexpire); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); } extern int message_handler_req_exec_ckpt_checkpointclose (void *message, struct totem_ip_address *source_addr, int endian_conversion_required) @@ -2021,7 +2021,7 @@ static int message_handler_req_exec_ckpt_checkpointretentiondurationexpire (void iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } return (0); } @@ -2802,7 +2802,7 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2832,7 +2832,7 @@ static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *c iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointopen; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointopen); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2859,8 +2859,8 @@ static int message_handler_req_lib_ckpt_checkpointclose (struct conn_info *conn_ iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose); - if (totempg_send_ok (sizeof (struct req_exec_ckpt_checkpointclose))) { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) { + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } } else { @@ -2896,7 +2896,7 @@ static int message_handler_req_lib_ckpt_checkpointunlink (struct conn_info *conn iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointunlink; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointunlink); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -2921,7 +2921,7 @@ static int message_handler_req_lib_ckpt_checkpointretentiondurationset (struct c iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointretentiondurationset; iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointretentiondurationset); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -3070,9 +3070,9 @@ printf ("|\n"); #endif if (iovecs[1].iov_len > 0) { log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base); - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } } @@ -3123,9 +3123,9 @@ static int message_handler_req_lib_ckpt_sectiondelete (struct conn_info *conn_in req_exec_ckpt_sectiondelete.header.size += iovecs[1].iov_len; if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } return (0); @@ -3163,9 +3163,9 @@ static int message_handler_req_lib_ckpt_sectionexpirationtimeset (struct conn_in if (iovecs[1].iov_len > 0) { log_printf (LOG_LEVEL_DEBUG, "IOV_BASE is %p\n", iovecs[1].iov_base); - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } return (0); @@ -3218,9 +3218,9 @@ static int message_handler_req_lib_ckpt_sectionwrite (struct conn_info *conn_inf req_exec_ckpt_sectionwrite.header.size += iovecs[1].iov_len; if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } } else { @@ -3276,9 +3276,9 @@ static int message_handler_req_lib_ckpt_sectionoverwrite (struct conn_info *conn req_exec_ckpt_sectionoverwrite.header.size += iovecs[1].iov_len; if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } } else { @@ -3340,9 +3340,9 @@ static int message_handler_req_lib_ckpt_sectionread (struct conn_info *conn_info req_exec_ckpt_sectionread.header.size += iovecs[1].iov_len; if (iovecs[1].iov_len > 0) { - assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2, TOTEMPG_AGREED) == 0); } else { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } return (0); /* DO NOT REMOVE */ } diff --git a/exec/clm.c b/exec/clm.c index 53c155ac..3bbec873 100644 --- a/exec/clm.c +++ b/exec/clm.c @@ -386,7 +386,7 @@ static int clm_nodejoin_send (void) req_exec_clm_iovec.iov_base = (char *)&req_exec_clm_nodejoin; req_exec_clm_iovec.iov_len = sizeof (req_exec_clm_nodejoin); - result = totempg_mcast (&req_exec_clm_iovec, 1, TOTEMPG_AGREED); + result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_clm_iovec, 1, TOTEMPG_AGREED); return (result); } diff --git a/exec/evs.c b/exec/evs.c index 7760ef8a..f650713c 100644 --- a/exec/evs.c +++ b/exec/evs.c @@ -340,9 +340,9 @@ static int message_handler_req_evs_mcast_joined (struct conn_info *conn_info, vo req_exec_evs_mcast_iovec[2].iov_base = &req_lib_evs_mcast_joined->msg; req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len; // TODO this doesn't seem to work for some reason - send_ok = totempg_send_ok (req_lib_evs_mcast_joined->msg_len); + send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3); - res = totempg_mcast (req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED); // TODO if (res == 0) { error = EVS_OK; @@ -389,8 +389,8 @@ static int message_handler_req_evs_mcast_groups (struct conn_info *conn_info, vo req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len; // TODO this is wacky - send_ok = totempg_send_ok (req_lib_evs_mcast_groups->msg_len); - res = totempg_mcast (req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED); + send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3); + res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED); if (res == 0) { error = EVS_OK; } diff --git a/exec/evt.c b/exec/evt.c index 2ace80c7..cfe2fbbd 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -1115,7 +1115,7 @@ static SaErrorT evt_open_channel(SaNameT *cn, SaUint8T flgs) chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Send open mcast\n"); - res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED); log_printf(CHAN_OPEN_DEBUG, "evt_open_channel: Open mcast result: %d\n", res); if (res != 0) { @@ -1151,7 +1151,7 @@ static SaErrorT evt_close_channel(SaNameT *cn, uint64_t unlink_id) cpkt.u.chcu.chcu_unlink_id = unlink_id; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED); if (res != 0) { ret = SA_AIS_ERR_LIBRARY; } @@ -2338,7 +2338,7 @@ static int lib_evt_unlink_channel(struct conn_info *conn_info, void *message) cpkt.u.chcu.chcu_unlink_id = ucp->ucp_unlink_id; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - if (totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED) == 0) { + if (totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED) == 0) { return 0; } @@ -2609,7 +2609,7 @@ static int lib_evt_event_publish(struct conn_info *conn_info, void *message) */ pub_iovec.iov_base = req; pub_iovec.iov_len = req->led_head.size; - result = totempg_mcast (&pub_iovec, 1, TOTEMPG_AGREED); + result = totempg_groups_mcast_joined (openais_group_handle, &pub_iovec, 1, TOTEMPG_AGREED); if (result != 0) { error = SA_AIS_ERR_LIBRARY; } @@ -2670,7 +2670,7 @@ static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, cpkt.u.chc_event_id = req->iec_event_id; rtn_iovec.iov_base = &cpkt; rtn_iovec.iov_len = cpkt.chc_head.size; - ret = totempg_mcast (&rtn_iovec, 1, TOTEMPG_AGREED); + ret = totempg_groups_mcast_joined (openais_group_handle, &rtn_iovec, 1, TOTEMPG_AGREED); if (ret == 0) { return 0; } @@ -3983,7 +3983,7 @@ static int evt_sync_process(void) cpkt.u.chc_set_id.chc_last_id = md->mn_last_msg_id; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED); if (res != 0) { log_printf(RECOVERY_DEBUG, "Unable to send event id to %s\n", @@ -4048,7 +4048,7 @@ static int evt_sync_process(void) cpkt.u.chc_set_opens.chc_open_count = eci->esc_local_opens; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - res = totempg_mcast(&chn_iovec, 1,TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED); if (res != 0) { /* @@ -4063,7 +4063,7 @@ static int evt_sync_process(void) cpkt.chc_op = EVT_OPEN_COUNT_DONE; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1,TOTEMPG_AGREED); if (res != 0) { /* * Try again later. @@ -4110,7 +4110,7 @@ static int evt_sync_process(void) evt->ed_event.led_head.id = MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA; chn_iovec.iov_base = &evt->ed_event; chn_iovec.iov_len = evt->ed_event.led_head.size; - res = totempg_mcast(&chn_iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED); if (res != 0) { /* @@ -4137,7 +4137,7 @@ static int evt_sync_process(void) cpkt.chc_op = EVT_CONF_DONE; chn_iovec.iov_base = &cpkt; chn_iovec.iov_len = cpkt.chc_head.size; - res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &chn_iovec, 1, TOTEMPG_AGREED); recovery_phase = evt_wait_send_retained_events; return 1; diff --git a/exec/lck.c b/exec/lck.c index 26599094..9306d1c5 100644 --- a/exec/lck.c +++ b/exec/lck.c @@ -441,8 +441,8 @@ int lck_resource_close (struct resource *resource) iovec.iov_base = (char *)&req_exec_lck_resourceclose; iovec.iov_len = sizeof (req_exec_lck_resourceclose); - if (totempg_send_ok (sizeof (struct req_exec_lck_resourceclose))) { - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, &iovec, 1)) { + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -471,7 +471,7 @@ void resource_lock_orphan (struct resource_lock *resource_lock) iovec.iov_base = (char *)&req_exec_lck_resourcelockorphan; iovec.iov_len = sizeof (req_exec_lck_resourcelockorphan); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); // AAA } @@ -1235,7 +1235,7 @@ static int message_handler_req_lib_lck_resourceopen (struct conn_info *conn_info iovec.iov_base = (char *)&req_exec_lck_resourceopen; iovec.iov_len = sizeof (req_exec_lck_resourceopen); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1268,7 +1268,7 @@ static int message_handler_req_lib_lck_resourceopenasync (struct conn_info *conn iovec.iov_base = (char *)&req_exec_lck_resourceopen; iovec.iov_len = sizeof (req_exec_lck_resourceopen); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1298,8 +1298,8 @@ static int message_handler_req_lib_lck_resourceclose (struct conn_info *conn_inf iovecs[0].iov_base = (char *)&req_exec_lck_resourceclose; iovecs[0].iov_len = sizeof (req_exec_lck_resourceclose); - if (totempg_send_ok (sizeof (struct req_exec_lck_resourceclose))) { - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) { + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); } } else { @@ -1343,7 +1343,7 @@ static int message_handler_req_lib_lck_resourcelock (struct conn_info *conn_info iovecs[0].iov_base = (char *)&req_exec_lck_resourcelock; iovecs[0].iov_len = sizeof (req_exec_lck_resourcelock); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1374,7 +1374,7 @@ static int message_handler_req_lib_lck_resourcelockasync (struct conn_info *conn iovecs[0].iov_base = (char *)&req_exec_lck_resourcelock; iovecs[0].iov_len = sizeof (req_exec_lck_resourcelock); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1405,7 +1405,7 @@ static int message_handler_req_lib_lck_resourceunlock (struct conn_info *conn_in iovec.iov_base = (char *)&req_exec_lck_resourceunlock; iovec.iov_len = sizeof (req_exec_lck_resourceunlock); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1436,7 +1436,7 @@ static int message_handler_req_lib_lck_resourceunlockasync (struct conn_info *co iovec.iov_base = (char *)&req_exec_lck_resourceunlock; iovec.iov_len = sizeof (req_exec_lck_resourceunlock); - assert (totempg_mcast (&iovec, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0); return (0); } @@ -1463,7 +1463,7 @@ static int message_handler_req_lib_lck_lockpurge (struct conn_info *conn_info, v iovecs[0].iov_base = (char *)&req_exec_lck_lockpurge; iovecs[0].iov_len = sizeof (req_exec_lck_lockpurge); - assert (totempg_mcast (iovecs, 1, TOTEMPG_AGREED) == 0); + assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0); return (0); } diff --git a/exec/main.c b/exec/main.c index a5bf927c..02dfa040 100644 --- a/exec/main.c +++ b/exec/main.c @@ -187,6 +187,14 @@ struct totem_ip_address this_non_loopback_ip; char *socketname = "libais.socket"; +totempg_groups_handle openais_group_handle; + +struct totempg_group openais_group = { + .group = "a", + .group_len = 1 +}; + + static int libais_connection_active (struct conn_info *conn_info) { return (conn_info->state == CONN_STATE_ACTIVE); @@ -592,6 +600,8 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, struct ucred *cred; int on = 0; int send_ok = 0; + int send_ok_joined = 0; + struct iovec send_ok_joined_iovec; struct res_overlay res_overlay; if (revent & (POLLERR|POLLHUP)) { @@ -705,10 +715,15 @@ retry_recv: * to queue a message, otherwise tell the library we are busy and to * try again later */ + send_ok_joined_iovec.iov_base = header; + send_ok_joined_iovec.iov_len = header->size; + send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle, + &send_ok_joined_iovec, 1); + send_ok = (ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) || ((ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) && - (totempg_send_ok (header->size)) && + (send_ok_joined) && (sync_in_process() == 0)); if (send_ok) { @@ -1139,10 +1154,18 @@ int main (int argc, char **argv) totempg_initialize ( aisexec_poll_handle, - &openais_config.totem_config, + &openais_config.totem_config); + + totempg_groups_initialize ( + &openais_group_handle, deliver_fn, confchg_fn); + totempg_groups_join ( + openais_group_handle, + &openais_group, + 1); + this_ip = &openais_config.totem_config.interfaces[0].boundto; /* diff --git a/exec/main.h b/exec/main.h index 9ff9aeb8..26db0923 100644 --- a/exec/main.h +++ b/exec/main.h @@ -148,9 +148,11 @@ enum nodeexec_message_types { extern struct totem_ip_address *this_ip; -poll_handle aisexec_poll_handle; +extern struct totempg_group openais_group; -extern struct gmi_groupname aisexec_groupname; +extern totempg_groups_handle openais_group_handle; + +poll_handle aisexec_poll_handle; extern int libais_send_response (struct conn_info *conn_info, void *msg, int mlen); @@ -158,4 +160,5 @@ extern int message_source_is_local(struct message_source *source); extern void message_source_set(struct message_source *source, struct conn_info *conn_info); + #endif /* AIS_EXEC_H_DEFINED */ diff --git a/exec/sync.c b/exec/sync.c index b70e4053..ecb67284 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -114,7 +114,7 @@ static int sync_barrier_send (struct memb_ring_id *ring_id) iovec.iov_base = (char *)&req_exec_sync_barrier_start; iovec.iov_len = sizeof (req_exec_sync_barrier_start); - res = totempg_mcast (&iovec, 1, TOTEMPG_AGREED); + res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED); return (res); } diff --git a/exec/totempg.c b/exec/totempg.c index a92d8f55..790afff3 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -82,16 +82,16 @@ */ #include -#include "totempg.h" -#include "totemsrp.h" -#include "totemmrp.h" #include #include #include #include #include - +#include "totempg.h" +#include "totemmrp.h" +#include "totemsrp.h" +#include "hdb.h" #include "swab.h" #define min(a,b) ((a) < (b)) ? a : b @@ -140,19 +140,6 @@ static int mcast_packed_msg_count = 0; struct totem_config *totempg_totem_config; -static void (*app_deliver_fn) ( - struct totem_ip_address *source_addr, - struct iovec *iovec, - int iov_len, - int endian_conversion_required) = 0; - -static void (*app_confchg_fn) ( - enum totem_configuration_type configuration_type, - struct totem_ip_address *member_list, int member_list_entries, - struct totem_ip_address *left_list, int left_list_entries, - struct totem_ip_address *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id) = 0; - struct assembly { struct totem_ip_address addr; unsigned char data[MESSAGE_SIZE_MAX]; @@ -174,11 +161,41 @@ int assembly_list_entries = 0; * the buffer is a continuation of a previously packed fragment. */ static unsigned char *fragmentation_data; -int fragment_size = 0; -int fragment_continuation = 0; + +static int fragment_size = 0; + +static int fragment_continuation = 0; static struct iovec iov_delv; +static unsigned int totempg_max_handle = 0; +struct totempg_group_instance { + void (*deliver_fn) ( + struct totem_ip_address *source_addr, + struct iovec *iovec, + int iov_len, + int endian_conversion_required); + + void (*confchg_fn) ( + enum totem_configuration_type configuration_type, + struct totem_ip_address *member_list, int member_list_entries, + struct totem_ip_address *left_list, int left_list_entries, + struct totem_ip_address *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id); + + struct totempg_group *groups; + + int groups_cnt; +}; + +static struct saHandleDatabase totempg_groups_instance_database = { + .handleCount = 0, + .handles = 0, + .handleInstanceDestructor = 0 +}; + +static int send_ok (int msg_size); + static struct assembly *find_assembly (struct totem_ip_address *addr) { int i; @@ -191,6 +208,110 @@ static struct assembly *find_assembly (struct totem_ip_address *addr) return (0); } +static inline void app_confchg_fn ( + enum totem_configuration_type configuration_type, + struct totem_ip_address *member_list, int member_list_entries, + struct totem_ip_address *left_list, int left_list_entries, + struct totem_ip_address *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id) +{ + int i; + SaAisErrorT error; + struct totempg_group_instance *instance; + + for (i = 0; i <= totempg_max_handle; i++) { + error = saHandleInstanceGet (&totempg_groups_instance_database, + i, (void *)&instance); + + if (error == SA_OK) { + instance->confchg_fn ( + configuration_type, + member_list, + member_list_entries, + left_list, + left_list_entries, + joined_list, + joined_list_entries, + ring_id); + + saHandleInstancePut (&totempg_groups_instance_database, i); + } + } +} +static inline int group_matches ( + struct iovec *iovec, + unsigned int iov_len, + struct totempg_group *groups_b, + unsigned int group_b_cnt, + unsigned int *adjust_iovec) +{ + unsigned short *group_len; + char *group_name; + int i; + int j; + + assert (iov_len == 1); + + group_len = (unsigned short *)iovec->iov_base; + group_name = ((char *)iovec->iov_base) + + sizeof (unsigned short) * (group_len[0] + 1); + + /* + * Calculate amount to adjust the iovec by before delivering to app + */ + *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1); + for (i = 1; i < group_len[0] + 1; i++) { + *adjust_iovec += group_len[i]; + } + + /* + * Determine if this message should be delivered to this instance + */ + for (i = 1; i < group_len[0] + 1; i++) { + for (j = 0; j < group_b_cnt; j++) { + if ((group_len[i] == groups_b[j].group_len) && + (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) { + return (1); + } + } + group_name += group_len[i]; + } + return (0); +} + + +static inline void app_deliver_fn ( + struct totem_ip_address *source_addr, + struct iovec *iovec, + unsigned int iov_len, + int endian_conversion_required) +{ + int i; + SaAisErrorT error; + struct totempg_group_instance *instance; + struct iovec stripped_iovec; + unsigned int adjust_iovec; + + for (i = 0; i <= totempg_max_handle; i++) { + error = saHandleInstanceGet (&totempg_groups_instance_database, + i, (void *)&instance); + + if (error == SA_OK) { + assert (iov_len == 1); + if (group_matches (iovec, iov_len, instance->groups, instance->groups_cnt, &adjust_iovec)) { + stripped_iovec.iov_len = iovec->iov_len - adjust_iovec; + stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec; + instance->deliver_fn ( + source_addr, + &stripped_iovec, + iov_len, + endian_conversion_required); + } + + saHandleInstancePut (&totempg_groups_instance_database, i); + } + } +} static void totempg_confchg_fn ( enum totem_configuration_type configuration_type, struct totem_ip_address *member_list, int member_list_entries, @@ -459,26 +580,10 @@ int callback_token_received_fn (enum totem_callback_token_type type, */ int totempg_initialize ( poll_handle poll_handle, - struct totem_config *totem_config, - - void (*deliver_fn) ( - struct totem_ip_address *source_addr, - struct iovec *iovec, - int iov_len, - int endian_conversion_required), - - void (*confchg_fn) ( - enum totem_configuration_type configuration_type, - struct totem_ip_address *member_list, int member_list_entries, - struct totem_ip_address *left_list, int left_list_entries, - struct totem_ip_address *joined_list, int joined_list_entries, - struct memb_ring_id *ring_id)) + struct totem_config *totem_config) { int res; - app_deliver_fn = deliver_fn; - app_confchg_fn = confchg_fn; - totempg_totem_config = totem_config; fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); @@ -514,7 +619,7 @@ static unsigned char next_fragment = 1; /* * Multicast a message */ -int totempg_mcast ( +static int mcast_msg ( struct iovec *iovec, int iov_len, int guarantee) @@ -542,7 +647,7 @@ int totempg_mcast ( total_size += iovec[i].iov_len; } - if( totempg_send_ok (total_size + sizeof(unsigned short) * + if (send_ok (total_size + sizeof(unsigned short) * (mcast_packed_msg_count+1)) == 0) { return(-1); @@ -616,7 +721,7 @@ int totempg_mcast ( iovecs[0].iov_len = sizeof(struct totempg_mcast); iovecs[1].iov_base = mcast_packed_msg_lens; iovecs[1].iov_len = mcast_packed_msg_count * - sizeof(unsigned short); + sizeof(unsigned short); iovecs[2].iov_base = data_ptr; iovecs[2].iov_len = max_packet_size; assert (totemmrp_avail() > 0); @@ -662,7 +767,7 @@ int totempg_mcast ( /* * Determine if a message of msg_size could be queued */ -int totempg_send_ok ( +static int send_ok ( int msg_size) { int avail = 0; @@ -675,12 +780,11 @@ int totempg_send_ok ( * a full message, so add +1 * totempg_totem_config->net_mtu - 25 is for the totempg_mcast header */ - - total = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; return (avail >= total); } + int totempg_callback_token_create ( void **handle_out, enum totem_callback_token_type type, @@ -700,3 +804,266 @@ void totempg_callback_token_destroy ( /* * vi: set autoindent tabstop=4 shiftwidth=4 : */ + +int totempg_groups_initialize ( + totempg_groups_handle *handle, + + void (*deliver_fn) ( + struct totem_ip_address *source_addr, + struct iovec *iovec, + int iov_len, + int endian_conversion_required), + + void (*confchg_fn) ( + enum totem_configuration_type configuration_type, + struct totem_ip_address *member_list, int member_list_entries, + struct totem_ip_address *left_list, int left_list_entries, + struct totem_ip_address *joined_list, int joined_list_entries, + struct memb_ring_id *ring_id)) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + + error = saHandleCreate (&totempg_groups_instance_database, + sizeof (struct totempg_group_instance), handle); + if (error != SA_OK) { + goto error_exit; + } + + if (*handle > totempg_max_handle) { + totempg_max_handle = *handle; + } + + error = saHandleInstanceGet (&totempg_groups_instance_database, *handle, + (void *)&instance); + if (error != SA_OK) { + goto error_destroy; + } + + instance->deliver_fn = deliver_fn; + instance->confchg_fn = confchg_fn; + instance->groups = 0; + instance->groups_cnt = 0; + + saHandleInstancePut (&totempg_groups_instance_database, *handle); + + return (0); +error_destroy: + saHandleDestroy (&totempg_groups_instance_database, *handle); + +error_exit: + return (-1); +} + +int totempg_groups_join ( + totempg_groups_handle handle, + struct totempg_group *groups, + int group_cnt) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + struct totempg_group *new_groups; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + new_groups = realloc (instance->groups, + sizeof (struct totempg_group) * + (instance->groups_cnt + group_cnt)); + if (new_groups == 0) { + error = ENOMEM; + goto error_exit; + } + memcpy (&new_groups[instance->groups_cnt], + groups, group_cnt * sizeof (struct totempg_group)); + instance->groups = new_groups; + instance->groups_cnt = instance->groups_cnt = group_cnt; + + saHandleInstancePut (&totempg_groups_instance_database, handle); + return (0); + +error_exit: + return (error); +} + +int totempg_groups_leave ( + totempg_groups_handle handle, + struct totempg_group *groups, + int group_cnt) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + saHandleInstancePut (&totempg_groups_instance_database, handle); + return (0); +error_exit: + return (error); +} + +#define MAX_IOVECS_FROM_APP 32 +#define MAX_GROUPS_PER_MSG 32 + +int totempg_groups_mcast_joined ( + totempg_groups_handle handle, + struct iovec *iovec, + int iov_len, + int guarantee) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; + struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; + int i; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + /* + * Build group_len structure and the iovec_mcast structure + */ + group_len[0] = instance->groups_cnt; + for (i = 0; i < instance->groups_cnt; i++) { + group_len[i + 1] = instance->groups[i].group_len; + iovec_mcast[i + 1].iov_len = instance->groups[i].group_len; + iovec_mcast[i + 1].iov_base = instance->groups[i].group; + } + iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short); + iovec_mcast[0].iov_base = group_len; + for (i = 0; i < iov_len; i++) { + iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len; + iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base; + } + + mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee); + + saHandleInstancePut (&totempg_groups_instance_database, handle); + return (0); + +error_exit: + return (error); +} + +int totempg_groups_send_ok_joined ( + totempg_groups_handle handle, + struct iovec *iovec, + int iov_len) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + unsigned int size = 0; + unsigned int i; + unsigned int res; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + for (i = 0; i < instance->groups_cnt; i++) { + size += instance->groups[i].group_len; + } + for (i = 0; i < iov_len; i++) { + size += iovec[i].iov_len; + } + + res = send_ok (size); + + saHandleInstancePut (&totempg_groups_instance_database, handle); + + return (res); +error_exit: + return (error); +} + +int totempg_groups_mcast_groups ( + totempg_groups_handle handle, + int guarantee, + struct totempg_group *groups, + int groups_cnt, + struct iovec *iovec, + int iov_len) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + unsigned short group_len[MAX_GROUPS_PER_MSG + 1]; + struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP]; + int i; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + /* + * Build group_len structure and the iovec_mcast structure + */ + group_len[0] = groups_cnt; + for (i = 0; i < groups_cnt; i++) { + group_len[i + 1] = groups[i].group_len; + iovec_mcast[i + 1].iov_len = groups[i].group_len; + iovec_mcast[i + 1].iov_base = groups[i].group; + } + iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short); + iovec_mcast[0].iov_base = group_len; + for (i = 0; i < iov_len; i++) { + iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len; + iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base; + } + + mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee); + + saHandleInstancePut (&totempg_groups_instance_database, handle); + return (0); + +error_exit: + return (error); +} + +int totempg_groups_send_ok_groups ( + totempg_groups_handle handle, + struct totempg_group *groups, + int groups_cnt, + struct iovec *iovec, + int iov_len) +{ + SaAisErrorT error; + struct totempg_group_instance *instance; + unsigned int size = 0; + unsigned int i; + unsigned int res; + + error = saHandleInstanceGet (&totempg_groups_instance_database, handle, + (void *)&instance); + if (error != SA_OK) { + goto error_exit; + } + + for (i = 0; i < groups_cnt; i++) { + size += groups[i].group_len; + } + for (i = 0; i < iov_len; i++) { + size += iovec[i].iov_len; + } + + res = send_ok (size); + + saHandleInstancePut (&totempg_groups_instance_database, handle); + return (0); +error_exit: + return (error); +} + diff --git a/exec/totempg.h b/exec/totempg.h index d7313ae2..45e9a652 100644 --- a/exec/totempg.h +++ b/exec/totempg.h @@ -40,6 +40,13 @@ #include "totemsrp.h" #include "totem.h" +typedef unsigned int totempg_groups_handle; + +struct totempg_group { + void *group; + int group_len; +}; + #define TOTEMPG_AGREED 0 #define TOTEMPG_SAFE 1 @@ -49,11 +56,28 @@ */ /* - * Initialize the totem process group abstraction + * Initialize the totem process groups abstraction */ int totempg_initialize ( poll_handle poll_handle, - struct totem_config *totem_config, + struct totem_config *totem_config +); + +void totempg_finalize (void); + +int totempg_callback_token_create (void **handle_out, + enum totem_callback_token_type type, + int delete, + int (*callback_fn) (enum totem_callback_token_type type, void *), + void *data); + +void totempg_callback_token_destroy (void *handle); + +/* + * Initialize a groups instance + */ +int totempg_groups_initialize ( + totempg_groups_handle *handle, void (*deliver_fn) ( struct totem_ip_address *source_addr, @@ -68,28 +92,43 @@ int totempg_initialize ( struct totem_ip_address *joined_list, int joined_list_entries, struct memb_ring_id *ring_id)); -void totempg_finalize (void); +int totempg_groups_finalize ( + totempg_groups_handle handle); -/* - * Multicast a message - */ -int totempg_mcast ( +int totempg_groups_join ( + totempg_groups_handle handle, + struct totempg_group *groups, + int gruop_cnt); + +int totempg_groups_leave ( + totempg_groups_handle handle, + struct totempg_group *groups, + int gruop_cnt); + +int totempg_groups_mcast_joined ( + totempg_groups_handle handle, struct iovec *iovec, int iov_len, int guarantee); -/* - * Determine if a message of msg_size could be queued - */ -int totempg_send_ok ( - int msg_size); - -void totempg_callback_token_destroy (void *handle); - -int totempg_callback_token_create (void **handle_out, - enum totem_callback_token_type type, - int delete, - int (*callback_fn) (enum totem_callback_token_type type, void *), - void *data); +int totempg_groups_send_ok_joined ( + totempg_groups_handle handle, + struct iovec *iovec, + int iov_len); + +int totempg_groups_mcast_groups ( + totempg_groups_handle handle, + int guarantee, + struct totempg_group *groups, + int groups_cnt, + struct iovec *iovec, + int iov_len); +int totempg_groups_send_ok_groups ( + totempg_groups_handle handle, + struct totempg_group *groups, + int groups_cnt, + struct iovec *iovec, + int iov_len); + #endif /* TOTEMPG_H_DEFINED */