diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c index 570387e785..d1e0497154 100644 --- a/zebra/zebra_opaque.c +++ b/zebra/zebra_opaque.c @@ -24,8 +24,45 @@ #include "lib/stream.h" #include "zebra/debug.h" #include "zebra/zserv.h" +#include "zebra/zebra_memory.h" #include "zebra/zebra_opaque.h" +/* Mem type */ +DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information"); + +/* Hash to hold message registration info from zapi clients */ +PREDECL_HASH(opq_regh); + +/* Registered client info */ +struct opq_client_reg { + int proto; + int instance; + uint32_t session_id; + + struct opq_client_reg *next; + struct opq_client_reg *prev; +}; + +/* Opaque message registration info */ +struct opq_msg_reg { + struct opq_regh_item item; + + /* Message type */ + uint32_t type; + + struct opq_client_reg *clients; +}; + +/* Registration helper prototypes */ +static uint32_t registration_hash(const struct opq_msg_reg *reg); +static int registration_compare(const struct opq_msg_reg *reg1, + const struct opq_msg_reg *reg2); + +DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare, + registration_hash); + +static struct opq_regh_head opq_reg_hash; + /* * Globals */ @@ -71,6 +108,21 @@ static const char LOG_NAME[] = "Zebra Opaque"; /* Main event loop, processing incoming message queue */ static int process_messages(struct thread *event); +static int handle_opq_registration(const struct zmsghdr *hdr, + struct stream *msg); +static int handle_opq_unregistration(const struct zmsghdr *hdr, + struct stream *msg); +static int dispatch_opq_messages(struct stream_fifo *msg_fifo); +static struct opq_msg_reg *opq_reg_lookup(uint32_t type); +static bool opq_client_match(const struct opq_client_reg *client, + const struct zapi_opaque_reg_info *info); +static struct opq_msg_reg *opq_reg_alloc(uint32_t type); +static void opq_reg_free(struct opq_msg_reg **reg); +static struct opq_client_reg *opq_client_alloc( + const struct zapi_opaque_reg_info *info); +static void opq_client_free(struct opq_client_reg **client); +static const char *opq_client2str(char *buf, size_t buflen, + const struct opq_client_reg *client); /* * Initialize the module at startup @@ -139,9 +191,26 @@ void zebra_opaque_stop(void) */ void zebra_opaque_finish(void) { + struct opq_msg_reg *reg; + struct opq_client_reg *client; + if (IS_ZEBRA_DEBUG_EVENT) zlog_debug("%s module shutdown", LOG_NAME); + /* Clear out registration info */ + while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) { + client = reg->clients; + while (client) { + reg->clients = client->next; + opq_client_free(&client); + client = reg->clients; + } + + opq_reg_free(®); + } + + opq_regh_fini(&opq_reg_hash); + pthread_mutex_destroy(&zo_info.mutex); stream_fifo_deinit(&zo_info.in_fifo); } @@ -215,7 +284,8 @@ static int process_messages(struct thread *event) if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) goto done; - /* Dequeue some messages from the incoming queue, temporarily + /* + * Dequeue some messages from the incoming queue, temporarily * save them on the local fifo */ frr_with_mutex(&zo_info.mutex) { @@ -228,7 +298,8 @@ static int process_messages(struct thread *event) stream_fifo_push(&fifo, msg); } - /* We may need to reschedule, if there are still + /* + * We may need to reschedule, if there are still * queued messages */ if (stream_fifo_head(&zo_info.in_fifo) != NULL) @@ -247,13 +318,12 @@ static int process_messages(struct thread *event) if (IS_ZEBRA_DEBUG_RECV) zlog_debug("%s: processing %u messages", __func__, i); - /* Process the messages on the local fifo */ - /* TODO -- just discarding the messages for now */ - msg = stream_fifo_pop(&fifo); - while (msg) { - stream_free(msg); - msg = stream_fifo_pop(&fifo); - } + /* + * Process the messages from the temporary fifo. We send the whole + * fifo so that we can take advantage of batching internally. Note + * that registration/deregistration messages are handled here also. + */ + dispatch_opq_messages(&fifo); done: @@ -269,3 +339,361 @@ done: return 0; } + +/* + * Process (dispatch) or drop opaque messages. + */ +static int dispatch_opq_messages(struct stream_fifo *msg_fifo) +{ + struct stream *msg, *dup; + struct zmsghdr hdr; + struct opq_msg_reg *reg; + uint32_t type; + struct opq_client_reg *client; + struct zserv *zclient; + char buf[50]; + + while ((msg = stream_fifo_pop(msg_fifo)) != NULL) { + zapi_parse_header(msg, &hdr); + hdr.length -= ZEBRA_HEADER_SIZE; + + /* Handle client registration messages */ + if (hdr.command == ZEBRA_OPAQUE_REGISTER) { + handle_opq_registration(&hdr, msg); + continue; + } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) { + handle_opq_unregistration(&hdr, msg); + continue; + } + + /* We only process OPAQUE messages - drop anything else */ + if (hdr.command != ZEBRA_OPAQUE_MESSAGE) + goto drop_it; + + /* Dispatch to any registered ZAPI client(s) */ + + /* Extract subtype */ + STREAM_GETL(msg, type); + + /* Look up registered ZAPI client(s) */ + reg = opq_reg_lookup(type); + if (reg == NULL) { + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: no registrations for opaque type %u", + __func__, type); + goto drop_it; + } + + /* Reset read pointer, since we'll be re-sending message */ + stream_set_getp(msg, 0); + + /* Send a copy of the message to all registered clients */ + for (client = reg->clients; client; client = client->next) { + dup = NULL; + + /* Copy message if necessary */ + if (client->next) + dup = stream_dup(msg); + + /* + * TODO -- this isn't ideal: we're going through an + * acquire/release cycle for each client for each + * message. Replace this with a batching version. + */ + zclient = zserv_acquire_client(client->proto, + client->instance, + client->session_id); + if (zclient) { + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: sending %s to client %s", + __func__, + (dup ? "dup" : "msg"), + opq_client2str(buf, + sizeof(buf), + client)); + + /* + * Sending a message actually means enqueuing + * it for a zapi io pthread to send - so we + * don't touch the message after this call. + */ + zserv_send_message(zclient, dup ? dup : msg); + if (dup) + dup = NULL; + else + msg = NULL; + + zserv_release_client(zclient); + } else { + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: type %u: no zclient for %s", + __func__, type, + opq_client2str(buf, + sizeof(buf), + client)); + /* Registered but gone? */ + if (dup) + stream_free(dup); + } + } + +drop_it: +stream_failure: + if (msg) + stream_free(msg); + } + + return 0; +} + +/* + * Process a register/unregister message + */ +static int handle_opq_registration(const struct zmsghdr *hdr, + struct stream *msg) +{ + int ret = 0; + struct zapi_opaque_reg_info info; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&info, 0, sizeof(info)); + + if (zapi_parse_opaque_reg(msg, &info) < 0) { + ret = -1; + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg) { + /* Look for dup client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_match(client, &info)) + break; + } + + if (client) { + /* Oops - duplicate registration? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: duplicate opq reg for client %s", + __func__, + opq_client2str(buf, sizeof(buf), + client)); + goto done; + } + + client = opq_client_alloc(&info); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.type); + + /* Link client into registration */ + client->next = reg->clients; + if (reg->clients) + reg->clients->prev = client; + reg->clients = client; + } else { + /* + * No existing registrations - create one, add the + * client, and add registration to hash. + */ + reg = opq_reg_alloc(info.type); + client = opq_client_alloc(&info); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for new reg %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.type); + + reg->clients = client; + + opq_regh_add(&opq_reg_hash, reg); + } + +done: + + stream_free(msg); + return ret; +} + +/* + * Process a register/unregister message + */ +static int handle_opq_unregistration(const struct zmsghdr *hdr, + struct stream *msg) +{ + int ret = 0; + struct zapi_opaque_reg_info info; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&info, 0, sizeof(info)); + + if (zapi_parse_opaque_reg(msg, &info) < 0) { + ret = -1; + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg == NULL) { + /* Weird: unregister for unknown message? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u", + __func__, + zebra_route_string(info.proto), + info.instance, info.session_id, info.type); + goto done; + } + + /* Look for client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_match(client, &info)) + break; + } + + if (client == NULL) { + /* Oops - unregister for unknown client? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters for %u", + __func__, zebra_route_string(info.proto), + info.instance, info.session_id, info.type); + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s unregisters for %u", + __func__, opq_client2str(buf, sizeof(buf), client), + info.type); + + if (client->prev) + client->prev->next = client->next; + if (client->next) + client->next->prev = client->prev; + if (reg->clients == client) + reg->clients = client->next; + + opq_client_free(&client); + + /* Is registration empty now? */ + if (reg->clients == NULL) { + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: free empty reg %u", __func__, + reg->type); + + opq_regh_del(&opq_reg_hash, reg); + opq_reg_free(®); + } + +done: + + stream_free(msg); + return ret; +} + +/* Compare utility for registered clients */ +static bool opq_client_match(const struct opq_client_reg *client, + const struct zapi_opaque_reg_info *info) +{ + if (client->proto == info->proto && + client->instance == info->instance && + client->session_id == info->session_id) + return true; + else + return false; +} + +static struct opq_msg_reg *opq_reg_lookup(uint32_t type) +{ + struct opq_msg_reg key, *reg; + + memset(&key, 0, sizeof(key)); + + key.type = type; + + reg = opq_regh_find(&opq_reg_hash, &key); + + return reg; +} + +static struct opq_msg_reg *opq_reg_alloc(uint32_t type) +{ + struct opq_msg_reg *reg; + + reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg)); + + reg->type = type; + INIT_HASH(®->item); + + return reg; +} + +static void opq_reg_free(struct opq_msg_reg **reg) +{ + XFREE(MTYPE_OPQ, (*reg)); +} + +static struct opq_client_reg *opq_client_alloc( + const struct zapi_opaque_reg_info *info) +{ + struct opq_client_reg *client; + + client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg)); + + client->proto = info->proto; + client->instance = info->instance; + client->session_id = info->session_id; + + return client; +} + +static void opq_client_free(struct opq_client_reg **client) +{ + XFREE(MTYPE_OPQ, (*client)); +} + +static const char *opq_client2str(char *buf, size_t buflen, + const struct opq_client_reg *client) +{ + char sbuf[20]; + + snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto), + client->instance); + if (client->session_id > 0) { + snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id); + strlcat(buf, sbuf, buflen); + } + + return buf; +} + +/* Hash function for clients registered for messages */ +static uint32_t registration_hash(const struct opq_msg_reg *reg) +{ + return reg->type; +} + +/* Comparison function for client registrations */ +static int registration_compare(const struct opq_msg_reg *reg1, + const struct opq_msg_reg *reg2) +{ + if (reg1->type == reg2->type) + return 0; + else + return -1; +}