mirror of
https://git.proxmox.com/git/mirror_frr
synced 2025-08-09 07:11:05 +00:00
zebra: use atomic operations in FPM
FPM has a thread to encode and enqueue output buffer that might compete with zebra RIB/RMAC walk on startup, so lets use atomic operations to make sure we are not getting statistic/counters wrong. Signed-off-by: Rafael Zalamena <rzalamena@opensourcerouting.org>
This commit is contained in:
parent
ba803a2fbe
commit
edfeff4251
@ -102,33 +102,33 @@ struct fpm_nl_ctx {
|
|||||||
/* Statistic counters. */
|
/* Statistic counters. */
|
||||||
struct {
|
struct {
|
||||||
/* Amount of bytes read into ibuf. */
|
/* Amount of bytes read into ibuf. */
|
||||||
uint64_t bytes_read;
|
_Atomic uint64_t bytes_read;
|
||||||
/* Amount of bytes written from obuf. */
|
/* Amount of bytes written from obuf. */
|
||||||
uint64_t bytes_sent;
|
_Atomic uint64_t bytes_sent;
|
||||||
/* Output buffer current usage. */
|
/* Output buffer current usage. */
|
||||||
uint64_t obuf_bytes;
|
_Atomic uint64_t obuf_bytes;
|
||||||
/* Output buffer peak usage. */
|
/* Output buffer peak usage. */
|
||||||
uint64_t obuf_peak;
|
_Atomic uint64_t obuf_peak;
|
||||||
|
|
||||||
/* Amount of connection closes. */
|
/* Amount of connection closes. */
|
||||||
uint64_t connection_closes;
|
_Atomic uint64_t connection_closes;
|
||||||
/* Amount of connection errors. */
|
/* Amount of connection errors. */
|
||||||
uint64_t connection_errors;
|
_Atomic uint64_t connection_errors;
|
||||||
|
|
||||||
/* Amount of user configurations: FNE_RECONNECT. */
|
/* Amount of user configurations: FNE_RECONNECT. */
|
||||||
uint64_t user_configures;
|
_Atomic uint64_t user_configures;
|
||||||
/* Amount of user disable requests: FNE_DISABLE. */
|
/* Amount of user disable requests: FNE_DISABLE. */
|
||||||
uint64_t user_disables;
|
_Atomic uint64_t user_disables;
|
||||||
|
|
||||||
/* Amount of data plane context processed. */
|
/* Amount of data plane context processed. */
|
||||||
uint64_t dplane_contexts;
|
_Atomic uint64_t dplane_contexts;
|
||||||
/* Amount of data plane contexts enqueued. */
|
/* Amount of data plane contexts enqueued. */
|
||||||
uint64_t ctxqueue_len;
|
_Atomic uint64_t ctxqueue_len;
|
||||||
/* Peak amount of data plane contexts enqueued. */
|
/* Peak amount of data plane contexts enqueued. */
|
||||||
uint64_t ctxqueue_len_peak;
|
_Atomic uint64_t ctxqueue_len_peak;
|
||||||
|
|
||||||
/* Amount of buffer full events. */
|
/* Amount of buffer full events. */
|
||||||
uint64_t buffer_full;
|
_Atomic uint64_t buffer_full;
|
||||||
} counters;
|
} counters;
|
||||||
} * gfnc;
|
} * gfnc;
|
||||||
|
|
||||||
@ -415,7 +415,7 @@ static int fpm_read(struct thread *t)
|
|||||||
rv = stream_read_try(fnc->ibuf, fnc->socket,
|
rv = stream_read_try(fnc->ibuf, fnc->socket,
|
||||||
STREAM_WRITEABLE(fnc->ibuf));
|
STREAM_WRITEABLE(fnc->ibuf));
|
||||||
if (rv == 0) {
|
if (rv == 0) {
|
||||||
fnc->counters.connection_closes++;
|
atomic_fetch_add(&fnc->counters.connection_closes, 1);
|
||||||
zlog_debug("%s: connection closed", __func__);
|
zlog_debug("%s: connection closed", __func__);
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
return 0;
|
return 0;
|
||||||
@ -425,7 +425,7 @@ static int fpm_read(struct thread *t)
|
|||||||
|| errno == EINTR)
|
|| errno == EINTR)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
fnc->counters.connection_errors++;
|
atomic_fetch_add(&fnc->counters.connection_errors, 1);
|
||||||
zlog_debug("%s: connection failure: %s", __func__,
|
zlog_debug("%s: connection failure: %s", __func__,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
@ -434,7 +434,7 @@ static int fpm_read(struct thread *t)
|
|||||||
stream_reset(fnc->ibuf);
|
stream_reset(fnc->ibuf);
|
||||||
|
|
||||||
/* Account all bytes read. */
|
/* Account all bytes read. */
|
||||||
fnc->counters.bytes_read += rv;
|
atomic_fetch_add(&fnc->counters.bytes_read, rv);
|
||||||
|
|
||||||
thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
|
thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
|
||||||
&fnc->t_read);
|
&fnc->t_read);
|
||||||
@ -464,7 +464,7 @@ static int fpm_write(struct thread *t)
|
|||||||
zlog_debug("%s: SO_ERROR failed: %s", __func__,
|
zlog_debug("%s: SO_ERROR failed: %s", __func__,
|
||||||
strerror(status));
|
strerror(status));
|
||||||
|
|
||||||
fnc->counters.connection_errors++;
|
atomic_fetch_add(&fnc->counters.connection_errors, 1);
|
||||||
|
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
return 0;
|
return 0;
|
||||||
@ -493,7 +493,7 @@ static int fpm_write(struct thread *t)
|
|||||||
stream_get_getp(fnc->obuf);
|
stream_get_getp(fnc->obuf);
|
||||||
bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
|
bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
|
||||||
if (bwritten == 0) {
|
if (bwritten == 0) {
|
||||||
fnc->counters.connection_closes++;
|
atomic_fetch_add(&fnc->counters.connection_closes, 1);
|
||||||
zlog_debug("%s: connection closed", __func__);
|
zlog_debug("%s: connection closed", __func__);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -505,7 +505,7 @@ static int fpm_write(struct thread *t)
|
|||||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
fnc->counters.connection_errors++;
|
atomic_fetch_add(&fnc->counters.connection_errors, 1);
|
||||||
zlog_debug("%s: connection failure: %s", __func__,
|
zlog_debug("%s: connection failure: %s", __func__,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
@ -513,10 +513,10 @@ static int fpm_write(struct thread *t)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Account all bytes sent. */
|
/* Account all bytes sent. */
|
||||||
fnc->counters.bytes_sent += bwritten;
|
atomic_fetch_add(&fnc->counters.bytes_sent, bwritten);
|
||||||
|
|
||||||
/* Account number of bytes free. */
|
/* Account number of bytes free. */
|
||||||
fnc->counters.obuf_bytes -= bwritten;
|
atomic_fetch_sub(&fnc->counters.obuf_bytes, bwritten);
|
||||||
|
|
||||||
stream_forward_getp(fnc->obuf, (size_t)bwritten);
|
stream_forward_getp(fnc->obuf, (size_t)bwritten);
|
||||||
}
|
}
|
||||||
@ -565,7 +565,7 @@ static int fpm_connect(struct thread *t)
|
|||||||
|
|
||||||
rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
|
rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
|
||||||
if (rv == -1 && errno != EINPROGRESS) {
|
if (rv == -1 && errno != EINPROGRESS) {
|
||||||
fnc->counters.connection_errors++;
|
atomic_fetch_add(&fnc->counters.connection_errors, 1);
|
||||||
close(sock);
|
close(sock);
|
||||||
zlog_warn("%s: fpm connection failed: %s", __func__,
|
zlog_warn("%s: fpm connection failed: %s", __func__,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
@ -603,6 +603,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
|
|||||||
uint8_t nl_buf[NL_PKT_BUF_SIZE];
|
uint8_t nl_buf[NL_PKT_BUF_SIZE];
|
||||||
size_t nl_buf_len;
|
size_t nl_buf_len;
|
||||||
ssize_t rv;
|
ssize_t rv;
|
||||||
|
uint64_t obytes, obytes_peak;
|
||||||
|
|
||||||
nl_buf_len = 0;
|
nl_buf_len = 0;
|
||||||
|
|
||||||
@ -689,7 +690,7 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
|
|||||||
|
|
||||||
/* Check if we have enough buffer space. */
|
/* Check if we have enough buffer space. */
|
||||||
if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
|
if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
|
||||||
fnc->counters.buffer_full++;
|
atomic_fetch_add(&fnc->counters.buffer_full, 1);
|
||||||
zlog_debug("%s: buffer full: wants to write %lu but has %ld",
|
zlog_debug("%s: buffer full: wants to write %lu but has %ld",
|
||||||
__func__, nl_buf_len + FPM_HEADER_SIZE,
|
__func__, nl_buf_len + FPM_HEADER_SIZE,
|
||||||
STREAM_WRITEABLE(fnc->obuf));
|
STREAM_WRITEABLE(fnc->obuf));
|
||||||
@ -709,9 +710,14 @@ static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
|
|||||||
stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
|
stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
|
||||||
|
|
||||||
/* Account number of bytes waiting to be written. */
|
/* Account number of bytes waiting to be written. */
|
||||||
fnc->counters.obuf_bytes += nl_buf_len + FPM_HEADER_SIZE;
|
atomic_fetch_add(&fnc->counters.obuf_bytes,
|
||||||
if (fnc->counters.obuf_peak < fnc->counters.obuf_bytes)
|
nl_buf_len + FPM_HEADER_SIZE);
|
||||||
fnc->counters.obuf_peak = fnc->counters.obuf_bytes;
|
obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
|
||||||
|
memory_order_relaxed);
|
||||||
|
obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
|
||||||
|
memory_order_relaxed);
|
||||||
|
if (obytes_peak < obytes)
|
||||||
|
atomic_store(&fnc->counters.obuf_peak, obytes);
|
||||||
|
|
||||||
/* Tell the thread to start writing. */
|
/* Tell the thread to start writing. */
|
||||||
thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
|
thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
|
||||||
@ -908,15 +914,15 @@ static int fpm_process_queue(struct thread *t)
|
|||||||
fpm_nl_enqueue(fnc, ctx);
|
fpm_nl_enqueue(fnc, ctx);
|
||||||
|
|
||||||
/* Account the processed entries. */
|
/* Account the processed entries. */
|
||||||
fnc->counters.dplane_contexts++;
|
atomic_fetch_add(&fnc->counters.dplane_contexts, 1);
|
||||||
fnc->counters.ctxqueue_len--;
|
atomic_fetch_sub(&fnc->counters.ctxqueue_len, 1);
|
||||||
|
|
||||||
dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
|
dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
|
||||||
dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
|
dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check for more items in the queue. */
|
/* Check for more items in the queue. */
|
||||||
if (fnc->counters.ctxqueue_len)
|
if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
|
||||||
thread_add_timer(fnc->fthread->master, fpm_process_queue,
|
thread_add_timer(fnc->fthread->master, fpm_process_queue,
|
||||||
fnc, 0, &fnc->t_dequeue);
|
fnc, 0, &fnc->t_dequeue);
|
||||||
|
|
||||||
@ -935,7 +941,7 @@ static int fpm_process_event(struct thread *t)
|
|||||||
case FNE_DISABLE:
|
case FNE_DISABLE:
|
||||||
zlog_debug("%s: manual FPM disable event", __func__);
|
zlog_debug("%s: manual FPM disable event", __func__);
|
||||||
fnc->disabled = true;
|
fnc->disabled = true;
|
||||||
fnc->counters.user_disables++;
|
atomic_fetch_add(&fnc->counters.user_disables, 1);
|
||||||
|
|
||||||
/* Call reconnect to disable timers and clean up context. */
|
/* Call reconnect to disable timers and clean up context. */
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
@ -944,7 +950,7 @@ static int fpm_process_event(struct thread *t)
|
|||||||
case FNE_RECONNECT:
|
case FNE_RECONNECT:
|
||||||
zlog_debug("%s: manual FPM reconnect event", __func__);
|
zlog_debug("%s: manual FPM reconnect event", __func__);
|
||||||
fnc->disabled = false;
|
fnc->disabled = false;
|
||||||
fnc->counters.user_configures++;
|
atomic_fetch_add(&fnc->counters.user_configures, 1);
|
||||||
fpm_reconnect(fnc);
|
fpm_reconnect(fnc);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1000,6 +1006,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
|
|||||||
struct zebra_dplane_ctx *ctx;
|
struct zebra_dplane_ctx *ctx;
|
||||||
struct fpm_nl_ctx *fnc;
|
struct fpm_nl_ctx *fnc;
|
||||||
int counter, limit;
|
int counter, limit;
|
||||||
|
uint64_t cur_queue, peak_queue;
|
||||||
|
|
||||||
fnc = dplane_provider_get_data(prov);
|
fnc = dplane_provider_get_data(prov);
|
||||||
limit = dplane_provider_get_work_limit(prov);
|
limit = dplane_provider_get_work_limit(prov);
|
||||||
@ -1017,11 +1024,13 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
|
|||||||
dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
|
dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
|
||||||
|
|
||||||
/* Account the number of contexts. */
|
/* Account the number of contexts. */
|
||||||
fnc->counters.ctxqueue_len++;
|
atomic_fetch_add(&fnc->counters.ctxqueue_len, 1);
|
||||||
if (fnc->counters.ctxqueue_len_peak <
|
cur_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len,
|
||||||
fnc->counters.ctxqueue_len)
|
memory_order_relaxed);
|
||||||
fnc->counters.ctxqueue_len_peak =
|
peak_queue = atomic_load_explicit(&fnc->counters.ctxqueue_len_peak,
|
||||||
fnc->counters.ctxqueue_len;
|
memory_order_relaxed);
|
||||||
|
if (peak_queue < cur_queue)
|
||||||
|
atomic_store(&fnc->counters.ctxqueue_len_peak, peak_queue);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1029,7 +1038,7 @@ static int fpm_nl_process(struct zebra_dplane_provider *prov)
|
|||||||
dplane_provider_enqueue_out_ctx(prov, ctx);
|
dplane_provider_enqueue_out_ctx(prov, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fnc->counters.ctxqueue_len)
|
if (atomic_load(&fnc->counters.ctxqueue_len) > 0)
|
||||||
thread_add_timer(fnc->fthread->master, fpm_process_queue,
|
thread_add_timer(fnc->fthread->master, fpm_process_queue,
|
||||||
fnc, 0, &fnc->t_dequeue);
|
fnc, 0, &fnc->t_dequeue);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user