Merge pull request #7766 from deastoe/dplane-fpm-nl_optimisations

dplane_fpm_nl optimisations
This commit is contained in:
Donald Sharp 2020-12-18 18:47:35 -05:00 committed by GitHub
commit 1a161d8093
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1222,24 +1222,27 @@ static int fpm_process_queue(struct thread *t)
{
struct fpm_nl_ctx *fnc = THREAD_ARG(t);
struct zebra_dplane_ctx *ctx;
frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
bool no_bufs = false;
uint64_t processed_contexts = 0;
while (true) {
/* No space available yet. */
if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
no_bufs = true;
break;
}
/* Dequeue next item or quit processing. */
ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
frr_with_mutex (&fnc->ctxqueue_mutex) {
ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
}
if (ctx == NULL)
break;
fpm_nl_enqueue(fnc, ctx);
/* Account the processed entries. */
atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
memory_order_relaxed);
processed_contexts++;
atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
memory_order_relaxed);
@ -1247,10 +1250,12 @@ static int fpm_process_queue(struct thread *t)
dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
}
/* Check for more items in the queue. */
if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
memory_order_relaxed)
> 0)
/* Update count of processed contexts */
atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
processed_contexts, memory_order_relaxed);
/* Re-schedule if we ran out of buffer space */
if (no_bufs)
thread_add_timer(fnc->fthread->master, fpm_process_queue,
fnc, 0, &fnc->t_dequeue);
@ -1421,7 +1426,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
struct zebra_dplane_ctx *ctx;
struct fpm_nl_ctx *fnc;
int counter, limit;
uint64_t cur_queue, peak_queue;
uint64_t cur_queue, peak_queue = 0, stored_peak_queue;
fnc = dplane_provider_get_data(prov);
limit = dplane_provider_get_work_limit(prov);
@ -1435,22 +1440,22 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
* anyway.
*/
if (fnc->socket != -1 && fnc->connecting == false) {
frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
/* Account the number of contexts. */
/*
* Update the number of queued contexts *before*
* enqueueing, to ensure counter consistency.
*/
atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1, memory_order_relaxed);
frr_with_mutex (&fnc->ctxqueue_mutex) {
dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
}
cur_queue = atomic_load_explicit(
&fnc->counters.ctxqueue_len,
memory_order_relaxed);
peak_queue = atomic_load_explicit(
&fnc->counters.ctxqueue_len_peak,
memory_order_relaxed);
if (peak_queue < cur_queue)
atomic_store_explicit(
&fnc->counters.ctxqueue_len_peak,
cur_queue, memory_order_relaxed);
peak_queue = cur_queue;
continue;
}
@ -1458,6 +1463,13 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
dplane_provider_enqueue_out_ctx(prov, ctx);
}
/* Update peak queue length, if we just observed a new peak */
stored_peak_queue = atomic_load_explicit(
&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
if (stored_peak_queue < peak_queue)
atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
peak_queue, memory_order_relaxed);
if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
memory_order_relaxed)
> 0)