mirror of
https://git.proxmox.com/git/mirror_frr
synced 2025-08-02 13:23:44 +00:00
lib: msg: refactor common connection code from mgmtd
Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
parent
528b326616
commit
070c5e7a91
@ -7,6 +7,7 @@
|
||||
|
||||
#include <zebra.h>
|
||||
#include "debug.h"
|
||||
#include "compiler.h"
|
||||
#include "libfrr.h"
|
||||
#include "mgmtd/mgmt.h"
|
||||
#include "mgmt_be_client.h"
|
||||
@ -99,14 +100,7 @@ DECLARE_LIST(mgmt_be_txns, struct mgmt_be_txn_ctx, list_linkage);
|
||||
frr_each_safe (mgmt_be_batches, &(txn)->apply_cfgs, (batch))
|
||||
|
||||
struct mgmt_be_client_ctx {
|
||||
int conn_fd;
|
||||
struct event_loop *tm;
|
||||
struct event *conn_retry_tmr;
|
||||
struct event *conn_read_ev;
|
||||
struct event *conn_write_ev;
|
||||
struct event *msg_proc_ev;
|
||||
|
||||
struct mgmt_msg_state mstate;
|
||||
struct msg_client client;
|
||||
|
||||
struct nb_config *candidate_config;
|
||||
struct nb_config *running_config;
|
||||
@ -128,8 +122,7 @@ struct mgmt_be_client_ctx {
|
||||
struct debug mgmt_dbg_be_client = {0, "Management backend client operations"};
|
||||
|
||||
static struct mgmt_be_client_ctx mgmt_be_client_ctx = {
|
||||
.conn_fd = -1,
|
||||
};
|
||||
.client = {.conn = {.fd = -1}}};
|
||||
|
||||
const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = {
|
||||
#ifdef HAVE_STATICD
|
||||
@ -138,35 +131,13 @@ const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = {
|
||||
[MGMTD_BE_CLIENT_ID_MAX] = "Unknown/Invalid",
|
||||
};
|
||||
|
||||
/* Forward declarations */
|
||||
static void
|
||||
mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx,
|
||||
enum mgmt_be_event event);
|
||||
static void
|
||||
mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx,
|
||||
unsigned long intvl_secs);
|
||||
static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
|
||||
Mgmtd__BeMessage *be_msg);
|
||||
|
||||
static void
|
||||
mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx,
|
||||
bool reconnect)
|
||||
Mgmtd__BeMessage *be_msg)
|
||||
{
|
||||
/* Notify client through registered callback (if any) */
|
||||
if (client_ctx->client_params.client_connect_notify)
|
||||
(void)(*client_ctx->client_params.client_connect_notify)(
|
||||
(uintptr_t)client_ctx,
|
||||
client_ctx->client_params.user_data, false);
|
||||
|
||||
if (client_ctx->conn_fd != -1) {
|
||||
close(client_ctx->conn_fd);
|
||||
client_ctx->conn_fd = -1;
|
||||
}
|
||||
|
||||
if (reconnect)
|
||||
mgmt_be_client_schedule_conn_retry(
|
||||
client_ctx,
|
||||
client_ctx->client_params.conn_retry_intvl_sec);
|
||||
return msg_conn_send_msg(
|
||||
&client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, be_msg,
|
||||
mgmtd__be_message__get_packed_size(be_msg),
|
||||
(size_t(*)(void *, void *))mgmtd__be_message__pack);
|
||||
}
|
||||
|
||||
static struct mgmt_be_batch_ctx *
|
||||
@ -853,12 +824,16 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mgmt_be_client_process_msg(uint8_t version, void *user_ctx,
|
||||
uint8_t *data, size_t len)
|
||||
static void mgmt_be_client_process_msg(uint8_t version, uint8_t *data,
|
||||
size_t len, struct msg_conn *conn)
|
||||
{
|
||||
struct mgmt_be_client_ctx *client_ctx = user_ctx;
|
||||
struct mgmt_be_client_ctx *client_ctx;
|
||||
struct msg_client *client;
|
||||
Mgmtd__BeMessage *be_msg;
|
||||
|
||||
client = container_of(conn, struct msg_client, conn);
|
||||
client_ctx = container_of(client, struct mgmt_be_client_ctx, client);
|
||||
|
||||
be_msg = mgmtd__be_message__unpack(NULL, len, data);
|
||||
if (!be_msg) {
|
||||
MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server",
|
||||
@ -872,69 +847,6 @@ static void mgmt_be_client_process_msg(uint8_t version, void *user_ctx,
|
||||
mgmtd__be_message__free_unpacked(be_msg, NULL);
|
||||
}
|
||||
|
||||
static void mgmt_be_client_proc_msgbufs(struct event *thread)
|
||||
{
|
||||
struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread);
|
||||
|
||||
if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg,
|
||||
client_ctx, MGMTD_DBG_BE_CLIENT_CHECK()))
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
|
||||
}
|
||||
|
||||
static void mgmt_be_client_read(struct event *thread)
|
||||
{
|
||||
struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread);
|
||||
enum mgmt_msg_rsched rv;
|
||||
|
||||
rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
|
||||
MGMTD_DBG_BE_CLIENT_CHECK());
|
||||
if (rv == MSR_DISCONNECT) {
|
||||
mgmt_be_server_disconnect(client_ctx, true);
|
||||
return;
|
||||
}
|
||||
if (rv == MSR_SCHED_BOTH)
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG);
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
|
||||
}
|
||||
|
||||
static inline void
|
||||
mgmt_be_client_sched_msg_write(struct mgmt_be_client_ctx *client_ctx)
|
||||
{
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE);
|
||||
}
|
||||
|
||||
static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
|
||||
Mgmtd__BeMessage *be_msg)
|
||||
{
|
||||
if (client_ctx->conn_fd == -1) {
|
||||
MGMTD_BE_CLIENT_DBG("can't send message on closed connection");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rv = mgmt_msg_send_msg(
|
||||
&client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, be_msg,
|
||||
mgmtd__be_message__get_packed_size(be_msg),
|
||||
(size_t(*)(void *, void *))mgmtd__be_message__pack,
|
||||
MGMTD_DBG_BE_CLIENT_CHECK());
|
||||
mgmt_be_client_sched_msg_write(client_ctx);
|
||||
return rv;
|
||||
}
|
||||
|
||||
static void mgmt_be_client_write(struct event *thread)
|
||||
{
|
||||
struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread);
|
||||
enum mgmt_msg_wsched rv;
|
||||
|
||||
rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
|
||||
MGMTD_DBG_BE_CLIENT_CHECK());
|
||||
if (rv == MSW_SCHED_STREAM)
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE);
|
||||
else if (rv == MSW_DISCONNECT)
|
||||
mgmt_be_server_disconnect(client_ctx, true);
|
||||
else
|
||||
assert(rv == MSW_SCHED_NONE);
|
||||
}
|
||||
|
||||
static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
|
||||
bool subscr_xpaths, uint16_t num_reg_xpaths,
|
||||
char **reg_xpaths)
|
||||
@ -958,80 +870,37 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx,
|
||||
return mgmt_be_client_send_msg(client_ctx, &be_msg);
|
||||
}
|
||||
|
||||
static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx)
|
||||
static int _notify_conenct_disconnect(struct msg_client *client, bool connected)
|
||||
{
|
||||
const char *dbgtag = MGMTD_DBG_BE_CLIENT_CHECK() ? "BE-client" : NULL;
|
||||
struct mgmt_be_client_ctx *client_ctx =
|
||||
container_of(client, struct mgmt_be_client_ctx, client);
|
||||
int ret;
|
||||
|
||||
assert(client_ctx->conn_fd == -1);
|
||||
client_ctx->conn_fd = mgmt_msg_connect(
|
||||
MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE,
|
||||
MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag);
|
||||
|
||||
/* Send SUBSCRIBE_REQ message */
|
||||
if (client_ctx->conn_fd == -1 ||
|
||||
mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) {
|
||||
mgmt_be_server_disconnect(client_ctx, true);
|
||||
return;
|
||||
if (connected) {
|
||||
assert(client->conn.fd != -1);
|
||||
ret = mgmt_be_send_subscr_req(client_ctx, false, 0, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Start reading from the socket */
|
||||
mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ);
|
||||
|
||||
/* Notify client through registered callback (if any) */
|
||||
/* Notify BE client through registered callback (if any) */
|
||||
if (client_ctx->client_params.client_connect_notify)
|
||||
(void)(*client_ctx->client_params.client_connect_notify)(
|
||||
(uintptr_t)client_ctx,
|
||||
client_ctx->client_params.user_data, true);
|
||||
client_ctx->client_params.user_data, connected);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mgmt_be_client_conn_timeout(struct event *thread)
|
||||
static int mgmt_be_client_notify_conenct(struct msg_client *client)
|
||||
{
|
||||
mgmt_be_server_connect(EVENT_ARG(thread));
|
||||
return _notify_conenct_disconnect(client, true);
|
||||
}
|
||||
|
||||
static void
|
||||
mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx,
|
||||
enum mgmt_be_event event)
|
||||
static int mgmt_be_client_notify_disconenct(struct msg_conn *conn)
|
||||
{
|
||||
struct timeval tv = {0};
|
||||
struct msg_client *client = container_of(conn, struct msg_client, conn);
|
||||
|
||||
switch (event) {
|
||||
case MGMTD_BE_CONN_READ:
|
||||
event_add_read(client_ctx->tm, mgmt_be_client_read,
|
||||
client_ctx, client_ctx->conn_fd,
|
||||
&client_ctx->conn_read_ev);
|
||||
break;
|
||||
case MGMTD_BE_CONN_WRITE:
|
||||
event_add_write(client_ctx->tm, mgmt_be_client_write,
|
||||
client_ctx, client_ctx->conn_fd,
|
||||
&client_ctx->conn_write_ev);
|
||||
break;
|
||||
case MGMTD_BE_PROC_MSG:
|
||||
tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC;
|
||||
event_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs,
|
||||
client_ctx, &tv, &client_ctx->msg_proc_ev);
|
||||
break;
|
||||
case MGMTD_BE_SERVER:
|
||||
case MGMTD_BE_CONN_INIT:
|
||||
case MGMTD_BE_SCHED_CFG_PREPARE:
|
||||
case MGMTD_BE_RESCHED_CFG_PREPARE:
|
||||
case MGMTD_BE_SCHED_CFG_APPLY:
|
||||
case MGMTD_BE_RESCHED_CFG_APPLY:
|
||||
assert(!"mgmt_be_client_post_event() called incorrectly");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx,
|
||||
unsigned long intvl_secs)
|
||||
{
|
||||
MGMTD_BE_CLIENT_DBG(
|
||||
"Scheduling MGMTD Backend server connection retry after %lu seconds",
|
||||
intvl_secs);
|
||||
event_add_timer(client_ctx->tm, mgmt_be_client_conn_timeout,
|
||||
(void *)client_ctx, intvl_secs,
|
||||
&client_ctx->conn_retry_tmr);
|
||||
return _notify_conenct_disconnect(client, false);
|
||||
}
|
||||
|
||||
DEFPY(debug_mgmt_client_be, debug_mgmt_client_be_cmd,
|
||||
@ -1082,29 +951,22 @@ static struct cmd_node mgmt_dbg_node = {
|
||||
uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params,
|
||||
struct event_loop *master_thread)
|
||||
{
|
||||
assert(master_thread && params && strlen(params->name)
|
||||
&& !mgmt_be_client_ctx.tm);
|
||||
/* Don't call twice */
|
||||
assert(!mgmt_be_client_ctx.client.conn.loop);
|
||||
|
||||
mgmt_be_client_ctx.tm = master_thread;
|
||||
/* Only call after frr_init() */
|
||||
assert(running_config);
|
||||
|
||||
if (!running_config)
|
||||
assert(!"MGMTD Be Client lib_init() after frr_init() only!");
|
||||
mgmt_be_client_ctx.running_config = running_config;
|
||||
mgmt_be_client_ctx.candidate_config = nb_config_new(NULL);
|
||||
|
||||
memcpy(&mgmt_be_client_ctx.client_params, params,
|
||||
sizeof(mgmt_be_client_ctx.client_params));
|
||||
if (!mgmt_be_client_ctx.client_params.conn_retry_intvl_sec)
|
||||
mgmt_be_client_ctx.client_params.conn_retry_intvl_sec =
|
||||
MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC;
|
||||
|
||||
mgmt_be_client_ctx.client_params = *params;
|
||||
mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head);
|
||||
mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
|
||||
MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
|
||||
"BE-client");
|
||||
|
||||
/* Start trying to connect to MGMTD backend server immediately */
|
||||
mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1);
|
||||
msg_client_init(&mgmt_be_client_ctx.client, master_thread,
|
||||
MGMTD_BE_SERVER_PATH, mgmt_be_client_notify_conenct,
|
||||
mgmt_be_client_notify_disconenct,
|
||||
mgmt_be_client_process_msg, MGMTD_BE_MAX_NUM_MSG_PROC,
|
||||
MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
|
||||
"BE-client", MGMTD_DBG_BE_CLIENT_CHECK());
|
||||
|
||||
MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name);
|
||||
|
||||
@ -1183,24 +1045,16 @@ enum mgmt_result mgmt_be_send_yang_notify(uintptr_t lib_hndl,
|
||||
/*
|
||||
* Destroy library and cleanup everything.
|
||||
*/
|
||||
void mgmt_be_client_lib_destroy(uintptr_t lib_hndl)
|
||||
void mgmt_be_client_lib_destroy(void)
|
||||
{
|
||||
struct mgmt_be_client_ctx *client_ctx;
|
||||
|
||||
client_ctx = (struct mgmt_be_client_ctx *)lib_hndl;
|
||||
assert(client_ctx);
|
||||
struct mgmt_be_client_ctx *client_ctx = &mgmt_be_client_ctx;
|
||||
|
||||
MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'",
|
||||
client_ctx->client_params.name);
|
||||
|
||||
mgmt_be_server_disconnect(client_ctx, false);
|
||||
|
||||
mgmt_msg_destroy(&client_ctx->mstate);
|
||||
|
||||
EVENT_OFF(client_ctx->conn_retry_tmr);
|
||||
EVENT_OFF(client_ctx->conn_read_ev);
|
||||
EVENT_OFF(client_ctx->conn_write_ev);
|
||||
EVENT_OFF(client_ctx->msg_proc_ev);
|
||||
msg_client_cleanup(&client_ctx->client);
|
||||
mgmt_be_cleanup_all_txns(client_ctx);
|
||||
mgmt_be_txns_fini(&client_ctx->txn_head);
|
||||
|
||||
memset(client_ctx, 0, sizeof(*client_ctx));
|
||||
}
|
||||
|
@ -265,7 +265,7 @@ enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl,
|
||||
/*
|
||||
* Destroy library and cleanup everything.
|
||||
*/
|
||||
extern void mgmt_be_client_lib_destroy(uintptr_t lib_hndl);
|
||||
extern void mgmt_be_client_lib_destroy(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
*/
|
||||
|
||||
#include <zebra.h>
|
||||
#include "compiler.h"
|
||||
#include "debug.h"
|
||||
#include "memory.h"
|
||||
#include "libfrr.h"
|
||||
@ -43,17 +44,8 @@ DECLARE_LIST(mgmt_sessions, struct mgmt_fe_client_session, list_linkage);
|
||||
DEFINE_MTYPE_STATIC(LIB, MGMTD_FE_SESSION, "MGMTD Frontend session");
|
||||
|
||||
struct mgmt_fe_client_ctx {
|
||||
int conn_fd;
|
||||
struct event_loop *tm;
|
||||
struct event *conn_retry_tmr;
|
||||
struct event *conn_read_ev;
|
||||
struct event *conn_write_ev;
|
||||
struct event *msg_proc_ev;
|
||||
|
||||
struct mgmt_msg_state mstate;
|
||||
|
||||
struct msg_client client;
|
||||
struct mgmt_fe_client_params client_params;
|
||||
|
||||
struct mgmt_sessions_head client_sessions;
|
||||
};
|
||||
|
||||
@ -63,15 +55,7 @@ struct mgmt_fe_client_ctx {
|
||||
struct debug mgmt_dbg_fe_client = {0, "Management frontend client operations"};
|
||||
|
||||
static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = {
|
||||
.conn_fd = -1,
|
||||
};
|
||||
|
||||
/* Forward declarations */
|
||||
static void
|
||||
mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx,
|
||||
enum mgmt_fe_event event);
|
||||
static void mgmt_fe_client_schedule_conn_retry(
|
||||
struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs);
|
||||
.client = {.conn = {.fd = -1}}};
|
||||
|
||||
static struct mgmt_fe_client_session *
|
||||
mgmt_fe_find_session_by_client_id(struct mgmt_fe_client_ctx *client_ctx,
|
||||
@ -109,59 +93,13 @@ mgmt_fe_find_session_by_session_id(struct mgmt_fe_client_ctx *client_ctx,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx,
|
||||
bool reconnect)
|
||||
{
|
||||
if (client_ctx->conn_fd != -1) {
|
||||
close(client_ctx->conn_fd);
|
||||
client_ctx->conn_fd = -1;
|
||||
}
|
||||
|
||||
if (reconnect)
|
||||
mgmt_fe_client_schedule_conn_retry(
|
||||
client_ctx,
|
||||
client_ctx->client_params.conn_retry_intvl_sec);
|
||||
}
|
||||
|
||||
static inline void
|
||||
mgmt_fe_client_sched_msg_write(struct mgmt_fe_client_ctx *client_ctx)
|
||||
{
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE);
|
||||
}
|
||||
|
||||
static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
|
||||
Mgmtd__FeMessage *fe_msg)
|
||||
{
|
||||
/* users current expect this to fail here */
|
||||
if (client_ctx->conn_fd == -1) {
|
||||
MGMTD_FE_CLIENT_DBG("can't send message on closed connection");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rv = mgmt_msg_send_msg(
|
||||
&client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
|
||||
return msg_conn_send_msg(
|
||||
&client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
|
||||
mgmtd__fe_message__get_packed_size(fe_msg),
|
||||
(size_t(*)(void *, void *))mgmtd__fe_message__pack,
|
||||
MGMTD_DBG_FE_CLIENT_CHECK());
|
||||
mgmt_fe_client_sched_msg_write(client_ctx);
|
||||
return rv;
|
||||
}
|
||||
|
||||
static void mgmt_fe_client_write(struct event *thread)
|
||||
{
|
||||
struct mgmt_fe_client_ctx *client_ctx;
|
||||
enum mgmt_msg_wsched rv;
|
||||
|
||||
client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread);
|
||||
rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd,
|
||||
MGMTD_DBG_FE_CLIENT_CHECK());
|
||||
if (rv == MSW_SCHED_STREAM)
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE);
|
||||
else if (rv == MSW_DISCONNECT)
|
||||
mgmt_fe_server_disconnect(client_ctx, true);
|
||||
else
|
||||
assert(rv == MSW_SCHED_NONE);
|
||||
(size_t(*)(void *, void *))mgmtd__fe_message__pack);
|
||||
}
|
||||
|
||||
static int
|
||||
@ -614,12 +552,16 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mgmt_fe_client_process_msg(uint8_t version, void *user_ctx,
|
||||
uint8_t *data, size_t len)
|
||||
static void mgmt_fe_client_process_msg(uint8_t version, uint8_t *data,
|
||||
size_t len, struct msg_conn *conn)
|
||||
{
|
||||
struct mgmt_fe_client_ctx *client_ctx = user_ctx;
|
||||
struct mgmt_fe_client_ctx *client_ctx;
|
||||
struct msg_client *client;
|
||||
Mgmtd__FeMessage *fe_msg;
|
||||
|
||||
client = container_of(conn, struct msg_client, conn);
|
||||
client_ctx = container_of(client, struct mgmt_fe_client_ctx, client);
|
||||
|
||||
fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
|
||||
if (!fe_msg) {
|
||||
MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.",
|
||||
@ -633,105 +575,38 @@ static void mgmt_fe_client_process_msg(uint8_t version, void *user_ctx,
|
||||
mgmtd__fe_message__free_unpacked(fe_msg, NULL);
|
||||
}
|
||||
|
||||
static void mgmt_fe_client_proc_msgbufs(struct event *thread)
|
||||
static int _notify_connect_disconnect(struct msg_client *client, bool connected)
|
||||
{
|
||||
struct mgmt_fe_client_ctx *client_ctx;
|
||||
|
||||
client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread);
|
||||
if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg,
|
||||
client_ctx, MGMTD_DBG_FE_CLIENT_CHECK()))
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
|
||||
}
|
||||
|
||||
static void mgmt_fe_client_read(struct event *thread)
|
||||
{
|
||||
struct mgmt_fe_client_ctx *client_ctx;
|
||||
enum mgmt_msg_rsched rv;
|
||||
|
||||
client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread);
|
||||
|
||||
rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd,
|
||||
MGMTD_DBG_FE_CLIENT_CHECK());
|
||||
if (rv == MSR_DISCONNECT) {
|
||||
mgmt_fe_server_disconnect(client_ctx, true);
|
||||
return;
|
||||
}
|
||||
if (rv == MSR_SCHED_BOTH)
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG);
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
|
||||
}
|
||||
|
||||
static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx)
|
||||
{
|
||||
const char *dbgtag = MGMTD_DBG_FE_CLIENT_CHECK() ? "FE-client" : NULL;
|
||||
|
||||
assert(client_ctx->conn_fd == -1);
|
||||
client_ctx->conn_fd = mgmt_msg_connect(
|
||||
MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE,
|
||||
MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag);
|
||||
struct mgmt_fe_client_ctx *client_ctx =
|
||||
container_of(client, struct mgmt_fe_client_ctx, client);
|
||||
int ret;
|
||||
|
||||
/* Send REGISTER_REQ message */
|
||||
if (client_ctx->conn_fd == -1 ||
|
||||
mgmt_fe_send_register_req(client_ctx) != 0) {
|
||||
mgmt_fe_server_disconnect(client_ctx, true);
|
||||
return;
|
||||
if (connected) {
|
||||
if ((ret = mgmt_fe_send_register_req(client_ctx)) != 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Start reading from the socket */
|
||||
mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ);
|
||||
|
||||
/* Notify client through registered callback (if any) */
|
||||
/* Notify FE client through registered callback (if any). */
|
||||
if (client_ctx->client_params.client_connect_notify)
|
||||
(void)(*client_ctx->client_params.client_connect_notify)(
|
||||
(uintptr_t)client_ctx,
|
||||
client_ctx->client_params.user_data, true);
|
||||
client_ctx->client_params.user_data, connected);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void mgmt_fe_client_conn_timeout(struct event *thread)
|
||||
static int mgmt_fe_client_notify_connect(struct msg_client *client)
|
||||
{
|
||||
mgmt_fe_server_connect(EVENT_ARG(thread));
|
||||
return _notify_connect_disconnect(client, true);
|
||||
}
|
||||
|
||||
static void
|
||||
mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx,
|
||||
enum mgmt_fe_event event)
|
||||
static int mgmt_fe_client_notify_disconnect(struct msg_conn *conn)
|
||||
{
|
||||
struct timeval tv = {0};
|
||||
struct msg_client *client = container_of(conn, struct msg_client, conn);
|
||||
|
||||
switch (event) {
|
||||
case MGMTD_FE_CONN_READ:
|
||||
event_add_read(client_ctx->tm, mgmt_fe_client_read,
|
||||
client_ctx, client_ctx->conn_fd,
|
||||
&client_ctx->conn_read_ev);
|
||||
break;
|
||||
case MGMTD_FE_CONN_WRITE:
|
||||
event_add_write(client_ctx->tm, mgmt_fe_client_write,
|
||||
client_ctx, client_ctx->conn_fd,
|
||||
&client_ctx->conn_write_ev);
|
||||
break;
|
||||
case MGMTD_FE_PROC_MSG:
|
||||
tv.tv_usec = MGMTD_FE_MSG_PROC_DELAY_USEC;
|
||||
event_add_timer_tv(client_ctx->tm,
|
||||
mgmt_fe_client_proc_msgbufs, client_ctx,
|
||||
&tv, &client_ctx->msg_proc_ev);
|
||||
break;
|
||||
case MGMTD_FE_SERVER:
|
||||
assert(!"mgmt_fe_client_ctx_post_event called incorrectly");
|
||||
break;
|
||||
}
|
||||
return _notify_connect_disconnect(client, false);
|
||||
}
|
||||
|
||||
static void mgmt_fe_client_schedule_conn_retry(
|
||||
struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs)
|
||||
{
|
||||
MGMTD_FE_CLIENT_DBG(
|
||||
"Scheduling MGMTD Frontend server connection retry after %lu seconds",
|
||||
intvl_secs);
|
||||
event_add_timer(client_ctx->tm, mgmt_fe_client_conn_timeout,
|
||||
(void *)client_ctx, intvl_secs,
|
||||
&client_ctx->conn_retry_tmr);
|
||||
}
|
||||
|
||||
DEFPY(debug_mgmt_client_fe, debug_mgmt_client_fe_cmd,
|
||||
"[no] debug mgmt client frontend",
|
||||
@ -781,24 +656,19 @@ static struct cmd_node mgmt_dbg_node = {
|
||||
uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params,
|
||||
struct event_loop *master_thread)
|
||||
{
|
||||
assert(master_thread && params && strlen(params->name)
|
||||
&& !mgmt_fe_client_ctx.tm);
|
||||
/* Don't call twice */
|
||||
assert(!mgmt_fe_client_ctx.client.conn.loop);
|
||||
|
||||
mgmt_fe_client_ctx.tm = master_thread;
|
||||
memcpy(&mgmt_fe_client_ctx.client_params, params,
|
||||
sizeof(mgmt_fe_client_ctx.client_params));
|
||||
if (!mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec)
|
||||
mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec =
|
||||
MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC;
|
||||
|
||||
mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
|
||||
MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
|
||||
"FE-client");
|
||||
mgmt_fe_client_ctx.client_params = *params;
|
||||
|
||||
mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions);
|
||||
|
||||
/* Start trying to connect to MGMTD frontend server immediately */
|
||||
mgmt_fe_client_schedule_conn_retry(&mgmt_fe_client_ctx, 1);
|
||||
msg_client_init(&mgmt_fe_client_ctx.client, master_thread,
|
||||
MGMTD_FE_SERVER_PATH, mgmt_fe_client_notify_connect,
|
||||
mgmt_fe_client_notify_disconnect,
|
||||
mgmt_fe_client_process_msg, MGMTD_FE_MAX_NUM_MSG_PROC,
|
||||
MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
|
||||
"FE-client", MGMTD_DBG_FE_CLIENT_CHECK());
|
||||
|
||||
MGMTD_FE_CLIENT_DBG("Initialized client '%s'", params->name);
|
||||
|
||||
@ -1056,23 +926,14 @@ mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id,
|
||||
/*
|
||||
* Destroy library and cleanup everything.
|
||||
*/
|
||||
void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl)
|
||||
void mgmt_fe_client_lib_destroy(void)
|
||||
{
|
||||
struct mgmt_fe_client_ctx *client_ctx;
|
||||
|
||||
client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl;
|
||||
assert(client_ctx);
|
||||
struct mgmt_fe_client_ctx *client_ctx = &mgmt_fe_client_ctx;
|
||||
|
||||
MGMTD_FE_CLIENT_DBG("Destroying MGMTD Frontend Client '%s'",
|
||||
client_ctx->client_params.name);
|
||||
|
||||
mgmt_fe_server_disconnect(client_ctx, false);
|
||||
|
||||
mgmt_fe_destroy_client_sessions(lib_hndl);
|
||||
|
||||
EVENT_OFF(client_ctx->conn_retry_tmr);
|
||||
EVENT_OFF(client_ctx->conn_read_ev);
|
||||
EVENT_OFF(client_ctx->conn_write_ev);
|
||||
EVENT_OFF(client_ctx->msg_proc_ev);
|
||||
mgmt_msg_destroy(&client_ctx->mstate);
|
||||
mgmt_fe_destroy_client_sessions((uintptr_t)client_ctx);
|
||||
msg_client_cleanup(&client_ctx->client);
|
||||
memset(client_ctx, 0, sizeof(*client_ctx));
|
||||
}
|
||||
|
@ -351,7 +351,7 @@ mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id,
|
||||
/*
|
||||
* Destroy library and cleanup everything.
|
||||
*/
|
||||
extern void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl);
|
||||
extern void mgmt_fe_client_lib_destroy(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
257
lib/mgmt_msg.c
257
lib/mgmt_msg.c
@ -22,7 +22,7 @@
|
||||
} while (0)
|
||||
|
||||
#define MGMT_MSG_ERR(ms, fmt, ...) \
|
||||
zlog_err("%s: %s: " fmt, ms->idtag, __func__, ##__VA_ARGS__)
|
||||
zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
|
||||
|
||||
/**
|
||||
* Read data from a socket into streams containing 1 or more full msgs headed by
|
||||
@ -127,8 +127,8 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
|
||||
* true if more to process (so reschedule) else false
|
||||
*/
|
||||
bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
|
||||
void (*handle_msg)(uint8_t version, void *user,
|
||||
uint8_t *msg, size_t msglen),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *msg,
|
||||
size_t msglen, void *user),
|
||||
void *user, bool debug)
|
||||
{
|
||||
const char *dbgtag = debug ? ms->idtag : NULL;
|
||||
@ -156,9 +156,10 @@ bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
|
||||
assert(MGMT_MSG_IS_MARKER(mhdr->marker));
|
||||
assert(left >= mhdr->len);
|
||||
|
||||
handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker), user,
|
||||
handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker),
|
||||
(uint8_t *)(mhdr + 1),
|
||||
mhdr->len - sizeof(struct mgmt_msg_hdr));
|
||||
mhdr->len - sizeof(struct mgmt_msg_hdr),
|
||||
user);
|
||||
ms->nrxm++;
|
||||
nproc++;
|
||||
}
|
||||
@ -402,6 +403,7 @@ size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
|
||||
return nproc;
|
||||
}
|
||||
|
||||
|
||||
void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
|
||||
size_t max_write_buf, size_t max_msg_sz, const char *idtag)
|
||||
{
|
||||
@ -422,3 +424,248 @@ void mgmt_msg_destroy(struct mgmt_msg_state *ms)
|
||||
stream_free(ms->ins);
|
||||
free(ms->idtag);
|
||||
}
|
||||
|
||||
/*
|
||||
* Connections
|
||||
*/
|
||||
|
||||
#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
|
||||
#define MSG_CONN_SEND_BUF_SIZE (1u << 16)
|
||||
#define MSG_CONN_RECV_BUF_SIZE (1u << 16)
|
||||
|
||||
static void msg_client_sched_connect(struct msg_client *client,
|
||||
unsigned long msec);
|
||||
|
||||
static void msg_conn_sched_proc_msgs(struct msg_conn *conn);
|
||||
static void msg_conn_sched_read(struct msg_conn *conn);
|
||||
static void msg_conn_sched_write(struct msg_conn *conn);
|
||||
|
||||
static void msg_conn_write(struct event *thread)
|
||||
{
|
||||
struct msg_conn *conn = EVENT_ARG(thread);
|
||||
enum mgmt_msg_wsched rv;
|
||||
|
||||
rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug);
|
||||
if (rv == MSW_SCHED_STREAM)
|
||||
msg_conn_sched_write(conn);
|
||||
else if (rv == MSW_DISCONNECT)
|
||||
msg_conn_disconnect(conn, conn->is_client);
|
||||
else
|
||||
assert(rv == MSW_SCHED_NONE);
|
||||
}
|
||||
|
||||
static void msg_conn_read(struct event *thread)
|
||||
{
|
||||
struct msg_conn *conn = EVENT_ARG(thread);
|
||||
enum mgmt_msg_rsched rv;
|
||||
|
||||
rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug);
|
||||
if (rv == MSR_DISCONNECT) {
|
||||
msg_conn_disconnect(conn, conn->is_client);
|
||||
return;
|
||||
}
|
||||
if (rv == MSR_SCHED_BOTH)
|
||||
msg_conn_sched_proc_msgs(conn);
|
||||
msg_conn_sched_read(conn);
|
||||
}
|
||||
|
||||
/* collapse this into mgmt_msg_procbufs */
|
||||
static void msg_conn_proc_msgs(struct event *thread)
|
||||
{
|
||||
struct msg_conn *conn = EVENT_ARG(thread);
|
||||
|
||||
if (mgmt_msg_procbufs(&conn->mstate,
|
||||
(void (*)(uint8_t, uint8_t *, size_t,
|
||||
void *))conn->handle_msg,
|
||||
conn, conn->debug))
|
||||
/* there's more, schedule handling more */
|
||||
msg_conn_sched_proc_msgs(conn);
|
||||
}
|
||||
|
||||
static void msg_conn_sched_read(struct msg_conn *conn)
|
||||
{
|
||||
event_add_read(conn->loop, msg_conn_read, conn, conn->fd,
|
||||
&conn->read_ev);
|
||||
}
|
||||
|
||||
static void msg_conn_sched_write(struct msg_conn *conn)
|
||||
{
|
||||
event_add_write(conn->loop, msg_conn_write, conn, conn->fd,
|
||||
&conn->write_ev);
|
||||
}
|
||||
|
||||
static void msg_conn_sched_proc_msgs(struct msg_conn *conn)
|
||||
{
|
||||
event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0,
|
||||
&conn->proc_msg_ev);
|
||||
}
|
||||
|
||||
|
||||
void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
|
||||
{
|
||||
|
||||
if (conn->fd != -1) {
|
||||
close(conn->fd);
|
||||
conn->fd = -1;
|
||||
|
||||
/* Notify client through registered callback (if any) */
|
||||
if (conn->notify_disconnect)
|
||||
(void)(*conn->notify_disconnect)(conn);
|
||||
}
|
||||
|
||||
if (reconnect) {
|
||||
assert(conn->is_client);
|
||||
msg_client_sched_connect(
|
||||
container_of(conn, struct msg_client, conn),
|
||||
MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
|
||||
}
|
||||
}
|
||||
|
||||
int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,
|
||||
size_t mlen, size_t (*packf)(void *, void *))
|
||||
{
|
||||
if (conn->fd == -1) {
|
||||
MGMT_MSG_ERR(&conn->mstate,
|
||||
"can't send message on closed connection");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,
|
||||
conn->debug);
|
||||
|
||||
msg_conn_sched_write(conn);
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
void msg_conn_cleanup(struct msg_conn *conn)
|
||||
{
|
||||
struct mgmt_msg_state *ms = &conn->mstate;
|
||||
|
||||
if (conn->fd != -1) {
|
||||
close(conn->fd);
|
||||
conn->fd = -1;
|
||||
}
|
||||
|
||||
EVENT_OFF(conn->read_ev);
|
||||
EVENT_OFF(conn->write_ev);
|
||||
EVENT_OFF(conn->proc_msg_ev);
|
||||
|
||||
mgmt_msg_destroy(ms);
|
||||
}
|
||||
|
||||
/*
|
||||
* Client Connections
|
||||
*/
|
||||
|
||||
static void msg_client_connect(struct msg_client *conn);
|
||||
|
||||
static void msg_client_connect_timer(struct event *thread)
|
||||
{
|
||||
msg_client_connect(EVENT_ARG(thread));
|
||||
}
|
||||
|
||||
static void msg_client_sched_connect(struct msg_client *client,
|
||||
unsigned long msec)
|
||||
{
|
||||
struct msg_conn *conn = &client->conn;
|
||||
const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
|
||||
|
||||
MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec);
|
||||
if (msec)
|
||||
event_add_timer_msec(conn->loop, msg_client_connect_timer,
|
||||
client, msec, &client->conn_retry_tmr);
|
||||
else
|
||||
event_add_event(conn->loop, msg_client_connect_timer, client, 0,
|
||||
&client->conn_retry_tmr);
|
||||
}
|
||||
|
||||
|
||||
/* Connect and start reading from the socket */
|
||||
static void msg_client_connect(struct msg_client *client)
|
||||
{
|
||||
struct msg_conn *conn = &client->conn;
|
||||
const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
|
||||
|
||||
conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
|
||||
MSG_CONN_RECV_BUF_SIZE, dbgtag);
|
||||
|
||||
if (conn->fd == -1)
|
||||
/* retry the connection */
|
||||
msg_client_sched_connect(client,
|
||||
MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
|
||||
else if (client->notify_connect && client->notify_connect(client))
|
||||
/* client connect notify failed */
|
||||
msg_conn_disconnect(conn, true);
|
||||
else
|
||||
/* start reading */
|
||||
msg_conn_sched_read(conn);
|
||||
}
|
||||
|
||||
void msg_client_init(struct msg_client *client, struct event_loop *tm,
|
||||
const char *sopath,
|
||||
int (*notify_connect)(struct msg_client *client),
|
||||
int (*notify_disconnect)(struct msg_conn *client),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *data,
|
||||
size_t len, struct msg_conn *client),
|
||||
size_t max_read_buf, size_t max_write_buf,
|
||||
size_t max_msg_sz, const char *idtag, bool debug)
|
||||
{
|
||||
struct msg_conn *conn = &client->conn;
|
||||
memset(client, 0, sizeof(*client));
|
||||
|
||||
conn->loop = tm;
|
||||
conn->fd = -1;
|
||||
conn->handle_msg = handle_msg;
|
||||
conn->notify_disconnect = notify_disconnect;
|
||||
conn->is_client = true;
|
||||
conn->debug = debug;
|
||||
client->sopath = strdup(sopath);
|
||||
client->notify_connect = notify_connect;
|
||||
|
||||
mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,
|
||||
idtag);
|
||||
|
||||
/* XXX maybe just have client kick this off */
|
||||
/* Start trying to connect to server */
|
||||
msg_client_sched_connect(client, 0);
|
||||
}
|
||||
|
||||
void msg_client_cleanup(struct msg_client *client)
|
||||
{
|
||||
assert(client->conn.is_client);
|
||||
|
||||
EVENT_OFF(client->conn_retry_tmr);
|
||||
free(client->sopath);
|
||||
|
||||
msg_conn_cleanup(&client->conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize and start reading from the accepted socket
|
||||
*
|
||||
* notify_connect - only called for disconnect i.e., connected == false
|
||||
*/
|
||||
void mgmt_msg_server_accept_init(
|
||||
struct msg_conn *conn, struct event_loop *tm, int fd,
|
||||
int (*notify_disconnect)(struct msg_conn *conn),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
|
||||
struct msg_conn *conn),
|
||||
size_t max_read, size_t max_write, size_t max_size, const char *idtag)
|
||||
{
|
||||
conn->loop = tm;
|
||||
conn->fd = fd;
|
||||
conn->notify_disconnect = notify_disconnect;
|
||||
conn->handle_msg = handle_msg;
|
||||
conn->is_client = false;
|
||||
|
||||
mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag);
|
||||
|
||||
/* start reading */
|
||||
msg_conn_sched_read(conn);
|
||||
|
||||
/* Make socket non-blocking. */
|
||||
set_nonblocking(conn->fd);
|
||||
setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE);
|
||||
setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE);
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
#define MGMT_MSG_VERSION_PROTOBUF 0
|
||||
#define MGMT_MSG_VERSION_NATIVE 1
|
||||
|
||||
|
||||
struct mgmt_msg_state {
|
||||
struct stream *ins;
|
||||
struct stream *outs;
|
||||
@ -53,15 +54,14 @@ enum mgmt_msg_wsched {
|
||||
MSW_DISCONNECT, /* disconnect and start reconnecting */
|
||||
};
|
||||
|
||||
struct msg_conn;
|
||||
|
||||
|
||||
extern int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
|
||||
const char *dbgtag);
|
||||
extern void mgmt_msg_destroy(struct mgmt_msg_state *ms);
|
||||
extern void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
|
||||
size_t max_write_buf, size_t max_msg_sz,
|
||||
const char *idtag);
|
||||
extern bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
|
||||
void (*handle_msg)(uint8_t version, void *user,
|
||||
uint8_t *msg, size_t msglen),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *msg,
|
||||
size_t msglen, void *user),
|
||||
void *user, bool debug);
|
||||
extern enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
|
||||
bool debug);
|
||||
@ -72,4 +72,84 @@ extern int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version,
|
||||
extern enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
|
||||
bool debug);
|
||||
|
||||
extern void mgmt_msg_destroy(struct mgmt_msg_state *state);
|
||||
|
||||
extern void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
|
||||
size_t max_write_buf, size_t max_msg_sz,
|
||||
const char *idtag);
|
||||
|
||||
/*
|
||||
* Connections
|
||||
*/
|
||||
|
||||
struct msg_conn {
|
||||
int fd;
|
||||
struct mgmt_msg_state mstate;
|
||||
struct event_loop *loop;
|
||||
struct event *read_ev;
|
||||
struct event *write_ev;
|
||||
struct event *proc_msg_ev;
|
||||
int (*notify_disconnect)(struct msg_conn *conn);
|
||||
void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
|
||||
struct msg_conn *conn);
|
||||
bool is_client;
|
||||
bool debug;
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* `notify_disconnect` is not called when `msg_conn_cleanup` is called for a
|
||||
* msg_conn which is currently connected. The socket is closed but there is no
|
||||
* notification.
|
||||
*/
|
||||
extern void msg_conn_cleanup(struct msg_conn *conn);
|
||||
extern void msg_conn_disconnect(struct msg_conn *conn, bool reconnect);
|
||||
extern int msg_conn_send_msg(struct msg_conn *client, uint8_t version,
|
||||
void *msg, size_t mlen,
|
||||
size_t (*packf)(void *, void *));
|
||||
|
||||
/*
|
||||
* Client-side Connections
|
||||
*/
|
||||
|
||||
struct msg_client {
|
||||
struct msg_conn conn;
|
||||
struct event *conn_retry_tmr;
|
||||
char *sopath;
|
||||
int (*notify_connect)(struct msg_client *client);
|
||||
};
|
||||
|
||||
/*
|
||||
* `notify_disconnect` is not called when `msg_client_cleanup` is called for a
|
||||
* msg_client which is currently connected. The socket is closed but there is no
|
||||
* notification.
|
||||
*/
|
||||
extern void msg_client_cleanup(struct msg_client *client);
|
||||
|
||||
/*
|
||||
* `notify_disconnect` is not called when the user `msg_client_cleanup` is
|
||||
* called for a client which is currently connected. The socket is closed
|
||||
* but there is no notification.
|
||||
*/
|
||||
extern void msg_client_init(struct msg_client *client, struct event_loop *tm,
|
||||
const char *sopath,
|
||||
int (*notify_connect)(struct msg_client *client),
|
||||
int (*notify_disconnect)(struct msg_conn *client),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *data,
|
||||
size_t len,
|
||||
struct msg_conn *client),
|
||||
size_t max_read_buf, size_t max_write_buf,
|
||||
size_t max_msg_sz, const char *idtag, bool debug);
|
||||
|
||||
/*
|
||||
* Server-side Connections
|
||||
*/
|
||||
|
||||
extern void mgmt_msg_server_accept_init(
|
||||
struct msg_conn *client, struct event_loop *tm, int fd,
|
||||
int (*notify_disconnect)(struct msg_conn *conn),
|
||||
void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
|
||||
struct msg_conn *conn),
|
||||
size_t max_read, size_t max_write, size_t max_size, const char *idtag);
|
||||
|
||||
#endif /* _MGMT_MSG_H */
|
||||
|
@ -3703,7 +3703,7 @@ void vty_terminate(void)
|
||||
struct vty_serv *vtyserv;
|
||||
|
||||
if (mgmt_lib_hndl) {
|
||||
mgmt_fe_client_lib_destroy(mgmt_lib_hndl);
|
||||
mgmt_fe_client_lib_destroy();
|
||||
mgmt_lib_hndl = 0;
|
||||
}
|
||||
|
||||
|
@ -569,8 +569,8 @@ static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter,
|
||||
return mgmt_be_adapter_send_msg(adapter, &be_msg);
|
||||
}
|
||||
|
||||
static void mgmt_be_adapter_process_msg(uint8_t version, void *user_ctx,
|
||||
uint8_t *data, size_t len)
|
||||
static void mgmt_be_adapter_process_msg(uint8_t version, uint8_t *data,
|
||||
size_t len, void *user_ctx)
|
||||
{
|
||||
struct mgmt_be_client_adapter *adapter = user_ctx;
|
||||
Mgmtd__BeMessage *be_msg;
|
||||
|
@ -1387,8 +1387,8 @@ mgmt_fe_adapter_handle_msg(struct mgmt_fe_client_adapter *adapter,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mgmt_fe_adapter_process_msg(uint8_t version, void *user_ctx,
|
||||
uint8_t *data, size_t len)
|
||||
static void mgmt_fe_adapter_process_msg(uint8_t version, uint8_t *data,
|
||||
size_t len, void *user_ctx)
|
||||
{
|
||||
struct mgmt_fe_client_adapter *adapter = user_ctx;
|
||||
Mgmtd__FeMessage *fe_msg;
|
||||
|
@ -71,7 +71,7 @@ static void sigint(void)
|
||||
/* Disable BFD events to avoid wasting processing. */
|
||||
bfd_protocol_integration_set_shutdown(true);
|
||||
|
||||
mgmt_be_client_lib_destroy(mgmt_lib_hndl);
|
||||
mgmt_be_client_lib_destroy();
|
||||
|
||||
static_vrf_terminate();
|
||||
|
||||
@ -208,7 +208,6 @@ int main(int argc, char **argv, char **envp)
|
||||
|
||||
/* Initialize MGMT backend functionalities */
|
||||
mgmt_lib_hndl = mgmt_be_client_lib_init(&mgmt_params, master);
|
||||
assert(mgmt_lib_hndl);
|
||||
|
||||
hook_register(routing_conf_event,
|
||||
routing_control_plane_protocols_name_validate);
|
||||
|
Loading…
Reference in New Issue
Block a user