Fix some problems in the open channel recovery code that could cause
remaining nodes to have an incorrect count when a node left the
membership.

(Logical change 1.95)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@339 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Mark Haverkamp 2004-10-15 14:35:37 +00:00
parent 40e0554c19
commit 486ecf2099

View File

@ -32,8 +32,6 @@
*/
//#define DEBUG
//#define EVT_EVENT_LIST_CHECK
//#define EVT_ALLOC_CHECK
#include <sys/types.h>
#include <malloc.h>
#include <errno.h>
@ -246,7 +244,7 @@ struct event_svr_channel_subscr;
struct open_count {
SaClmNodeIdT oc_node_id;
uint32_t oc_open_count;
int32_t oc_open_count;
};
/*
@ -255,8 +253,7 @@ struct open_count {
* esc_channel_name: The name of this channel.
* esc_total_opens: The total number of opens on this channel including
* other nodes.
* esc_local_opens: The total number of opens on this channel for this
* node.
* esc_local_opens: The number of opens on this channel for this node.
* esc_oc_size: The total number of entries in esc_node_opens.
* esc_node_opens: list of node IDs and how many opens are associated.
* esc_retained_count: How many retained events for this channel
@ -266,8 +263,8 @@ struct open_count {
*/
struct event_svr_channel_instance {
SaNameT esc_channel_name;
uint32_t esc_total_opens;
uint32_t esc_local_opens;
int32_t esc_total_opens;
int32_t esc_local_opens;
uint32_t esc_oc_size;
struct open_count *esc_node_opens;
uint32_t esc_retained_count;
@ -540,7 +537,7 @@ static int check_open_size(struct event_svr_channel_instance *eci)
return -1;
}
memset(&eci->esc_node_opens[eci->esc_oc_size], 0,
total_members - eci->esc_oc_size);
sizeof(struct open_count) * (total_members - eci->esc_oc_size));
eci->esc_oc_size = total_members;
}
return 0;
@ -548,6 +545,7 @@ static int check_open_size(struct event_svr_channel_instance *eci)
/*
* Find the specified node ID in the node list of the channel.
* If it's not in the list, add it.
*/
static struct open_count* find_open_count(
struct event_svr_channel_instance *eci,
@ -558,6 +556,7 @@ static struct open_count* find_open_count(
for (i = 0; i < eci->esc_oc_size; i++) {
if (eci->esc_node_opens[i].oc_node_id == 0) {
eci->esc_node_opens[i].oc_node_id = node_id;
eci->esc_node_opens[i].oc_open_count = 0;
}
if (eci->esc_node_opens[i].oc_node_id == node_id) {
return &eci->esc_node_opens[i];
@ -569,6 +568,23 @@ static struct open_count* find_open_count(
return 0;
}
/*
 * Log the open counts recorded for a channel.
 *
 * Emits one NOTICE line with the channel name and its total and local
 * open counts, followed by one NOTICE line for each populated entry in
 * esc_node_opens giving the node ID and that node's open count.
 *
 * eci: The channel instance whose counts are to be logged.
 */
static void dump_chan_opens(struct event_svr_channel_instance *eci)
{
	/* Same type as esc_oc_size, so the loop test compares like with like. */
	uint32_t i;

	log_printf(LOG_LEVEL_NOTICE,
		"(EVT) Channel %s, total %d, local %d\n",
		eci->esc_channel_name.value,
		eci->esc_total_opens,
		eci->esc_local_opens);

	for (i = 0; i < eci->esc_oc_size; i++) {
		/*
		 * A node ID of zero marks an unused slot; stop at the
		 * first one rather than scanning the rest of the array.
		 */
		if (eci->esc_node_opens[i].oc_node_id == 0) {
			break;
		}
		log_printf(LOG_LEVEL_NOTICE, "(EVT) Node 0x%x, count %d\n",
			eci->esc_node_opens[i].oc_node_id,
			eci->esc_node_opens[i].oc_open_count);
	}
}
/*
* Replace the current open count for a node with the specified value.
@ -585,8 +601,19 @@ static int set_open_count(struct event_svr_channel_instance *eci,
oc = find_open_count(eci, node_id);
if (oc) {
eci->esc_total_opens -=
(eci->esc_node_opens[i].oc_open_count + open_count);
if (oc->oc_open_count) {
if (oc->oc_open_count != open_count) {
log_printf(LOG_LEVEL_ERROR,
"(EVT) Channel open count error\n");
dump_chan_opens(eci);
}
return 0;
}
log_printf(LOG_LEVEL_DEBUG,
"(EVT) Set count: Chan %s for node 0x%x, was %d, now %d\n",
eci->esc_channel_name.value,
node_id, eci->esc_node_opens[i].oc_open_count, open_count);
eci->esc_total_opens += open_count;
oc->oc_open_count = open_count;
return 0;
}
@ -607,6 +634,9 @@ static int inc_open_count(struct event_svr_channel_instance *eci,
return i;
}
if (node_id == my_node_id) {
eci->esc_local_opens++;
}
oc = find_open_count(eci, node_id);
if (oc) {
eci->esc_total_opens++;
@ -631,10 +661,17 @@ static int dec_open_count(struct event_svr_channel_instance *eci,
return i;
}
if (node_id == my_node_id) {
eci->esc_local_opens--;
}
oc = find_open_count(eci, node_id);
if (oc) {
eci->esc_total_opens--;
oc->oc_open_count--;
if ((eci->esc_total_opens < 0) || (oc->oc_open_count < 0)) {
log_printf(LOG_LEVEL_ERROR, "(EVT) Channel open decrement error\n");
dump_chan_opens(eci);
}
return 0;
}
return -1;
@ -650,7 +687,7 @@ static void delete_channel(struct event_svr_channel_instance *eci)
log_printf(LOG_LEVEL_DEBUG,
"(EVT) Called Delete channel %s t %d, l %d, r %d\n",
eci->esc_channel_name.value,
eci->esc_total_opens, eci->esc_local_opens,
eci->esc_total_opens, eci->esc_local_opens,
eci->esc_retained_count);
if ((eci->esc_retained_count == 0) && (eci->esc_total_opens == 0)) {
log_printf(LOG_LEVEL_DEBUG, "(EVT) Delete channel %s\n",
@ -662,6 +699,14 @@ static void delete_channel(struct event_svr_channel_instance *eci)
eci->esc_channel_name.value);
return;
}
/*
* adjust if we're sending open counts on a config change
*/
if (in_cfg_change && (&eci->esc_entry == next_chan)) {
next_chan = eci->esc_entry.next;
}
list_del(&eci->esc_entry);
if (eci->esc_node_opens) {
free(eci->esc_node_opens);
@ -685,25 +730,33 @@ static int remove_open_count(
*/
for (i = 0; i < eci->esc_oc_size; i++) {
if (eci->esc_node_opens[i].oc_node_id == 0) {
eci->esc_node_opens[i].oc_node_id = node_id;
break;
}
log_printf(LOG_LEVEL_DEBUG, "(EVT) roc: %x/%x, t %d, oc %d\n",
node_id, eci->esc_node_opens[i].oc_node_id,
eci->esc_total_opens, eci->esc_node_opens[i].oc_open_count);
if (eci->esc_node_opens[i].oc_node_id == node_id) {
eci->esc_total_opens -= eci->esc_node_opens[i].oc_open_count;
for (j = i+1; j < eci->esc_oc_size; j++, i++) {
eci->esc_node_opens[i].oc_node_id =
eci->esc_node_opens[j].oc_node_id;
eci->esc_node_opens[i].oc_open_count =
eci->esc_node_opens[j].oc_open_count;
}
eci->esc_node_opens[eci->esc_oc_size-1].oc_node_id = 0;
eci->esc_node_opens[eci->esc_oc_size-1].oc_open_count = 0;
}
/*
* Remove the channel if it's not being used anymore
*/
delete_channel(eci);
return 0;
/*
* Remove the channel if it's not being used anymore
*/
delete_channel(eci);
return 0;
}
}
return -1;
}
@ -1174,10 +1227,6 @@ static SaErrorT get_event_id(uint64_t *event_id)
}
#ifdef EVT_ALLOC_CHECK
static uint32_t evt_alloc = 0;
static uint32_t evt_free = 0;
#endif
/*
* Free up an event structure if it isn't being used anymore.
@ -1194,13 +1243,6 @@ free_event_data(struct event_data *edp)
free(edp->ed_delivered);
}
#ifdef EVT_ALLOC_CHECK
evt_free++;
if ((evt_free % 1000) == 0) {
log_printf(LOG_LEVEL_NOTICE, "(EVT) evt alloc: %u, *evt free: %u\n",
evt_alloc, evt_free);
}
#endif
free(edp);
}
@ -1229,7 +1271,9 @@ event_retention_timeout(void *data)
* Check to see if the channel isn't in use anymore.
*/
edp->ed_my_chan->esc_retained_count--;
delete_channel(edp->ed_my_chan);
if (edp->ed_my_chan->esc_retained_count == 0) {
delete_channel(edp->ed_my_chan);
}
free_event_data(edp);
}
@ -1271,7 +1315,9 @@ clear_retention_time(SaEvtEventIdT event_id)
* Check to see if the channel isn't in use anymore.
*/
edp->ed_my_chan->esc_retained_count--;
delete_channel(edp->ed_my_chan);
if (edp->ed_my_chan->esc_retained_count == 0) {
delete_channel(edp->ed_my_chan);
}
free_event_data(edp);
break;
}
@ -1515,13 +1561,6 @@ filter_undelivered_events(struct event_svr_channel_open *op_chan)
list_init(&cel->cel_entry);
esip->esi_nevents--;
#ifdef EVT_EVENT_LIST_CHECK
if (esip->esi_nevents < 0) {
log_printf(LOG_LEVEL_NOTICE,
"(EVT) event count went negative\n");
esip->esi_nevents = 0;
}
#endif
free_event_data(cel->cel_event);
free(cel);
next_event:
@ -1720,13 +1759,6 @@ make_local_event(struct lib_event_data *p,
eps++;
}
#ifdef EVT_ALLOC_CHECK
evt_alloc++;
if ((evt_alloc % 1000) == 0) {
log_printf(LOG_LEVEL_NOTICE, "(EVT) *evt alloc: %u, evt free: %u\n",
evt_alloc, evt_free);
}
#endif
ed->ed_ref_count++;
return ed;
}
@ -2334,15 +2366,6 @@ static int lib_evt_event_data_get(struct conn_info *conn_info, void *message)
esip->esi_queue_blocked = 0;
log_printf(LOG_LEVEL_DEBUG, "(EVT) unblock\n");
}
#ifdef EVT_EVENT_LIST_CHECK
if (esip->esi_nevents < 0) {
log_printf(LOG_LEVEL_NOTICE, "(EVT) event count went negative\n");
if (!list_empty(&esip->esi_events[i])) {
log_printf(LOG_LEVEL_NOTICE, "(EVT) event list isn't empty\n");
}
esip->esi_nevents = 0;
}
#endif
edp = cel->cel_event;
edp->ed_event.led_lib_channel_handle = cel->cel_chan_handle;
edp->ed_event.led_sub_id = cel->cel_sub_id;
@ -2377,11 +2400,11 @@ data_get_done:
*/
static void remove_chan_open_info(SaClmNodeIdT node_id)
{
struct list_head *l;
struct list_head *l, *nxt;
struct event_svr_channel_instance *eci;
for (l = esc_head.next; l != &esc_head; l = l->next) {
for (l = esc_head.next; l != &esc_head; l = nxt) {
nxt = l->next;
eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
remove_open_count(eci, node_id);
@ -2476,7 +2499,7 @@ static int evt_conf_change(
while (left_list_entries--) {
md = evt_find_node(left_list->sin_addr);
if (md == 0) {
log_printf(LOG_LEVEL_DEBUG,
log_printf(LOG_LEVEL_WARNING,
"(EVT) Can't find cluster node at %s\n",
inet_ntoa(left_list->sin_addr));
/*
@ -2508,7 +2531,7 @@ static int evt_conf_change(
*/
if (configuration_type == GMI_CONFIGURATION_REGULAR) {
if (in_cfg_change) {
log_printf(LOG_LEVEL_DEBUG,
log_printf(LOG_LEVEL_NOTICE,
"(EVT) Already in config change, Starting over, m %d, c %d\n",
total_members, checked_in);
}
@ -2553,23 +2576,6 @@ static int evt_finalize(struct conn_info *conn_info)
saHandleDestroy(&esip->esi_hdb, eco->eco_my_handle);
}
#ifdef EVT_EVENT_LIST_CHECK
{
int i;
if (esip->esi_nevents) {
log_printf(LOG_LEVEL_WARNING,
"(EVT) %d Events left on delivery list after finalize\n",
esip->esi_nevents);
}
for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) {
if (!list_empty(&esip->esi_events[i])) {
log_printf(LOG_LEVEL_WARNING,
"(EVT) Events list not empty after finalize\n");
}
}
}
#endif
/*
* Delete track entry if there is one
@ -2669,21 +2675,12 @@ static int evt_remote_evt(void *msg, struct in_addr source_addr)
eci = find_channel(&evtpkt->led_chan_name);
/*
* If we don't kmow about the channel, then no one has opened it yet.
* We create the channel if there is a retention time, otherwise we can
* just throw it away since no one here is looking for this event.
* We shouldn't see an event for a channel that we don't know about.
*/
if (!eci) {
if (evtpkt->led_retention_time) {
eci = create_channel(&evtpkt->led_chan_name);
if (!eci) {
log_printf(LOG_LEVEL_WARNING, "(EVT) Can't create channel %s\n",
evtpkt->led_chan_name.value);
}
} else {
return 0;
}
log_printf(LOG_LEVEL_WARNING, "(EVT) Channel %s doesn't exist\n",
evtpkt->led_chan_name.value);
return 0;
}
if (check_last_event(evtpkt, source_addr)) {
@ -2824,17 +2821,12 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr)
eci = find_channel(&evtpkt->led_chan_name);
/*
* If the channel doesn't exist, then create it
* since we're in recovery mode, so that we can save this message.
* We shouldn't see an event for a channel that we don't know about.
*/
if (!eci) {
eci = create_channel(&evtpkt->led_chan_name);
if (!eci) {
log_printf(LOG_LEVEL_WARNING, "(EVT) Can't create channel %s\n",
evtpkt->led_chan_name.value);
return 0;
}
log_printf(LOG_LEVEL_WARNING, "(EVT) Channel %s doesn't exist\n",
evtpkt->led_chan_name.value);
return 0;
}
evt = make_local_event(evtpkt, eci);
@ -2999,7 +2991,6 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr)
inc_open_count(eci, mn->mn_node_info.nodeId);
if (mn->mn_node_info.nodeId == my_node->nodeId) {
eci->esc_local_opens++;
/*
* Complete one of our pending open requests
*/
@ -3015,7 +3006,7 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr)
log_printf(LOG_LEVEL_DEBUG,
"(EVT) Open channel %s t %d, l %d, r %d\n",
eci->esc_channel_name.value,
eci->esc_total_opens, eci->esc_local_opens,
eci->esc_total_opens, eci->esc_local_opens,
eci->esc_retained_count);
break;
}
@ -3031,18 +3022,19 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr)
if (!eci) {
log_printf(LOG_LEVEL_NOTICE,
"(EVT) Channel close request for %s not found\n",
cpkt->u.chc_chan);
cpkt->u.chc_chan.value);
break;
}
if (mn->mn_node_info.nodeId == my_node->nodeId) {
eci->esc_local_opens--;
}
/*
* if last instance, we can free up associated data.
*/
dec_open_count(eci, mn->mn_node_info.nodeId);
log_printf(LOG_LEVEL_DEBUG,
"(EVT) Close channel %s t %d, l %d, r %d\n",
eci->esc_channel_name.value,
eci->esc_total_opens, eci->esc_local_opens,
eci->esc_retained_count);
delete_channel(eci);
break;