zebra: optimize zserv_process_messages

* Simplify zapi_msg <-> zserv interaction
* Remove header validity checks, as they're already performed before the
  packet ever makes it here
* Perform the same kind of batch processing done in zserv_write by
  copying multiple inbound packets under lock instead of doing serial
  locking
* Perform self-scheduling under the same lock

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2018-04-24 17:03:19 -04:00
parent 370d8dad79
commit 904e0d8830
3 changed files with 44 additions and 60 deletions

View File

@ -3017,14 +3017,27 @@ void (*zserv_handlers[])(ZAPI_HANDLER_ARGS) = {
[ZEBRA_IPTABLE_DELETE] = zread_iptable, [ZEBRA_IPTABLE_DELETE] = zread_iptable,
}; };
void zserv_handle_commands(struct zserv *client, struct zmsghdr *hdr, void zserv_handle_commands(struct zserv *client, struct stream *msg)
struct stream *msg, struct zebra_vrf *zvrf)
{ {
if (hdr->command > array_size(zserv_handlers) struct zmsghdr hdr;
|| zserv_handlers[hdr->command] == NULL) struct zebra_vrf *zvrf;
zlog_info("Zebra received unknown command %d", hdr->command);
else
zserv_handlers[hdr->command](client, hdr, msg, zvrf);
stream_free(msg); zapi_parse_header(msg, &hdr);
hdr.length -= ZEBRA_HEADER_SIZE;
/* lookup vrf */
zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
if (!zvrf) {
if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
zlog_warn("ZAPI message specifies unknown VRF: %d",
hdr.vrf_id);
return;
}
if (hdr.command > array_size(zserv_handlers)
|| zserv_handlers[hdr.command] == NULL)
zlog_info("Zebra received unknown command %d", hdr.command);
else
zserv_handlers[hdr.command](client, &hdr, msg, zvrf);
} }

View File

@ -35,17 +35,10 @@
* client * client
* the client datastructure * the client datastructure
* *
* hdr
* the message header
*
* msg * msg
* the message contents, without the header * the message
*
* zvrf
* the vrf
*/ */
extern void zserv_handle_commands(struct zserv *client, struct zmsghdr *hdr, extern void zserv_handle_commands(struct zserv *client, struct stream *msg);
struct stream *msg, struct zebra_vrf *zvrf);
extern int zsend_vrf_add(struct zserv *zclient, struct zebra_vrf *zvrf); extern int zsend_vrf_add(struct zserv *zclient, struct zebra_vrf *zvrf);
extern int zsend_vrf_delete(struct zserv *zclient, struct zebra_vrf *zvrf); extern int zsend_vrf_delete(struct zserv *zclient, struct zebra_vrf *zvrf);

View File

@ -493,62 +493,40 @@ static void zserv_client_event(struct zserv *client,
* *
* Each message is popped off the client's input queue and the action associated * Each message is popped off the client's input queue and the action associated
* with the message is executed. This proceeds until there are no more messages, * with the message is executed. This proceeds until there are no more messages,
* an error occurs, or the processing limit is reached. In the last case, this * an error occurs, or the processing limit is reached.
* task reschedules itself. *
* This task reschedules itself if it cannot process everything on the input
* queue in one run.
*/ */
static int zserv_process_messages(struct thread *thread) static int zserv_process_messages(struct thread *thread)
{ {
struct zserv *client = THREAD_ARG(thread); struct zserv *client = THREAD_ARG(thread);
struct zebra_vrf *zvrf;
struct zmsghdr hdr;
struct stream *msg; struct stream *msg;
bool hdrvalid; struct stream_fifo *cache = stream_fifo_new();
int p2p = zebrad.packets_to_process; uint32_t p2p = zebrad.packets_to_process;
do {
pthread_mutex_lock(&client->ibuf_mtx); pthread_mutex_lock(&client->ibuf_mtx);
{ {
msg = stream_fifo_pop(client->ibuf_fifo); for (uint32_t i = p2p - 1; i && client->ibuf_fifo->head; --i)
} stream_fifo_push(cache,
pthread_mutex_unlock(&client->ibuf_mtx); stream_fifo_pop(client->ibuf_fifo));
/* break if out of messages */ if (client->ibuf_fifo->head)
if (!msg)
continue;
/* read & check header */
hdrvalid = zapi_parse_header(msg, &hdr);
if (!hdrvalid && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
const char *emsg = "Message has corrupt header";
zserv_log_message(emsg, msg, NULL);
}
if (!hdrvalid)
continue;
hdr.length -= ZEBRA_HEADER_SIZE;
/* lookup vrf */
zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
if (!zvrf && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
const char *emsg = "Message specifies unknown VRF";
zserv_log_message(emsg, msg, &hdr);
}
if (!zvrf)
continue;
/* process commands */
zserv_handle_commands(client, &hdr, msg, zvrf);
} while (msg && --p2p);
/* reschedule self if necessary */
pthread_mutex_lock(&client->ibuf_mtx);
{
if (client->ibuf_fifo->count)
zserv_event(client, ZSERV_PROCESS_MESSAGES); zserv_event(client, ZSERV_PROCESS_MESSAGES);
} }
pthread_mutex_unlock(&client->ibuf_mtx); pthread_mutex_unlock(&client->ibuf_mtx);
while (p2p--) {
msg = stream_fifo_pop(cache);
if (!msg)
break;
zserv_handle_commands(client, msg);
stream_free(msg);
}
stream_fifo_free(cache);
return 0; return 0;
} }