diff --git a/zebra/dplane_fpm_nl.c b/zebra/dplane_fpm_nl.c index 261b859bf6..aeb4c7fadd 100644 --- a/zebra/dplane_fpm_nl.c +++ b/zebra/dplane_fpm_nl.c @@ -1222,24 +1222,27 @@ static int fpm_process_queue(struct thread *t) { struct fpm_nl_ctx *fnc = THREAD_ARG(t); struct zebra_dplane_ctx *ctx; - - frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex); + bool no_bufs = false; + uint64_t processed_contexts = 0; while (true) { /* No space available yet. */ - if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) + if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) { + no_bufs = true; break; + } /* Dequeue next item or quit processing. */ - ctx = dplane_ctx_dequeue(&fnc->ctxqueue); + frr_with_mutex (&fnc->ctxqueue_mutex) { + ctx = dplane_ctx_dequeue(&fnc->ctxqueue); + } if (ctx == NULL) break; fpm_nl_enqueue(fnc, ctx); /* Account the processed entries. */ - atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1, - memory_order_relaxed); + processed_contexts++; atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1, memory_order_relaxed); @@ -1247,10 +1250,12 @@ static int fpm_process_queue(struct thread *t) dplane_provider_enqueue_out_ctx(fnc->prov, ctx); } - /* Check for more items in the queue. */ - if (atomic_load_explicit(&fnc->counters.ctxqueue_len, - memory_order_relaxed) - > 0) + /* Update count of processed contexts */ + atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, + processed_contexts, memory_order_relaxed); + + /* Re-schedule if we ran out of buffer space */ + if (no_bufs) thread_add_timer(fnc->fthread->master, fpm_process_queue, fnc, 0, &fnc->t_dequeue); @@ -1421,7 +1426,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) struct zebra_dplane_ctx *ctx; struct fpm_nl_ctx *fnc; int counter, limit; - uint64_t cur_queue, peak_queue; + uint64_t cur_queue, peak_queue = 0, stored_peak_queue; fnc = dplane_provider_get_data(prov); limit = dplane_provider_get_work_limit(prov); @@ -1435,22 +1440,22 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) * anyway. */ if (fnc->socket != -1 && fnc->connecting == false) { - frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex); - dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx); - - /* Account the number of contexts. */ + /* + * Update the number of queued contexts *before* + * enqueueing, to ensure counter consistency. + */ atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len, 1, memory_order_relaxed); + + frr_with_mutex (&fnc->ctxqueue_mutex) { + dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx); + } + cur_queue = atomic_load_explicit( &fnc->counters.ctxqueue_len, memory_order_relaxed); - peak_queue = atomic_load_explicit( - &fnc->counters.ctxqueue_len_peak, - memory_order_relaxed); if (peak_queue < cur_queue) - atomic_store_explicit( - &fnc->counters.ctxqueue_len_peak, - cur_queue, memory_order_relaxed); + peak_queue = cur_queue; continue; } @@ -1458,6 +1463,13 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov) dplane_provider_enqueue_out_ctx(prov, ctx); } + /* Update peak queue length, if we just observed a new peak */ + stored_peak_queue = atomic_load_explicit( + &fnc->counters.ctxqueue_len_peak, memory_order_relaxed); + if (stored_peak_queue < peak_queue) + atomic_store_explicit(&fnc->counters.ctxqueue_len_peak, + peak_queue, memory_order_relaxed); + if (atomic_load_explicit(&fnc->counters.ctxqueue_len, memory_order_relaxed) > 0)