diff --git a/zebra/dplane_fpm_nl.c b/zebra/dplane_fpm_nl.c index 77326f8d7c..5fada3b352 100644 --- a/zebra/dplane_fpm_nl.c +++ b/zebra/dplane_fpm_nl.c @@ -76,12 +76,22 @@ struct fpm_nl_ctx { struct stream *obuf; pthread_mutex_t obuf_mutex; + /* + * data plane context queue: + * When a FPM server connection becomes a bottleneck, we must keep the + * data plane contexts until we get a chance to process them. + */ + struct dplane_ctx_q ctxqueue; + pthread_mutex_t ctxqueue_mutex; + /* data plane events. */ + struct zebra_dplane_provider *prov; struct frr_pthread *fthread; struct thread *t_connect; struct thread *t_read; struct thread *t_write; struct thread *t_event; + struct thread *t_dequeue; /* zebra events. */ struct thread *t_ribreset; @@ -112,6 +122,10 @@ struct fpm_nl_ctx { /* Amount of data plane context processed. */ uint64_t dplane_contexts; + /* Amount of data plane contexts enqueued. */ + uint64_t ctxqueue_len; + /* Peak amount of data plane contexts enqueued. */ + uint64_t ctxqueue_len_peak; /* Amount of buffer full events. */ uint64_t buffer_full; @@ -266,6 +280,10 @@ DEFUN(fpm_show_counters, fpm_show_counters_cmd, SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors); SHOW_COUNTER("Data plane items processed", gfnc->counters.dplane_contexts); + SHOW_COUNTER("Data plane items enqueued", + gfnc->counters.ctxqueue_len); + SHOW_COUNTER("Data plane items queue peak", + gfnc->counters.ctxqueue_len_peak); SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full); SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures); SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables); @@ -292,6 +310,10 @@ DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd, json_object_int_add(jo, "connection-closes", gfnc->counters.connection_closes); json_object_int_add(jo, "connection-errors", gfnc->counters.connection_errors); json_object_int_add(jo, "data-plane-contexts", gfnc->counters.dplane_contexts); + json_object_int_add(jo, "data-plane-contexts-queue", + gfnc->counters.ctxqueue_len); + json_object_int_add(jo, "data-plane-contexts-queue-peak", + gfnc->counters.ctxqueue_len_peak); json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full); json_object_int_add(jo, "user-configures", gfnc->counters.user_configures); json_object_int_add(jo, "user-disables", gfnc->counters.user_disables); @@ -866,6 +888,41 @@ static int fpm_rmac_reset(struct thread *t) return 0; } +static int fpm_process_queue(struct thread *t) +{ + struct fpm_nl_ctx *fnc = THREAD_ARG(t); + struct zebra_dplane_ctx *ctx; + + frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex); + + while (true) { + /* No space available yet. */ + if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) + break; + + /* Dequeue next item or quit processing. */ + ctx = dplane_ctx_dequeue(&fnc->ctxqueue); + if (ctx == NULL) + break; + + fpm_nl_enqueue(fnc, ctx); + + /* Account the processed entries. */ + fnc->counters.dplane_contexts++; + fnc->counters.ctxqueue_len--; + + dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS); + dplane_provider_enqueue_out_ctx(fnc->prov, ctx); + } + + /* Check for more items in the queue. */ + if (fnc->counters.ctxqueue_len) + thread_add_timer(fnc->fthread->master, fpm_process_queue, + fnc, 0, &fnc->t_dequeue); + + return 0; +} + /** * Handles external (e.g. CLI, data plane or others) events. */ @@ -919,6 +976,9 @@ static int fpm_nl_start(struct zebra_dplane_provider *prov) pthread_mutex_init(&fnc->obuf_mutex, NULL); fnc->socket = -1; fnc->disabled = true; + fnc->prov = prov; + TAILQ_INIT(&fnc->ctxqueue); + pthread_mutex_init(&fnc->ctxqueue_mutex, NULL); return 0; } @@ -953,15 +1013,26 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) * anyway. */ if (fnc->socket != -1 && fnc->connecting == false) { - fpm_nl_enqueue(fnc, ctx); + frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex); + dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx); - fnc->counters.dplane_contexts++; + /* Account the number of contexts. */ + fnc->counters.ctxqueue_len++; + if (fnc->counters.ctxqueue_len_peak < + fnc->counters.ctxqueue_len) + fnc->counters.ctxqueue_len_peak = + fnc->counters.ctxqueue_len; + continue; } dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS); dplane_provider_enqueue_out_ctx(prov, ctx); } + if (fnc->counters.ctxqueue_len) + thread_add_timer(fnc->fthread->master, fpm_process_queue, + fnc, 0, &fnc->t_dequeue); + return 0; }