From acba87e17943e9095eeae2ec4d13e051ca17f79f Mon Sep 17 00:00:00 2001 From: Mark Haverkamp Date: Thu, 2 Sep 2004 17:16:25 +0000 Subject: [PATCH] Completed most of the event service functionality. (Logical change 1.63) git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@223 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/evt.c | 2019 ++++++++++++++++++++++++++++++++++++++++++++++++++-- exec/evt.h | 27 +- 2 files changed, 1964 insertions(+), 82 deletions(-) diff --git a/exec/evt.c b/exec/evt.c index df454630..85abb2f2 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -14,7 +14,7 @@ * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * - Neither the name of the MontaVista Software, Inc. nor the names of its + * - Neither the name of the Open Source Development Lab nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * @@ -31,10 +31,18 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ -#define DEBUG +//#define DEBUG +//#define EVT_EVENT_LIST_CHECK +//#define EVT_ALLOC_CHECK +//#define NO_DUPLICATES #include #include #include +#include +#include +#include +#include +#include #include "../include/ais_types.h" #include "../include/ais_msg.h" #include "../include/list.h" @@ -45,25 +53,30 @@ #include "main.h" #include "print.h" #include "gmi.h" +#include "hdb.h" +#include "clm.h" #include "evt.h" static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message); static int lib_evt_open_channel(struct conn_info *conn_info, void *message); static int lib_evt_close_channel(struct conn_info *conn_info, void *message); -static int lib_evt_channel_subscribe(struct conn_info *conn_info, +static int lib_evt_event_subscribe(struct conn_info *conn_info, void *message); -static int lib_evt_channel_unsubscribe(struct conn_info *conn_info, +static int lib_evt_event_unsubscribe(struct conn_info *conn_info, void *message); static int lib_evt_event_publish(struct conn_info *conn_info, void *message); static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, void *message); +static int lib_evt_event_data_get(struct conn_info *conn_info, + void *message); static int evt_conf_change( struct sockaddr_in *member_list, int member_list_entries, struct sockaddr_in *left_list, int left_list_entries, struct sockaddr_in *joined_list, int joined_list_entries); -static int evt_init(struct conn_info *conn_info, void *msg); -static int evt_exit(struct conn_info *conn_info); + +static int evt_initialize(struct conn_info *conn_info, void *msg); +static int evt_finalize(struct conn_info *conn_info); static int evt_exec_init(void); static struct libais_handler evt_libais_handlers[] = { @@ -86,14 +99,14 @@ static struct libais_handler evt_libais_handlers[] = { .gmi_prio = GMI_PRIO_RECOVERY }, { - .libais_handler_fn = lib_evt_channel_subscribe, - .response_size = sizeof(struct res_evt_channel_subscribe), + .libais_handler_fn = lib_evt_event_subscribe, + .response_size = sizeof(struct res_evt_event_subscribe), .response_id = MESSAGE_RES_EVT_SUBSCRIBE, .gmi_prio = GMI_PRIO_RECOVERY }, { - .libais_handler_fn = lib_evt_channel_unsubscribe, - .response_size = sizeof(struct res_evt_channel_unsubscribe), + .libais_handler_fn = lib_evt_event_unsubscribe, + .response_size = sizeof(struct res_evt_event_unsubscribe), .response_id = MESSAGE_RES_EVT_UNSUBSCRIBE, .gmi_prio = GMI_PRIO_RECOVERY }, @@ -107,6 +120,12 @@ static struct libais_handler evt_libais_handlers[] = { .libais_handler_fn = lib_evt_event_clear_retentiontime, .response_size = sizeof(struct res_evt_event_clear_retentiontime), .response_id = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME, + .gmi_prio = GMI_PRIO_MED + }, + { + .libais_handler_fn = lib_evt_event_data_get, + .response_size = sizeof(struct lib_event_data), + .response_id = MESSAGE_RES_EVT_EVENT_DATA, .gmi_prio = GMI_PRIO_RECOVERY }, }; @@ -128,26 +147,224 @@ struct service_handler evt_service_handler = { .aisexec_handler_fns_count = sizeof(evt_exec_handler_fns) / sizeof(int (*)), .confchg_fn = evt_conf_change, - .libais_init_fn = evt_init, - .libais_exit_fn = evt_exit, + .libais_init_fn = evt_initialize, + .libais_exit_fn = evt_finalize, .aisexec_init_fn = evt_exec_init }; +/* + * list of all retained events + * struct event_data + */ +static DECLARE_LIST_INIT(retained_list); + +/* + * list of all event channel information + * struct event_svr_channel_instance + */ +static DECLARE_LIST_INIT(esc_head); + +/* + * list of all active event conn_info structs. + */ +static DECLARE_LIST_INIT(ci_head); + +#define min(a,b) ((a) < (b) ? (a) : (b)) + +#define ipaddr(a) \ + ((a) & 0xff), \ + (((a) >> 8) & 0xff), \ + (((a) >> 16) & 0xff), \ + (((a) >> 24) & 0xff) + +/* + * Throttle event delivery to applications to keep + * the exec from using too much memory if the app is + * slow to process its events. + */ +#define MAX_EVT_DELIVERY_QUEUE 1000 +#define MIN_EVT_QUEUE_RESUME (MAX_EVT_DELIVERY_QUEUE / 2) + +#define LOST_PUB "EVENT_SERIVCE" +#define LOST_CHAN "LOST EVENT" +/* + * Event to send when the delivery queue gets too full + */ +char lost_evt[] = SA_EVT_LOST_EVENT; +static int dropped_event_size; +static struct event_data *dropped_event; +struct evt_pattern { + SaEvtEventPatternT pat; + char str[sizeof(lost_evt)]; +}; +static struct evt_pattern dropped_pattern = { + .pat = {&dropped_pattern.str[0], + sizeof(lost_evt)}, + .str = {SA_EVT_LOST_EVENT} +}; + +SaNameT lost_chan = { + .value = LOST_CHAN, + .length = sizeof(LOST_CHAN) +}; + +SaNameT dropped_publisher = { + .value = LOST_PUB, + .length = sizeof(LOST_PUB) +}; + +struct event_svr_channel_open; +struct event_svr_channel_subscr; + +/* + * Structure to contain global channel releated information + * + * esc_channel_name: The name of this channel. + * esc_open_chans: list of opens of this channel. + * (event_svr_channel_open.eco_entry) + * esc_entry: links to other channels. (used by esc_head) + */ +struct event_svr_channel_instance { + SaNameT esc_channel_name; + struct list_head esc_open_chans; + struct list_head esc_entry; +}; + +/* + * has the event data in the correct format to send to the library API + * with aditional field for accounting. + * + * ed_ref_count: how many other strutures are referencing. + * ed_retained: retained event list. + * ed_timer_handle: Timer handle for retained event expiration. + * ed_delivered: arrays of open channel pointers that this event + * has been delivered to. (only used for events + * with a retention time). + * ed_delivered_count: Number of entries available in ed_delivered. + * ed_delivered_next: Next free spot in ed_delivered + * ed_my_chan: pointer to the global channel instance associated + * with this event. + * ed_event: The event data formatted to be ready to send. + */ +struct event_data { + uint32_t ed_ref_count; + struct list_head ed_retained; + poll_timer_handle ed_timer_handle; + struct event_svr_channel_open **ed_delivered; + uint32_t ed_delivered_count; + uint32_t ed_delivered_next; + struct event_svr_channel_instance *ed_my_chan; + struct lib_event_data ed_event; +}; + +/* + * Contains a list of pending events to be delivered to a subscribed + * application. + * + * cel_chan_handle: associated library channel handle + * cel_sub_id: associated library subscription ID + * cel_event: event structure to deliver. + * cel_entry: list of pending events + * (struct event_server_instance.esi_events) + */ +struct chan_event_list { + uint32_t cel_chan_handle; + uint32_t cel_sub_id; + struct event_data* cel_event; + struct list_head cel_entry; +}; + +/* + * Contains information about each open for a given channel + * + * eco_flags: How the channel was opened. + * eco_lib_handle: channel handle in the app. Used for event delivery. + * eco_my_handle: the handle used to access this data structure. + * eco_channel: Pointer to global channel info. + * eco_entry: links to other opeinings of this channel. + * eco_instance_entry: links to other channel opeinings for the + * associated server instance. + * eco_subscr: head of list of sbuscriptions for this channel open. + * (event_svr_channel_subscr.ecs_entry) + * eco_conn_info: refrence to EvtInitialize who owns this open. + */ +struct event_svr_channel_open { + uint8_t eco_flags; + uint32_t eco_lib_handle; + uint32_t eco_my_handle; + struct event_svr_channel_instance *eco_channel; + struct list_head eco_entry; + struct list_head eco_instance_entry; + struct list_head eco_subscr; + struct conn_info *eco_conn_info; +}; + +/* + * Contains information about each channel subscription + * + * ecs_open_chan: Link to our open channel. + * ecs_sub_id: Subscription ID. + * ecs_filter_count: number of filters in ecs_filters + * ecs_filters: filters for determining event delivery. + * ecs_entry: Links to other subscriptions to this channel opening. + */ +struct event_svr_channel_subscr { + struct event_svr_channel_open *ecs_open_chan; + uint32_t ecs_sub_id; + SaEvtEventFilterArrayT *ecs_filters; + struct list_head ecs_entry; +}; + + +/* + * Member node data + * mn_node_info: cluster node info from membership + * mn_last_evt_id: last seen event ID for this node + * mn_started: Indicates that event service has started + * on this node. + * mn_next: pointer to the next node in the hash chain. + */ +struct member_node_data { + SaClmClusterNodeT mn_node_info; + SaEvtEventIdT mn_last_evt_id; + SaClmNodeIdT mn_started; + struct member_node_data *mn_next; +}; + +/* + * Get the time of day and convert to nanoseconds + */ +static SaTimeT clustTimeNow(void) +{ + struct timeval tv; + SaTimeT time_now; + + if (gettimeofday(&tv, 0)) { + return 0ULL; + } + + time_now = (SaTimeT)(tv.tv_sec) * 1000000000ULL; + time_now += (SaTimeT)(tv.tv_usec) * 1000ULL; + + return time_now; +} /* * Take the filters we received from the application via the library and * make them into a real SaEvtEventFilterArrayT */ -static SaErrorT evtfilt_to_aisfilt(struct req_evt_channel_subscribe *req, +static SaErrorT evtfilt_to_aisfilt(struct req_evt_event_subscribe *req, SaEvtEventFilterArrayT **evtfilters) { - SaEvtEventFilterArrayT *filta = (SaEvtEventFilterArrayT *)req->ics_filter_data; + SaEvtEventFilterArrayT *filta = + (SaEvtEventFilterArrayT *)req->ics_filter_data; SaEvtEventFilterArrayT *filters; SaEvtEventFilterT *filt = (void *)filta + sizeof(SaEvtEventFilterArrayT); SaUint8T *str = (void *)filta + sizeof(SaEvtEventFilterArrayT) + (sizeof(SaEvtEventFilterT) * filta->filtersNumber); int i; + int j; filters = malloc(sizeof(SaEvtEventFilterArrayT)); if (!filters) { @@ -157,17 +374,28 @@ static SaErrorT evtfilt_to_aisfilt(struct req_evt_channel_subscribe *req, filters->filtersNumber = filta->filtersNumber; filters->filters = malloc(sizeof(SaEvtEventFilterT) * filta->filtersNumber); + if (!filters->filters) { + free(filters); + return SA_ERR_NO_MEMORY; + } for (i = 0; i < filters->filtersNumber; i++) { filters->filters[i].filter.pattern = malloc(filt[i].filter.patternSize); - /* - * TODO: Back out of previous allocs if malloc fails - */ + + if (!filters->filters[i].filter.pattern) { + for (j = 0; j < i; j++) { + free(filters->filters[j].filter.pattern); + } + free(filters->filters); + free(filters); + return SA_ERR_NO_MEMORY; + } filters->filters[i].filter.patternSize = filt[i].filter.patternSize; memcpy(filters->filters[i].filter.pattern, str, filters->filters[i].filter.patternSize); + filters->filters[i].filterType = filt[i].filterType; str += filters->filters[i].filter.patternSize; } @@ -176,7 +404,336 @@ static SaErrorT evtfilt_to_aisfilt(struct req_evt_channel_subscribe *req, return SA_OK; } +/* + * Free up filter data + */ +static void free_filters(SaEvtEventFilterArrayT *fp) +{ + int i; + for (i = 0; i < fp->filtersNumber; i++) { + free(fp->filters[i].filter.pattern); + } + + free(fp->filters); + free(fp); +} + +/* + * Look up a channel in the global channel list + */ +static struct event_svr_channel_instance * +find_channel(SaNameT *chan_name) +{ + struct list_head *l; + struct event_svr_channel_instance *eci; + + for (l = esc_head.next; l != &esc_head; l = l->next) { + + eci = list_entry(l, struct event_svr_channel_instance, esc_entry); + if (chan_name->length != eci->esc_channel_name.length) { + continue; + } + if (memcmp(chan_name->value, eci->esc_channel_name.value, + chan_name->length) != 0) { + continue; + } + return eci; + } + return 0; +} + +/* + * Create and initialize a channel instance structure + */ +static struct event_svr_channel_instance *create_channel(SaNameT *cn) +{ + struct event_svr_channel_instance *eci; + eci = (struct event_svr_channel_instance *) malloc(sizeof(*eci)); + if (!eci) { + return (eci); + } + + memset(eci, 0, sizeof(eci)); + list_init(&eci->esc_entry); + list_init(&eci->esc_open_chans); + eci->esc_channel_name.length = + cn->length; + memcpy(eci->esc_channel_name.value, cn->value, cn->length); + list_add(&eci->esc_entry, &esc_head); + + return eci; +} + +/* + * Return a pointer to the global channel information. + * Possibly create the channel structure and notify remote nodes + * of channel creation. + */ +static SaErrorT evt_open_channel(SaNameT *cn, SaUint8T flgs, + SaTimeT timeout, struct event_svr_channel_instance **eci, + struct libevt_ci *esip) +{ + struct event_svr_channel_instance *ecp; + struct req_evt_chan_command cpkt; + struct iovec chn_iovec; + int res; + SaErrorT ret; + + ret = SA_OK; + + *eci = find_channel(cn); + + /* + * No need to send anything to the cluster since we're already + * receiving messages for this channel. + */ + if (*eci) { + goto chan_open_end; + } + + /* + * If the create flag set, we can make the channel. Otherwise, + * it's an error since we're notified of channels being created and + * opened. + */ + if (flgs & SA_EVT_CHANNEL_CREATE) { + *eci = create_channel(cn); + ecp = *eci; + } else { + ret = SA_ERR_NOT_EXIST; + goto chan_open_end; + } + + /* + * create the channel packet to send. Tell the rest of the cluster + * that we've created the channel. + */ + memset(&cpkt, 0, sizeof(cpkt)); + cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD; + cpkt.chc_head.size = sizeof(cpkt); + cpkt.chc_op = MESSAGE_REQ_EVT_OPEN_CHANNEL; + cpkt.u.chc_chan = *cn; + chn_iovec.iov_base = &cpkt; + chn_iovec.iov_len = cpkt.chc_head.size; + res = gmi_mcast (&aisexec_groupname, &chn_iovec, 1, GMI_PRIO_MED); + if (res != 0) { + ret = SA_ERR_SYSTEM; + } + +chan_open_end: + return ret; + +} + +#ifdef NO_DUPLICATES +/* + * Node data access functions. Used to detect and filter duplicate + * delivery of messages. + * + * add_node: Add a new member node to our list. + * remove_node: Remove a node that left membership. + * find_node: Given the node ID return a pointer to node information. + * + * TODO: There is a problem when receiving config updates. When we get the + * TODO: update, the cluster node table hasn't been updated yet and we + * TODO: can't find the node to put in this list. + * + */ +#define NODE_HASH_SIZE 256 +static struct member_node_data *nl[NODE_HASH_SIZE] = {0}; +inline int +hash_node_id(SaClmNodeIdT node_id) +{ + return node_id & (NODE_HASH_SIZE - 1); +} + +static struct member_node_data **lookup_node(SaClmNodeIdT node_id) +{ + int index = hash_node_id(node_id); + struct member_node_data **nlp; + + nlp = &nl[index]; + for (nlp = &nl[index]; *nlp; nlp = &((*nlp)->mn_next)) { + if ((*nlp)->mn_node_info.nodeId == node_id) { + break; + } + } + + return nlp; +} + +static struct member_node_data * +evt_find_node(SaClmNodeIdT node_id) +{ + struct member_node_data **nlp; + + nlp = lookup_node(node_id); + + if (!nlp) { + log_printf(LOG_LEVEL_DEBUG, "find_node: Got NULL nlp?\n"); + return 0; + } + + return *nlp; +} + +static SaErrorT +evt_add_node(SaClmClusterNodeT *ni) +{ + struct member_node_data **nlp; + struct member_node_data *nl; + SaErrorT err = SA_ERR_EXIST; + + nlp = lookup_node(ni->nodeId); + + if (!nlp) { + log_printf(LOG_LEVEL_DEBUG, "add_node: Got NULL nlp?\n"); + goto an_out; + } + + if (*nlp) { + goto an_out; + } + + *nlp = malloc(sizeof(struct member_node_data)); + if (!nlp) { + return SA_ERR_NO_MEMORY; + } + nl = *nlp; + if (nl) { + memset(nl, 0, sizeof(*nl)); + err = SA_OK; + } + + nl->mn_node_info.nodeId = ni->nodeId; + nl->mn_node_info.nodeAddress = ni->nodeAddress; + nl->mn_node_info.nodeName = ni->nodeName; + nl->mn_node_info.clusterName = ni->clusterName; + nl->mn_node_info.member = ni->member; + nl->mn_node_info.bootTimestamp = ni->bootTimestamp; + +an_out: + return err; +} + +static SaErrorT +evt_remove_node(SaClmClusterNodeT *ni) +{ + struct member_node_data **nlp; + struct member_node_data *nl; + SaErrorT err = SA_ERR_NOT_EXIST; + + nlp = lookup_node(ni->nodeId); + + if (!nlp) { + log_printf(LOG_LEVEL_DEBUG, "remove_node: Got NULL nlp?\n"); + goto an_out; + } + + if (!(*nlp)) { + goto an_out; + } + + nl = *nlp; + + *nlp = nl->mn_next; + free(*nlp); + err = SA_OK; + +an_out: + return err; +} +#endif + + +/* + * Send our retained events to the specified node id. + * Called when a remote event server starts up and opens a channel + * that has retained events that we published. + * + * TODO: Fill me in + */ +static void send_retained(SaNameT *cn, SaClmNodeIdT node_id) +{ + log_printf(LOG_LEVEL_DEBUG, + "TODO: Send retained messages for %s to 0x%x\n", + cn->value, node_id); +} + +/* + * purge retained events from the specified node id. + * Called when a remote event server terminates. + * + * TODO: Fill me in + */ +static void purge_retained(SaClmNodeIdT node_id) +{ + log_printf(LOG_LEVEL_DEBUG, "TODO: Purge retained messages for node 0x%x\n", node_id); +} + +#ifdef NO_DUPLICATES +/* + * See if we've already seen a message with this ID from + * this node. Return 0 for not seen, 1 for seen. + * We also bump the last seen event for the next time. So only call this + * once per event being proccessed. + */ +static int is_duplicate_event(struct lib_event_data *evtpkt, + SaClmClusterNodeT *cn) +{ + struct member_node_data *nd; + + + /* + * Look up the node and check the largest event ID that we've seen. + * Since event IDs are increasing and are delivered in order from + * a given publisher, we just need to check that this ID is + * greater than the last one that we saw. + */ + nd = evt_find_node(evtpkt->led_publisher_node_id); + if (!nd) { + log_printf(LOG_LEVEL_DEBUG, "Node ID 0x%x not found for event %llx\n", + evtpkt->led_publisher_node_id, evtpkt->led_event_id); + evt_add_node(cn); + return 0; + } + + /* + * This shouldn't happen + */ + if ((nd->mn_last_evt_id >= evtpkt->led_event_id) && + (evtpkt->led_event_id & 0xffffffffull) != 0ull) { + log_printf(LOG_LEVEL_NOTICE, + "Event out of order for node ID 0x%x\n", + evtpkt->led_publisher_node_id); + log_printf(LOG_LEVEL_NOTICE, + "last event ID 0x%llx, current event ID 0x%llx\n", + nd->mn_last_evt_id, evtpkt->led_event_id); + return 1; + } + + /* + * This is probably OK, but here for debugging purposes + */ + if(((nd->mn_last_evt_id & 0xffffffff) > 0) && (nd->mn_last_evt_id < + (evtpkt->led_event_id -1))) { + log_printf(LOG_LEVEL_NOTICE, + "Event sequence skipped for node ID 0x%x\n", + evtpkt->led_publisher_node_id); + log_printf(LOG_LEVEL_NOTICE, + "last event ID 0x%llx, current event ID 0x%llx\n", + nd->mn_last_evt_id, evtpkt->led_event_id); + } + nd->mn_last_evt_id = evtpkt->led_event_id; + + return 0; +} +#endif + +/* + * Send a message to the app to wake it up if it is polling + */ static int message_handler_req_lib_activatepoll(struct conn_info *conn_info, void *message) { @@ -190,128 +747,960 @@ static int message_handler_req_lib_activatepoll(struct conn_info *conn_info, return (0); } -static int evt_init(struct conn_info *conn_info, void *msg) +/* + * event id generating code. We use the node ID for this node for the + * upper 32 bits of the event ID to make sure that we can generate a cluster + * wide unique event ID for a given event. + */ +static SaEvtEventIdT base_id = 0; +SaErrorT set_event_id(SaClmNodeIdT node_id) +{ + SaErrorT err = SA_OK; + if (base_id) { + err = SA_ERR_EXIST; + } + base_id = (SaEvtEventIdT)node_id << 32; + return err; +} + +static SaErrorT get_event_id(uint64_t *event_id) +{ + *event_id = base_id++; + return SA_OK; +} + +/* +static uint32_t evt_alloc = 0; +static uint32_t evt_free = 0; +*/ +/* + * Free up an event structure if it isn't being used anymore. + */ +static void +free_event_data(struct event_data *edp) +{ + if (--edp->ed_ref_count) { + return; + } + log_printf(LOG_LEVEL_DEBUG, "Freeing event ID: 0x%llx\n", + edp->ed_event.led_event_id); + if (edp->ed_delivered) { + free(edp->ed_delivered); + } + +#ifdef EVT_ALLOC_CHECK + evt_free++; + if ((evt_free % 1000) == 0) { + log_printf(LOG_LEVEL_NOTICE, "evt alloc: %u, evt free: %u\n", + evt_alloc, evt_free); + } +#endif + free(edp); +} + +/* + * Check for expired events. Then re-arm the timer. + * + */ +static void +event_retention_timeout(void *data) +{ + struct event_data *edp = data; + log_printf(LOG_LEVEL_DEBUG, "Event ID %llx expired\n", + edp->ed_event.led_event_id); + list_del(&edp->ed_retained); + list_init(&edp->ed_retained); + free_event_data(edp); +} + +/* + * clear a particular event's retention time. + * This will free the event as long as it isn't being + * currently used. + * + */ +static void +clear_retention_time(SaEvtEventIdT event_id) +{ + struct event_data *edp; + struct list_head *l, *nxt; + int ret; + + log_printf(LOG_LEVEL_DEBUG, "Search for Event ID %llx\n", event_id); + for(l = retained_list.next; l != &retained_list; l = nxt) { + nxt = l->next; + edp = list_entry(l, struct event_data, ed_retained); + if (edp->ed_event.led_event_id != event_id) { + continue; + } + + log_printf(LOG_LEVEL_DEBUG, + "Clear retention time for Event ID %llx\n", + edp->ed_event.led_event_id); + ret = poll_timer_delete(aisexec_poll_handle, edp->ed_timer_handle); + if (ret != 0 ) { + log_printf(LOG_LEVEL_ERROR, "Error expiring event ID %llx\n", + edp->ed_event.led_event_id); + return; + } + edp->ed_event.led_retention_time = 0; + list_del(&edp->ed_retained); + list_init(&edp->ed_retained); + free_event_data(edp); + break; + } +} + +/* + * Remove specified channel from event delivery list + */ +static void +remove_delivered_channel(struct event_svr_channel_open *eco) +{ + int i; + struct list_head *l; + struct event_data *edp; + + for (l = retained_list.next; l != &retained_list; l = l->next) { + edp = list_entry(l, struct event_data, ed_retained); + + for (i = 0; i < edp->ed_delivered_next; i++) { + if (edp->ed_delivered[i] == eco) { + edp->ed_delivered_next--; + if (edp->ed_delivered_next == i) { + break; + } + memmove(&edp->ed_delivered[i], + &edp->ed_delivered[i+1], + &edp->ed_delivered[edp->ed_delivered_next] - + &edp->ed_delivered[i]); + break; + } + } + } + return; +} + +/* + * If there is a retention time, add this open channel to the event so + * we can check if we've already delivered this message later if a new + * subscription matches. + */ +#define DELIVER_SIZE 8 +static void +evt_delivered(struct event_data *evt, struct event_svr_channel_open *eco) +{ + if (!evt->ed_event.led_retention_time) { + return; + } + + log_printf(LOG_LEVEL_DEBUG, "delivered ID %llx to eco %p\n", + evt->ed_event.led_event_id, eco); + if (evt->ed_delivered_count == evt->ed_delivered_next) { + evt->ed_delivered = realloc(evt->ed_delivered, + DELIVER_SIZE * sizeof(struct event_svr_channel_open *)); + memset(evt->ed_delivered + evt->ed_delivered_next, 0, + DELIVER_SIZE * sizeof(struct event_svr_channel_open *)); + evt->ed_delivered_next = evt->ed_delivered_count; + evt->ed_delivered_count += DELIVER_SIZE; + } + + evt->ed_delivered[evt->ed_delivered_next++] = eco; +} + +/* + * Check to see if an event has already been delivered to this open channel + */ +static int +evt_already_delivered(struct event_data *evt, + struct event_svr_channel_open *eco) +{ + int i; + + if (!evt->ed_event.led_retention_time) { + return 0; + } + + log_printf(LOG_LEVEL_DEBUG, "Deliver count: %d deliver_next %d\n", + evt->ed_delivered_count, evt->ed_delivered_next); + for (i = 0; i < evt->ed_delivered_next; i++) { + log_printf(LOG_LEVEL_DEBUG, "Checking ID %llx delivered %p eco %p\n", + evt->ed_event.led_event_id, evt->ed_delivered[i], eco); + if (evt->ed_delivered[i] == eco) { + return 1; + } + } + return 0; +} + +/* + * Compare a filter to a given pattern. + * return SA_OK if the pattern matches a filter + */ +static SaErrorT +filter_match(SaEvtEventPatternT *ep, SaEvtEventFilterT *ef) +{ + int ret; + ret = SA_ERR_FAILED_OPERATION; + + switch (ef->filterType) { + case SA_EVT_PREFIX_FILTER: + if (ef->filter.patternSize > ep->patternSize) { + break; + } + if (strncmp(ef->filter.pattern, ep->pattern, + ef->filter.patternSize) == 0) { + ret = SA_OK; + } + break; + case SA_EVT_SUFFIX_FILTER: + if (ef->filter.patternSize > ep->patternSize) { + break; + } + if (strncmp(ef->filter.pattern, + &ep->pattern[ep->patternSize - ef->filter.patternSize], + ef->filter.patternSize) == 0) { + ret = SA_OK; + } + + break; + case SA_EVT_EXACT_FILTER: + if (ef->filter.patternSize != ep->patternSize) { + break; + } + if (strncmp(ef->filter.pattern, ep->pattern, + ef->filter.patternSize) == 0) { + ret = SA_OK; + } + break; + case SA_EVT_PASS_ALL_FILTER: + ret = SA_OK; + break; + default: + break; + } + return ret; +} + +/* + * compare the event's patterns with the subscription's filter rules. + * SA_OK is returned if the event matches the filter rules. + */ +static SaErrorT +event_match(struct event_data *evt, + struct event_svr_channel_subscr *ecs) +{ + SaEvtEventFilterT *ef; + SaEvtEventPatternT *ep; + uint32_t filt_count; + SaErrorT ret = SA_OK; + int i; + + ep = (SaEvtEventPatternT *)(&evt->ed_event.led_body[0]); + ef = ecs->ecs_filters->filters; + filt_count = min(ecs->ecs_filters->filtersNumber, + evt->ed_event.led_patterns_number); + + for (i = 0; i < filt_count; i++) { + ret = filter_match(ep, ef); + if (ret != SA_OK) { + break; + } + ep++; + ef++; + } + return ret; +} + +/* + * Scan undelivered pending events and either remove them if no subscription + * filters match anymore or re-assign them to another matching subscription + */ +static void +filter_undelivered_events(struct event_svr_channel_open *op_chan) +{ + struct event_svr_channel_open *eco; + struct event_svr_channel_instance *eci; + struct event_svr_channel_subscr *ecs; + struct chan_event_list *cel; + struct libevt_ci *esip = &op_chan->eco_conn_info->ais_ci.u.libevt_ci; + struct list_head *l, *nxt; + struct list_head *l1, *l2; + int i; + + eci = op_chan->eco_channel; + + /* + * Scan each of the priority queues for messages + */ + for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) { + /* + * examine each message queued for delivery + */ + for (l = esip->esi_events[i].next; l != &esip->esi_events[i]; l = nxt) { + nxt = l->next; + cel = list_entry(l, struct chan_event_list, cel_entry); + /* + * Check open channels + */ + for (l1 = eci->esc_open_chans.next; + l1 != &eci->esc_open_chans; l1 = l1->next) { + eco = list_entry(l1, struct event_svr_channel_open, eco_entry); + + /* + * See if this channel open instance belongs + * to this evtinitialize instance + */ + if (eco->eco_conn_info != op_chan->eco_conn_info) { + continue; + } + + /* + * See if enabled to receive + */ + if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) { + continue; + } + + /* + * Check subscriptions + */ + for (l2 = eco->eco_subscr.next; + l2 != &eco->eco_subscr; l2 = l2->next) { + ecs = list_entry(l2, + struct event_svr_channel_subscr, ecs_entry); + if (event_match(cel->cel_event, ecs) == SA_OK) { + /* + * Something still matches. + * We'll assign it to + * the new subscription. + */ + cel->cel_sub_id = ecs->ecs_sub_id; + cel->cel_chan_handle = eco->eco_lib_handle; + goto next_event; + } + } + } + /* + * No subscription filter matches anymore. We + * can delete this event. + */ + list_del(&cel->cel_entry); + list_init(&cel->cel_entry); + esip->esi_nevents--; + +#ifdef EVT_EVENT_LIST_CHECK + if (esip->esi_nevents < 0) { + log_printf(LOG_LEVEL_NOTICE, "event count went negative\n"); + esip->esi_nevents = 0; + } +#endif + free_event_data(cel->cel_event); + free(cel); +next_event: + continue; + } + } +} + +/* + * Notify the library of a pending event + */ +static void __notify_event(struct conn_info *conn_info) +{ + struct res_evt_event_data res; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + + log_printf(LOG_LEVEL_DEBUG, "DELIVER: notify\n"); + if (esip->esi_nevents != 0) { + res.evd_head.size = sizeof(res); + res.evd_head.id = MESSAGE_RES_EVT_AVAILABLE; + res.evd_head.error = SA_OK; + libais_send_response(conn_info, &res, sizeof(res)); + } + +} +inline void notify_event(struct conn_info *conn_info) +{ + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + + /* + * Give the library a kick if there aren't already + * events queued for delivery. + */ + if (esip->esi_nevents++ == 0) { + __notify_event(conn_info); + } +} + +/* + * sends/queues up an event for a subscribed channel. + */ +static void +deliver_event(struct event_data *evt, + struct event_svr_channel_open *eco, + struct event_svr_channel_subscr *ecs) +{ + struct chan_event_list *ep; + struct libevt_ci *esip = &eco->eco_conn_info->ais_ci.u.libevt_ci; + SaEvtEventPriorityT evt_prio = evt->ed_event.led_priority; + struct chan_event_list *cel; + int do_deliver_event = 0; + int do_deliver_warning = 0; + int i; + + if (evt_prio > SA_EVT_LOWEST_PRIORITY) { + evt_prio = SA_EVT_LOWEST_PRIORITY; + } + + /* + * Delivery queue check. + * - If the queue is blocked, see if we've sent enough messages to + * unblock it. + * - If it isn't blocked, see if this message will put us over the top. + * - If we can't deliver this message, see if we can toss some lower + * priority message to make room for this one. + * - If we toss any messages, queue up an event of SA_EVT_LOST_EVENT_PATTERN + * to let the application know that we dropped some messages. + */ + if (esip->esi_queue_blocked) { + if (esip->esi_nevents < MIN_EVT_QUEUE_RESUME) { + esip->esi_queue_blocked = 0; + log_printf(LOG_LEVEL_DEBUG, "unblock\n"); + } + } + + if (!esip->esi_queue_blocked && + (esip->esi_nevents >= MAX_EVT_DELIVERY_QUEUE)) { + log_printf(LOG_LEVEL_DEBUG, "block\n"); + esip->esi_queue_blocked = 1; + do_deliver_warning = 1; + } + + if (esip->esi_queue_blocked) { + do_deliver_event = 0; + for (i = SA_EVT_LOWEST_PRIORITY; i > evt_prio; i--) { + if (!list_empty(&esip->esi_events[i])) { + /* + * Get the last item on the list, so we drop the most + * recent lowest priority event. + */ + cel = list_entry(esip->esi_events[i].prev, + struct chan_event_list, cel_entry); + log_printf(LOG_LEVEL_DEBUG, "Drop 0x%0llx\n", + cel->cel_event->ed_event.led_event_id); + list_del(&cel->cel_entry); + free_event_data(cel->cel_event); + free(cel); + esip->esi_nevents--; + do_deliver_event = 1; + break; + } + } + } else { + do_deliver_event = 1; + } + + /* + * Queue the event for delivery + */ + if (do_deliver_event) { + evt->ed_ref_count++; + ep = malloc(sizeof(*ep)); + if (!ep) { + log_printf(LOG_LEVEL_WARNING, + "Memory allocation error, can't deliver event\n"); + return; + } + ep->cel_chan_handle = eco->eco_lib_handle; + ep->cel_sub_id = ecs->ecs_sub_id; + list_init(&ep->cel_entry); + ep->cel_event = evt; + list_add_tail(&ep->cel_entry, &esip->esi_events[evt_prio]); + evt_delivered(evt, eco); + notify_event(eco->eco_conn_info); + } + + /* + * If we dropped an event, queue this so that the application knows + * what has happened. + */ + if (do_deliver_warning) { + struct event_data *ed; + ed = malloc(dropped_event_size); + if (!ed) { + log_printf(LOG_LEVEL_WARNING, + "Memory allocation error, can't deliver event\n"); + return; + } + log_printf(LOG_LEVEL_DEBUG, "Warn 0x%0llx\n", + evt->ed_event.led_event_id); + memcpy(ed, dropped_event, dropped_event_size); + ed->ed_event.led_publish_time = clustTimeNow(); + list_init(&ed->ed_retained); + + ep = malloc(sizeof(*ep)); + if (!ep) { + log_printf(LOG_LEVEL_WARNING, + "Memory allocation error, can't deliver event\n"); + return; + } + ep->cel_chan_handle = eco->eco_lib_handle; + ep->cel_sub_id = ecs->ecs_sub_id; + list_init(&ep->cel_entry); + ep->cel_event = ed; + list_add_tail(&ep->cel_entry, &esip->esi_events[SA_EVT_HIGHEST_PRIORITY]); + notify_event(eco->eco_conn_info); + } +} + +/* + * Take an event received from the network and fix it up to be usable. + * - fix up pointers for pattern list. + * - fill in some channel info + */ +static struct event_data * +make_local_event(struct lib_event_data *p, + struct event_svr_channel_instance *eci) +{ + struct event_data *ed; + SaEvtEventPatternT *eps; + SaUint8T *str; + uint32_t ed_size; + int i; + + ed_size = sizeof(*ed) + p->led_user_data_offset + p->led_user_data_size; + ed = malloc(ed_size); + if (!ed) { + return 0; + } + memset(ed, 0, ed_size); + list_init(&ed->ed_retained); + ed->ed_my_chan = eci; + + /* + * Fill in lib_event_data and make the pattern pointers valid + */ + memcpy(&ed->ed_event, p, sizeof(*p) + + p->led_user_data_offset + p->led_user_data_size); + + eps = (SaEvtEventPatternT *)ed->ed_event.led_body; + str = ed->ed_event.led_body + + (ed->ed_event.led_patterns_number * sizeof(SaEvtEventPatternT)); + for (i = 0; i < ed->ed_event.led_patterns_number; i++) { + eps->pattern = str; + str += eps->patternSize; + eps++; + } + +#ifdef EVT_ALLOC_CHECK + evt_alloc++; + if ((evt_alloc % 1000) == 0) { + log_printf(LOG_LEVEL_NOTICE, "evt alloc: %u, evt free: %u\n", + evt_alloc, evt_free); + } +#endif + ed->ed_ref_count++; + return ed; +} + +/* + * Set an event to be retained. + */ +static void retain_event(struct event_data *evt) +{ + uint32_t ret; + int msec_in_future; + + evt->ed_ref_count++; + list_add_tail(&evt->ed_retained, &retained_list); + /* + * Time in nanoseconds - convert to miliseconds + */ + msec_in_future = (uint32_t)((evt->ed_event.led_retention_time) / 1000000ULL); + ret = poll_timer_add(aisexec_poll_handle, + msec_in_future, + evt, + event_retention_timeout, + &evt->ed_timer_handle); + if (ret != 0) { + log_printf(LOG_LEVEL_ERROR, "retention of event id 0x%llx failed\n", + evt->ed_event.led_event_id); + } else { + log_printf(LOG_LEVEL_DEBUG, "Retain event ID 0x%llx\n", + evt->ed_event.led_event_id); + } +} + +/* + * Scan the subscription list and look for the specified subsctiption ID. + * Only look for the ID in subscriptions that are associated with the + * saEvtInitialize associated with the specified open channel. + */ +static struct event_svr_channel_subscr *find_subscr( + struct event_svr_channel_open *open_chan, SaEvtSubscriptionIdT sub_id) +{ + struct event_svr_channel_instance *eci; + struct event_svr_channel_subscr *ecs; + struct event_svr_channel_open *eco; + struct list_head *l, *l1; + struct conn_info* conn_info = open_chan->eco_conn_info; + + eci = open_chan->eco_channel; + + /* + * Check for subscription id already in use. + * Subscriptions are unique within saEvtInitialize (Callback scope). + */ + for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) { + eco = list_entry(l, struct event_svr_channel_open, eco_entry); + /* + * Don't bother with open channels associated with another + * EvtInitialize + */ + if (eco->eco_conn_info != conn_info) { + continue; + } + + for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) { + ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry); + if (ecs->ecs_sub_id == sub_id) { + return ecs; + } + } + } + return 0; +} + +/* + * Handler for saEvtInitialize + */ +static int evt_initialize(struct conn_info *conn_info, void *msg) { struct res_lib_init res; + struct libevt_ci *libevt_ci = &conn_info->ais_ci.u.libevt_ci; + int i; res.header.size = sizeof (struct res_lib_init); res.header.id = MESSAGE_RES_INIT; res.header.error = SA_OK; - log_printf(LOG_LEVEL_DEBUG, - "Got request to initalize cluster event service.\n"); + log_printf(LOG_LEVEL_DEBUG, "saEvtInitialize request.\n"); if (!conn_info->authenticated) { - log_printf(LOG_LEVEL_DEBUG, - "event service: Not authenticated\n"); + log_printf(LOG_LEVEL_DEBUG, "event service: Not authenticated\n"); res.header.error = SA_ERR_SECURITY; libais_send_response(conn_info, &res, sizeof(res)); return -1; } + memset(libevt_ci, 0, sizeof(*libevt_ci)); + list_init(&libevt_ci->esi_open_chans); + for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) { + list_init(&libevt_ci->esi_events[i]); + } conn_info->service = SOCKET_SERVICE_EVT; - libais_send_response (conn_info, &res, sizeof(res)); list_init (&conn_info->conn_list); + list_add_tail(&conn_info->conn_list, &ci_head); + libais_send_response (conn_info, &res, sizeof(res)); return 0; } +/* + * Handler for saEvtChannelOpen + */ static int lib_evt_open_channel(struct conn_info *conn_info, void *message) { + uint32_t handle; + SaErrorT error; struct req_evt_channel_open *req; struct res_evt_channel_open res; + struct event_svr_channel_instance *eci; + struct event_svr_channel_open *eco; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; req = message; - log_printf(LOG_LEVEL_DEBUG, "Open channel request\n"); + log_printf(LOG_LEVEL_DEBUG, "saEvtChannelOpen (Open channel request)\n"); log_printf(LOG_LEVEL_DEBUG, - "size %d, id %d, handle 0x%x, to 0x%llx\n", - req->ico_head.size, - req->ico_head.id, + "handle 0x%x, to 0x%llx\n", req->ico_c_handle, req->ico_timeout); log_printf(LOG_LEVEL_DEBUG, "flags %x, channel name(%d) %s\n", req->ico_open_flag, req->ico_channel_name.length, req->ico_channel_name.value); + /* + * Create a handle to give back to the caller to associate + * with this channel open instance. + */ + error = saHandleCreate(&esip->esi_hdb, sizeof(*eco), &handle); + if (error != SA_OK) { + goto open_return; + } + error = saHandleInstanceGet(&esip->esi_hdb, handle, (void**)&eco); + if (error != SA_OK) { + goto open_return; + } /* - * TODO: Add open code here + * Open the channel. + * */ + error = evt_open_channel(&req->ico_channel_name, + req->ico_open_flag, req->ico_timeout, &eci, esip); + if (error != SA_OK) { + saHandleDestroy(&esip->esi_hdb, handle); + goto open_put; + } + + /* + * Initailize and link into the global channel structure. + */ + list_init(&eco->eco_subscr); + list_init(&eco->eco_entry); + list_init(&eco->eco_instance_entry); + eco->eco_flags = req->ico_open_flag; + eco->eco_channel = eci; + eco->eco_lib_handle = req->ico_c_handle; + eco->eco_my_handle = handle; + eco->eco_conn_info = conn_info; + list_add_tail(&eco->eco_entry, &eci->esc_open_chans); + list_add_tail(&eco->eco_instance_entry, &esip->esi_open_chans); + + /* + * respond back with a handle to access this channel + * open instance for later subscriptions, etc. + */ +open_put: + saHandleInstancePut(&esip->esi_hdb, handle); +open_return: res.ico_head.size = sizeof(res); res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; - res.ico_head.error = SA_OK; - res.ico_channel_handle = req->ico_c_handle; /* TODO: fix this */ + res.ico_head.error = error; + res.ico_channel_handle = handle; libais_send_response (conn_info, &res, sizeof(res)); return 0; } + + +/* + * Used by the channel close code and by the implicit close + * when saEvtFinalize is called with channels open. + */ +static void +common_chan_close(struct event_svr_channel_open *eco, struct libevt_ci *esip) +{ + struct event_svr_channel_subscr *ecs; + struct list_head *l, *nxt; + + /* + * TODO: do channel close with the rest of the world + */ + log_printf(LOG_LEVEL_DEBUG, "Close channel %s flags 0x%02x\n", + eco->eco_channel->esc_channel_name.value, + eco->eco_flags); + + /* + * Unlink the channel open structure. + * + * Check for subscriptions and deal with them. In this case + * if there are any, we just implicitly unsubscribe. + * + * When We're done with the channel open data then we can + * remove it's handle (this frees the memory too). + * + */ + list_del(&eco->eco_entry); + list_del(&eco->eco_instance_entry); + + for (l = eco->eco_subscr.next; l != &eco->eco_subscr; l = nxt) { + nxt = l->next; + ecs = list_entry(l, struct event_svr_channel_subscr, ecs_entry); + log_printf(LOG_LEVEL_DEBUG, "Unsubscribe ID: %x\n", ecs->ecs_sub_id); + list_del(&ecs->ecs_entry); + free(ecs); + /* + * Purge any pending events associated with this subscription + * that don't match another subscription. + */ + filter_undelivered_events(eco); + } + + /* + * Remove this channel from the retained event's notion + * of who they have been delivered to. + */ + remove_delivered_channel(eco); +} + +/* + * Handler for saEvtChannelClose + */ static int lib_evt_close_channel(struct conn_info *conn_info, void *message) { struct req_evt_channel_close *req; struct res_evt_channel_close res; + struct event_svr_channel_open *eco; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + SaErrorT error; req = message; - log_printf(LOG_LEVEL_DEBUG, "Close channel request\n"); - log_printf(LOG_LEVEL_DEBUG, "size %d, id %d, handle 0x%x\n", - req->icc_head.size, - req->icc_head.id, - req->icc_channel_handle); + log_printf(LOG_LEVEL_DEBUG, "saEvtChannelClose (Close channel request)\n"); + log_printf(LOG_LEVEL_DEBUG, "handle 0x%x\n", req->icc_channel_handle); /* - * TODO: Add close code here + * look up the channel handle */ + error = saHandleInstanceGet(&esip->esi_hdb, + req->icc_channel_handle, (void**)&eco); + if (error != SA_OK) { + goto chan_close_done; + } + + common_chan_close(eco, esip); + saHandleDestroy(&esip->esi_hdb, req->icc_channel_handle); + saHandleInstancePut(&esip->esi_hdb, req->icc_channel_handle); + +chan_close_done: res.icc_head.size = sizeof(res); res.icc_head.id = MESSAGE_RES_EVT_CLOSE_CHANNEL; - res.icc_head.error = SA_OK; + res.icc_head.error = error; libais_send_response (conn_info, &res, sizeof(res)); return 0; } -static int lib_evt_channel_subscribe(struct conn_info *conn_info, void *message) +/* + * Subscribe to an event channel. + * + * - First look up the channel to subscribe. + * - Make sure that the subscription ID is not already in use. + * - Fill in the subscription data structures and add them to the channels + * subscription list. + * - See if there are any events with retetion times that need to be delivered + * because of the new subscription. + */ +static char *filter_types[] = { + "INVALID FILTER TYPE", + "SA_EVT_PREFIX_FILTER", + "SA_EVT_SUFFIX_FILTER", + "SA_EVT_EXACT_FILTER", + "SA_EVT_PASS_ALL_FILTER", +}; + +/* + * saEvtEventSubscribe Handler + */ +static int lib_evt_event_subscribe(struct conn_info *conn_info, void *message) { - struct req_evt_channel_subscribe *req; - struct res_evt_channel_subscribe res; + struct req_evt_event_subscribe *req; + struct res_evt_event_subscribe res; SaEvtEventFilterArrayT *filters; SaErrorT error = SA_OK; + struct event_svr_channel_open *eco; + struct event_svr_channel_instance *eci; + struct event_svr_channel_subscr *ecs; + struct event_data *evt; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + struct list_head *l; int i; req = message; - log_printf(LOG_LEVEL_DEBUG, "Subscribe channel request\n"); - log_printf(LOG_LEVEL_DEBUG, "size %d, id %d\n", - req->ics_head.size, - req->ics_head.id); + log_printf(LOG_LEVEL_DEBUG, "saEvtEventSubscribe (Subscribe request)\n"); log_printf(LOG_LEVEL_DEBUG, "subscription Id: 0x%x\n", req->ics_sub_id); + error = evtfilt_to_aisfilt(req, &filters); if (error == SA_OK) { log_printf(LOG_LEVEL_DEBUG, "Subscribe filters count %d\n", filters->filtersNumber); for (i = 0; i < filters->filtersNumber; i++) { - log_printf(LOG_LEVEL_DEBUG, "sz %d, type %d, <%s>\n", + log_printf(LOG_LEVEL_DEBUG, "type %s(%d) sz %d, <%s>\n", + filter_types[filters->filters[i].filterType], filters->filters[i].filterType, filters->filters[i].filter.patternSize, - filters->filters[i].filter.pattern); + (filters->filters[i].filter.patternSize) + ? (char *)filters->filters[i].filter.pattern + : ""); } } - - /* - * TODO: add subscribe code here. - * TODO: remove filters for now to avoid a leak. - */ - for (i = 0; i < filters->filtersNumber; i++) { - free(filters->filters[i].filter.pattern); - } - free(filters->filters); - free(filters); + if (error != SA_OK) { + goto subr_done; + } + + /* + * look up the channel handle + */ + error = saHandleInstanceGet(&esip->esi_hdb, + req->ics_channel_handle, (void**)&eco); + if (error != SA_OK) { + goto subr_done; + } + + eci = eco->eco_channel; + + /* + * See if the id is already being used + */ + ecs = find_subscr(eco, req->ics_sub_id); + if (ecs) { + error = SA_ERR_EXIST; + goto subr_put; + } + + ecs = (struct event_svr_channel_subscr *)malloc(sizeof(*ecs)); + if (!ecs) { + error = SA_ERR_NO_MEMORY; + goto subr_put; + } + ecs->ecs_filters = filters; + ecs->ecs_sub_id = req->ics_sub_id; + list_init(&ecs->ecs_entry); + list_add(&ecs->ecs_entry, &eco->eco_subscr); + + + res.ics_head.size = sizeof(res); + res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE; + res.ics_head.error = error; + libais_send_response (conn_info, &res, sizeof(res)); + + /* + * See if an existing event with a retention time + * needs to be delivered based on this subscription + */ + for (l = retained_list.next; l != &retained_list; l = l->next) { + evt = list_entry(l, struct event_data, ed_retained); + log_printf(LOG_LEVEL_DEBUG, + "Checking event ID %llx chanp %p -- sub chanp %p\n", + evt->ed_event.led_event_id, evt->ed_my_chan, eci); + if (evt->ed_my_chan == eci) { + if (evt_already_delivered(evt, eco)) { + continue; + } + if (event_match(evt, ecs) == SA_OK) { + log_printf(LOG_LEVEL_DEBUG, + "deliver event ID: 0x%llx\n", + evt->ed_event.led_event_id); + deliver_event(evt, eco, ecs); + } + } + } + saHandleInstancePut(&esip->esi_hdb, req->ics_channel_handle); + return 0; + +subr_put: + saHandleInstancePut(&esip->esi_hdb, req->ics_channel_handle); +subr_done: res.ics_head.size = sizeof(res); res.ics_head.id = MESSAGE_RES_EVT_SUBSCRIBE; res.ics_head.error = error; @@ -319,26 +1708,62 @@ static int lib_evt_channel_subscribe(struct conn_info *conn_info, void *message) return 0; } -static int lib_evt_channel_unsubscribe(struct conn_info *conn_info, + +/* + * saEvtEventUnsubscribe Handler + */ +static int lib_evt_event_unsubscribe(struct conn_info *conn_info, void *message) { - struct req_evt_channel_unsubscribe *req; - struct res_evt_channel_unsubscribe res; + struct req_evt_event_unsubscribe *req; + struct res_evt_event_unsubscribe res; + struct event_svr_channel_open *eco; + struct event_svr_channel_instance *eci; + struct event_svr_channel_subscr *ecs; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; SaErrorT error = SA_OK; req = message; - log_printf(LOG_LEVEL_DEBUG, "Unsubscribe channel request\n"); - log_printf(LOG_LEVEL_DEBUG, "size %d, id %d\n", - req->icu_head.size, - req->icu_head.id); + log_printf(LOG_LEVEL_DEBUG, + "saEvtEventUnsubscribe (Unsubscribe request)\n"); log_printf(LOG_LEVEL_DEBUG, "subscription Id: 0x%x\n", req->icu_sub_id); /* - * TODO: Add unsubscribe code here + * look up the channel handle, get the open channel + * data. */ + error = saHandleInstanceGet(&esip->esi_hdb, + req->icu_channel_handle, (void**)&eco); + if (error != SA_OK) { + goto unsubr_done; + } + + eci = eco->eco_channel; + /* + * Make sure that the id exists. + */ + ecs = find_subscr(eco, req->icu_sub_id); + if (!ecs) { + error = SA_ERR_INVALID_PARAM; + goto unsubr_put; + } + list_del(&ecs->ecs_entry); + + log_printf(LOG_LEVEL_DEBUG, + "unsubscribe from channel %s subscription ID 0x%x " + "with %d filters\n", + eci->esc_channel_name.value, + ecs->ecs_sub_id, ecs->ecs_filters->filtersNumber); + + free_filters(ecs->ecs_filters); + free(ecs); + +unsubr_put: + saHandleInstancePut(&esip->esi_hdb, req->icu_channel_handle); +unsubr_done: res.icu_head.size = sizeof(res); res.icu_head.id = MESSAGE_RES_EVT_UNSUBSCRIBE; res.icu_head.error = error; @@ -347,24 +1772,61 @@ static int lib_evt_channel_unsubscribe(struct conn_info *conn_info, return 0; } +/* + * saEvtEventPublish Handler + */ static int lib_evt_event_publish(struct conn_info *conn_info, void *message) { struct lib_event_data *req; struct res_evt_event_publish res; - SaEvtEventIdT event_id = 0x5a5a5a5a5a5a5a5aull; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + struct event_svr_channel_open *eco; + struct event_svr_channel_instance *eci; + SaEvtEventIdT event_id = 0; SaErrorT error = SA_OK; + struct iovec pub_iovec; + int result; + req = message; - log_printf(LOG_LEVEL_DEBUG, "Publish event request\n"); - log_printf(LOG_LEVEL_DEBUG, "size %d, id %d\n", - req->led_head.size, - req->led_head.id); + log_printf(LOG_LEVEL_DEBUG, "saEvtEventPublish (Publish event request)\n"); + /* - * TODO: Add publish code here + * look up and validate open channel info */ + error = saHandleInstanceGet(&esip->esi_hdb, + req->led_svr_channel_handle, (void**)&eco); + if (error != SA_OK) { + goto pub_done; + } + eci = eco->eco_channel; + + /* + * modify the request structure for sending event data to subscribed + * processes. + */ + get_event_id(&event_id); + req->led_head.id = MESSAGE_REQ_EXEC_EVT_EVENTDATA; + req->led_chan_name = eci->esc_channel_name; + req->led_event_id = event_id; + + /* + * Distribute the event. + * The multicasted event will be picked up and delivered + * locally by the local network event receiver. + */ + pub_iovec.iov_base = req; + pub_iovec.iov_len = req->led_head.size; + result = gmi_mcast (&aisexec_groupname, &pub_iovec, 1, GMI_PRIO_LOW); + if (result != 0) { + error = SA_ERR_SYSTEM; + } + + saHandleInstancePut(&esip->esi_hdb, req->led_svr_channel_handle); +pub_done: res.iep_head.size = sizeof(res); res.iep_head.id = MESSAGE_RES_EVT_PUBLISH; res.iep_head.error = error; @@ -373,25 +1835,43 @@ static int lib_evt_event_publish(struct conn_info *conn_info, void *message) return 0; } -static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, void *message) + +/* + * saEvtEventRetentionTimeClear handler + */ +static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, + void *message) { struct req_evt_event_clear_retentiontime *req; struct res_evt_event_clear_retentiontime res; + struct req_evt_chan_command cpkt; + struct iovec rtn_iovec; SaErrorT error = SA_OK; + int ret; req = message; - log_printf(LOG_LEVEL_DEBUG, "Clear event retentiontime request\n"); log_printf(LOG_LEVEL_DEBUG, - "size %d, id %d, event ID 0x%llx, chan handle 0x%x\n", - req->iec_head.size, - req->iec_head.id, + "saEvtEventRetentionTimeClear (Clear event retentiontime request)\n"); + log_printf(LOG_LEVEL_DEBUG, + "event ID 0x%llx, chan handle 0x%x\n", req->iec_event_id, req->iec_channel_handle); /* * TODO: Add clear retention time code here */ + memset(&cpkt, 0, sizeof(cpkt)); + cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD; + cpkt.chc_head.size = sizeof(cpkt); + cpkt.chc_op = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME; + cpkt.u.chc_event_id = req->iec_event_id; + rtn_iovec.iov_base = &cpkt; + rtn_iovec.iov_len = cpkt.chc_head.size; + ret = gmi_mcast (&aisexec_groupname, &rtn_iovec, 1, GMI_PRIO_MED); + if (ret != 0) { + error = SA_ERR_SYSTEM; + } res.iec_head.size = sizeof(res); res.iec_head.id = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME; @@ -400,18 +1880,210 @@ static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, void * return 0; } + +/* + * Send requested event data to the application + */ +static int lib_evt_event_data_get(struct conn_info *conn_info, void *message) +{ + struct lib_event_data res; + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + struct chan_event_list *cel; + struct event_data *edp; + int i; + + + /* + * Deliver events in publish order within priority + */ + for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) { + if (!list_empty(&esip->esi_events[i])) { + cel = list_entry(esip->esi_events[i].next, struct chan_event_list, + cel_entry); + list_del(&cel->cel_entry); + list_init(&cel->cel_entry); + esip->esi_nevents--; + if (esip->esi_queue_blocked && + (esip->esi_nevents < MIN_EVT_QUEUE_RESUME)) { + esip->esi_queue_blocked = 0; + log_printf(LOG_LEVEL_DEBUG, "unblock\n"); + } +#ifdef EVT_EVENT_LIST_CHECK + if (esip->esi_nevents < 0) { + log_printf(LOG_LEVEL_NOTICE, "event count went negative\n"); + if (!list_empty(&esip->esi_events[i])) { + log_printf(LOG_LEVEL_NOTICE, "event list isn't empty\n"); + } + esip->esi_nevents = 0; + } +#endif + edp = cel->cel_event; + edp->ed_event.led_lib_channel_handle = cel->cel_chan_handle; + edp->ed_event.led_sub_id = cel->cel_sub_id; + edp->ed_event.led_head.id = MESSAGE_RES_EVT_EVENT_DATA; + edp->ed_event.led_head.error = SA_OK; + free(cel); + libais_send_response(conn_info, &edp->ed_event, + edp->ed_event.led_head.size); + free_event_data(edp); + goto data_get_done; + } + } + + res.led_head.size = sizeof(res.led_head); + res.led_head.id = MESSAGE_RES_EVT_EVENT_DATA; + res.led_head.error = SA_ERR_NOT_EXIST; + libais_send_response(conn_info, &res, res.led_head.size); + + /* + * See if there are any events that the app doesn't know about + * because the notify pipe was full. + */ +data_get_done: + if (esip->esi_nevents) { + __notify_event(conn_info); + } + return 0; +} + +/* + * Called when there is a configuration change in the cluster. + * This function looks at any joiners and leavers and updates the evt + * node list. The node list is used to keep track of event IDs + * received for each node for the detection of duplicate events. + */ static int evt_conf_change( struct sockaddr_in *member_list, int member_list_entries, struct sockaddr_in *left_list, int left_list_entries, struct sockaddr_in *joined_list, int joined_list_entries) { + struct in_addr my_node = {SA_CLM_LOCAL_NODE_ID}; + SaClmClusterNodeT *cn; +#ifdef NO_DUPLICATES + static int first = 1; + struct sockaddr_in *add_list; + SaErrorT error; + int add_count; + log_printf(LOG_LEVEL_DEBUG, "Evt conf change\n"); + log_printf(LOG_LEVEL_DEBUG, "m %d, j %d, l %d\n", + member_list_entries, + joined_list_entries, + left_list_entries); + /* + * Don't seem to be able to tell who joined if we're just coming up. Not all + * nodes show up in the join list. If this is the first time through, + * choose the members list to use to add nodes, after that use the join + * list. ALways use the left list for removing nodes. + */ + if (first) { + add_list = member_list; + add_count = member_list_entries; + first = 0; + } else { + add_list = joined_list; + add_count = joined_list_entries; + } + + while (add_count--) { + log_printf(LOG_LEVEL_DEBUG, + "Look up Cluster node for %s\n", + inet_ntoa(add_list->sin_addr)); + cn = clm_get_by_nodeid(add_list->sin_addr); + if (!cn) { + log_printf(LOG_LEVEL_DEBUG, + "No Cluster node found for %s\n", + inet_ntoa(add_list->sin_addr)); + } else { + log_printf(LOG_LEVEL_DEBUG, "Adding node: %s(0x%x)\n", + cn->nodeName.value, cn->nodeId); + error = evt_add_node(cn); + if (error != SA_OK) { + log_printf(LOG_LEVEL_DEBUG, + "Can't add Cluster node at %s\n", + inet_ntoa(add_list->sin_addr)); + } + } + cn++; + } + + while (left_list_entries--) { + log_printf(LOG_LEVEL_DEBUG, + "Look up Cluster node for %s\n", + inet_ntoa(left_list->sin_addr)); + cn = clm_get_by_nodeid(left_list->sin_addr); + if (!cn) { + log_printf(LOG_LEVEL_DEBUG, + "No Cluster node found for %s\n", + inet_ntoa(left_list->sin_addr)); + } else { + log_printf(LOG_LEVEL_DEBUG, "Removing node: %s(0x%x)\n", + cn->nodeName.value, cn->nodeId); + error = evt_remove_node(cn); + if (error != SA_OK) { + log_printf(LOG_LEVEL_DEBUG, + "Can't add Cluster node at %s\n", + inet_ntoa(left_list->sin_addr)); + } + } + cn++; + } +#endif + + /* + * Set the base event id + */ + if (!base_id) { + cn = clm_get_by_nodeid(my_node); + log_printf(LOG_LEVEL_DEBUG, "My node ID 0x%x\n"); + set_event_id(cn->nodeId); + } + return 0; } -static int evt_exit(struct conn_info *conn_info) + +/* + * saEvtFinalize Handler + */ +static int evt_finalize(struct conn_info *conn_info) { - log_printf(LOG_LEVEL_DEBUG, "Evt exit request\n"); + + struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci; + struct event_svr_channel_open *eco; + struct list_head *l, *nxt; + + log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize (Event exit request)\n"); + log_printf(LOG_LEVEL_DEBUG, "saEvtFinalize %d evts on list\n", + esip->esi_nevents); + + /* + * Clean up any open channels and associated subscriptions. + */ + for (l = esip->esi_open_chans.next; l != &esip->esi_open_chans; l = nxt) { + nxt = l->next; + eco = list_entry(l, struct event_svr_channel_open, eco_instance_entry); + common_chan_close(eco, esip); + saHandleDestroy(&esip->esi_hdb, eco->eco_my_handle); + } + +#ifdef EVT_EVENT_LIST_CHECK +{ + int i; + if (esip->esi_nevents) { + log_printf(LOG_LEVEL_WARNING, + "%d Events left on delivery list after finalize\n", + esip->esi_nevents); + } + + for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) { + if (!list_empty(&esip->esi_events[i])) { + log_printf(LOG_LEVEL_WARNING, + "Events list not empty after finalize\n"); + } + } +} +#endif /* * Delete track entry if there is one @@ -420,19 +2092,208 @@ static int evt_exit(struct conn_info *conn_info) return 0; } + +/* + * Called at service start time. + */ static int evt_exec_init(void) { - log_printf(LOG_LEVEL_DEBUG, "Evt exec exit request\n"); + log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n"); + + /* + * Create an event to be sent when we have to drop messages + * for an application. + */ + dropped_event_size = sizeof(*dropped_event) + sizeof(dropped_pattern); + dropped_event = malloc(dropped_event_size); + if (dropped_event == 0) { + log_printf(LOG_LEVEL_ERROR, + "Memory Allocation Failure, event service not started\n"); + errno = ENOMEM; + return -1; + } + memset(dropped_event, 0, sizeof(*dropped_event) + sizeof(dropped_pattern)); + dropped_event->ed_ref_count = 1; + list_init(&dropped_event->ed_retained); + dropped_event->ed_event.led_head.size = + sizeof(*dropped_event) + sizeof(dropped_pattern); + dropped_event->ed_event.led_head.error = SA_OK; + dropped_event->ed_event.led_priority = SA_EVT_HIGHEST_PRIORITY; + dropped_event->ed_event.led_chan_name = lost_chan; + dropped_event->ed_event.led_publisher_name = dropped_publisher; + dropped_event->ed_event.led_patterns_number = 1; + memcpy(&dropped_event->ed_event.led_body[0], + &dropped_pattern, sizeof(dropped_pattern)); + return 0; } + +/* + * Receive the network event message and distribute it to local subscribers + */ static int evt_remote_evt(void *msg, struct in_addr source_addr) { - log_printf(LOG_LEVEL_DEBUG, "Remote event data received"); + /* + * - retain events that have a retention time + * - Find assocated channel + * - Scan list of subscribers + * - Apply filters + * - Deliver events that pass the filter test + */ + struct lib_event_data *evtpkt = msg; + struct event_svr_channel_instance *eci; + struct event_svr_channel_open *eco; + struct event_svr_channel_subscr *ecs; + struct event_data *evt; + struct list_head *l, *l1; + SaClmClusterNodeT *cn; + + log_printf(LOG_LEVEL_DEBUG, "Remote event data received from 0x08%x\n", + source_addr); + + /* + * See where the message came from so that we can set the + * publishing node id in the message before delivery. + */ + cn = clm_get_by_nodeid (source_addr); + if (!cn) { + /* + * TODO: do something here when we can't find the node. + */ + log_printf(LOG_LEVEL_DEBUG, "No cluster node for %s\n", + inet_ntoa(source_addr)); + errno = ENXIO; + return -1; + } + log_printf(LOG_LEVEL_DEBUG, "Cluster node ID 0x%x name %s\n", + cn->nodeId, cn->nodeName.value); + evtpkt->led_publisher_node_id = cn->nodeId; + + eci = find_channel(&evtpkt->led_chan_name); + + /* + * No one here has this channel open yet. We can ignore the + * message. When someone does open the channel, any retained messages + * will be sent by the originators. + */ + if (!eci) { + return 0; + } + +#ifdef NO_DUPLICATES + /* + * Check for duplicate receipt of message + */ + if (is_duplicate_event(evtpkt, cn)) { + return 0; + } +#endif + + evt = make_local_event(evtpkt, eci); + if (!evt) { + log_printf(LOG_LEVEL_WARNING, + "Memory allocation error, can't deliver event\n"); + errno = ENOMEM; + return -1; + } + + if (evt->ed_event.led_retention_time) { + retain_event(evt); + } + + /* + * Check open channels + */ + for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) { + eco = list_entry(l, struct event_svr_channel_open, eco_entry); + /* + * See if enabled to receive + */ + if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) { + continue; + } + + /* + * Check subscriptions + */ + for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) { + ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry); + /* + * Apply filter rules and deliver if patterns + * match filters. + * Only deliver one event per open channel + */ + if (event_match(evt, ecs) == SA_OK) { + deliver_event(evt, eco, ecs); + break; + } + } + } + free_event_data(evt); + + return 0; } + +/* + * Receive and process remote event operations. + * Used to communicate channel opens/closes, clear retention time. + */ static int evt_remote_chan_op(void *msg, struct in_addr source_addr) { + struct req_evt_chan_command *cpkt = msg; + struct in_addr local_node = {SA_CLM_LOCAL_NODE_ID}; + SaClmClusterNodeT *cn, *my_node; + struct event_svr_channel_instance *eci; + + log_printf(LOG_LEVEL_DEBUG, "Remote channel operation request\n"); + my_node = clm_get_by_nodeid(local_node); + cn = clm_get_by_nodeid(source_addr); + + /* + * can ignore messages from me. + if (my_node->nodeId == cn->nodeId) { + return 0; + } + */ + + switch (cpkt->chc_op) { + case MESSAGE_REQ_EVT_OPEN_CHANNEL: + log_printf(LOG_LEVEL_DEBUG, "Creating channel %s for node 0x%x\n", + cpkt->u.chc_chan.value, cn->nodeId); + eci = find_channel(&cpkt->u.chc_chan); + + /* + * If found, either there was a race opening a channel or + * a node joined after a channel was created. We need to send + * him our retained messages to bring him up to date. + */ + if (eci) { + send_retained(&cpkt->u.chc_chan, cn->nodeId); + break; + } + + eci = create_channel(&cpkt->u.chc_chan); + if (!eci) { + log_printf(LOG_LEVEL_WARNING, "Could not create channel %s\n", + &cpkt->u.chc_chan.value); + } + + break; + case MESSAGE_REQ_EVT_CLOSE_CHANNEL: + break; + case MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME: + log_printf(LOG_LEVEL_DEBUG, "Clear retention time request %llx\n", + cpkt->u.chc_event_id); + clear_retention_time(cpkt->u.chc_event_id); + break; + default: + log_printf(LOG_LEVEL_NOTICE, "Invalid channel operation %d\n", + cpkt->chc_op); + break; + } + return 0; } diff --git a/exec/evt.h b/exec/evt.h index f6685713..8dd9039a 100644 --- a/exec/evt.h +++ b/exec/evt.h @@ -14,7 +14,7 @@ * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. - * - Neither the name of the MontaVista Software, Inc. nor the names of its + * - Neither the name of the Open Source Developent Lab nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * @@ -39,11 +39,32 @@ extern struct service_handler evt_service_handler; +/* + * event instance structure. Contains information about the + * active connection to the API library. + * + * esi_version: Version that the library is running. + * esi_open_chans: list of open channels associated with this + * instance. Used to clean up any data left + * allocated when the finalize is done. + * (event_svr_channel_open.eco_instance_entry) + * esi_events: list of pending events to be delivered on this + * instance (struct chan_event_list.cel_entry) + * esi_queue_blocked: non-zero if the delivery queue got too full + * and we're blocking new messages until we + * drain some of the queued messages. + * esi_nevents: Number of events in events lists to be sent. + * esi_hdb: Handle data base for open channels on this + * instance. Used for a quick lookup of + * open channel data from a lib api message. + */ struct libevt_ci { SaVersionT esi_version; - uint32_t esi_notify; struct list_head esi_open_chans; - struct list_head esi_events; + struct list_head esi_events[SA_EVT_LOWEST_PRIORITY+1]; + int esi_nevents; + int esi_queue_blocked; struct saHandleDatabase esi_hdb; }; + #endif