lib: adapt plugin to use new Sysrepo version

Sysrepo recently underwent a complete rewrite, where some substantial
architectural changes were made (the most important one being the
extinction of the sysrepod daemon). While most of the existing API
was preserved, quite a few backward-incompatible changes [1] were
introduced (mostly simplifications). This commit adapts our sysrepo
northbound plugin to those API changes in order for it to be compatible
with the latest Sysrepo version.

Additional notes:
* The old Sysrepo version is EOL and not supported anymore.
* The new Sysrepo version requires libyang 1.x.

Closes #6936

[1] https://github.com/sysrepo/sysrepo/blob/devel/CHANGES

Signed-off-by: Renato Westphal <renato@opensourcerouting.org>
This commit is contained in:
Renato Westphal 2020-08-19 20:33:40 -03:00
parent be8d09f125
commit 24ed137c20
2 changed files with 76 additions and 239 deletions

View File

@ -37,13 +37,11 @@ DEFINE_MTYPE_STATIC(LIB, SYSREPO, "Sysrepo module")
static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"}; static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"};
static struct thread_master *master; static struct thread_master *master;
static struct list *sysrepo_threads;
static sr_session_ctx_t *session; static sr_session_ctx_t *session;
static sr_conn_ctx_t *connection; static sr_conn_ctx_t *connection;
static struct nb_transaction *transaction; static struct nb_transaction *transaction;
static int frr_sr_read_cb(struct thread *thread); static int frr_sr_read_cb(struct thread *thread);
static int frr_sr_write_cb(struct thread *thread);
static int frr_sr_finish(void); static int frr_sr_finish(void);
/* Convert FRR YANG data value to sysrepo YANG data value. */ /* Convert FRR YANG data value to sysrepo YANG data value. */
@ -236,25 +234,23 @@ static int frr_sr_process_change(struct nb_config *candidate,
return NB_OK; return NB_OK;
} }
static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session, static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session,
const char *module_name, const char *module_name,
bool startup_config) bool startup_config)
{ {
sr_change_iter_t *it; sr_change_iter_t *it;
int ret; int ret;
sr_change_oper_t sr_op; sr_change_oper_t sr_op;
sr_val_t *sr_old_val, *sr_new_val; sr_val_t *sr_old_val, *sr_new_val;
char xpath[XPATH_MAXLEN];
struct nb_context context = {}; struct nb_context context = {};
struct nb_config *candidate; struct nb_config *candidate;
char errmsg[BUFSIZ] = {0}; char errmsg[BUFSIZ] = {0};
snprintf(xpath, sizeof(xpath), "/%s:*", module_name); ret = sr_get_changes_iter(session, "//*", &it);
ret = sr_get_changes_iter(session, xpath, &it);
if (ret != SR_ERR_OK) { if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, flog_err(EC_LIB_LIBSYSREPO,
"%s: sr_get_changes_iter() failed for xpath %s", "%s: sr_get_changes_iter() failed for \"%s\"",
__func__, xpath); __func__, module_name);
return ret; return ret;
} }
@ -307,12 +303,14 @@ static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
__func__, nb_err_name(ret), errmsg); __func__, nb_err_name(ret), errmsg);
} }
if (!transaction)
nb_config_free(candidate);
/* Map northbound return code to sysrepo return code. */ /* Map northbound return code to sysrepo return code. */
switch (ret) { switch (ret) {
case NB_OK: case NB_OK:
return SR_ERR_OK; return SR_ERR_OK;
case NB_ERR_NO_CHANGES: case NB_ERR_NO_CHANGES:
nb_config_free(candidate);
return SR_ERR_OK; return SR_ERR_OK;
case NB_ERR_LOCKED: case NB_ERR_LOCKED:
return SR_ERR_LOCKED; return SR_ERR_LOCKED;
@ -356,22 +354,23 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session,
/* Callback for changes in the running configuration. */ /* Callback for changes in the running configuration. */
static int frr_sr_config_change_cb(sr_session_ctx_t *session, static int frr_sr_config_change_cb(sr_session_ctx_t *session,
const char *module_name, const char *module_name, const char *xpath,
sr_notif_event_t sr_ev, void *private_ctx) sr_event_t sr_ev, uint32_t request_id,
void *private_data)
{ {
switch (sr_ev) { switch (sr_ev) {
case SR_EV_ENABLED: case SR_EV_ENABLED:
return frr_sr_config_change_cb_verify(session, module_name, return frr_sr_config_change_cb_prepare(session, module_name,
true); true);
case SR_EV_VERIFY: case SR_EV_CHANGE:
return frr_sr_config_change_cb_verify(session, module_name, return frr_sr_config_change_cb_prepare(session, module_name,
false); false);
case SR_EV_APPLY: case SR_EV_DONE:
return frr_sr_config_change_cb_apply(session, module_name); return frr_sr_config_change_cb_apply(session, module_name);
case SR_EV_ABORT: case SR_EV_ABORT:
return frr_sr_config_change_cb_abort(session, module_name); return frr_sr_config_change_cb_abort(session, module_name);
default: default:
flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u", flog_err(EC_LIB_LIBSYSREPO, "%s: unexpected sysrepo event: %u",
__func__, sr_ev); __func__, sr_ev);
return SR_ERR_INTERNAL; return SR_ERR_INTERNAL;
} }
@ -381,70 +380,49 @@ static int frr_sr_state_data_iter_cb(const struct lys_node *snode,
struct yang_translator *translator, struct yang_translator *translator,
struct yang_data *data, void *arg) struct yang_data *data, void *arg)
{ {
struct list *elements = arg; struct lyd_node *dnode = arg;
listnode_add(elements, data); ly_errno = 0;
dnode = lyd_new_path(dnode, ly_native_ctx, data->xpath, data->value, 0,
LYD_PATH_OPT_UPDATE);
if (!dnode && ly_errno) {
flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed",
__func__);
yang_data_free(data);
return NB_ERR;
}
yang_data_free(data);
return NB_OK; return NB_OK;
} }
/* Callback for state retrieval. */ /* Callback for state retrieval. */
static int frr_sr_state_cb(const char *xpath, sr_val_t **values, static int frr_sr_state_cb(sr_session_ctx_t *session, const char *module_name,
size_t *values_cnt, uint64_t request_id, const char *xpath, const char *request_xpath,
const char *original_xpath, void *private_ctx) uint32_t request_id, struct lyd_node **parent,
void *private_ctx)
{ {
struct list *elements; struct lyd_node *dnode;
struct yang_data *data;
struct listnode *node;
sr_val_t *v;
int ret, count, i = 0;
elements = yang_data_list_new(); dnode = *parent;
if (nb_oper_data_iterate(xpath, NULL, NB_OPER_DATA_ITER_NORECURSE, if (nb_oper_data_iterate(request_xpath, NULL, 0,
frr_sr_state_data_iter_cb, elements) frr_sr_state_data_iter_cb, dnode)
!= NB_OK) { != NB_OK) {
flog_warn(EC_LIB_NB_OPERATIONAL_DATA, flog_warn(EC_LIB_NB_OPERATIONAL_DATA,
"%s: failed to obtain operational data [xpath %s]", "%s: failed to obtain operational data [xpath %s]",
__func__, xpath); __func__, xpath);
goto exit; return SR_ERR_INTERNAL;
} }
if (list_isempty(elements)) *parent = dnode;
goto exit;
count = listcount(elements);
ret = sr_new_values(count, &v);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_new_values(): %s", __func__,
sr_strerror(ret));
goto exit;
}
for (ALL_LIST_ELEMENTS_RO(elements, node, data)) {
if (yang_data_frr2sr(data, &v[i++]) != 0) {
flog_err(EC_LIB_SYSREPO_DATA_CONVERT,
"%s: failed to convert data to sysrepo format",
__func__);
}
}
*values = v;
*values_cnt = count;
list_delete(&elements);
return SR_ERR_OK;
exit:
list_delete(&elements);
*values = NULL;
*values_cnt = 0;
return SR_ERR_OK; return SR_ERR_OK;
} }
static int frr_sr_config_rpc_cb(const char *xpath, const sr_val_t *sr_input, static int frr_sr_config_rpc_cb(sr_session_ctx_t *session, const char *xpath,
const size_t input_cnt, sr_val_t **sr_output, const sr_val_t *sr_input,
const size_t input_cnt, sr_event_t sr_ev,
uint32_t request_id, sr_val_t **sr_output,
size_t *sr_output_cnt, void *private_ctx) size_t *sr_output_cnt, void *private_ctx)
{ {
struct nb_node *nb_node; struct nb_node *nb_node;
@ -551,8 +529,7 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
} }
} }
ret = sr_event_notif_send(session, xpath, values, values_cnt, ret = sr_event_notif_send(session, xpath, values, values_cnt);
SR_EV_NOTIF_DEFAULT);
if (ret != SR_ERR_OK) { if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, flog_err(EC_LIB_LIBSYSREPO,
"%s: sr_event_notif_send() failed for xpath %s", "%s: sr_event_notif_send() failed for xpath %s",
@ -563,102 +540,13 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
return NB_OK; return NB_OK;
} }
/* Code to integrate the sysrepo client into FRR main event loop. */
struct sysrepo_thread {
struct thread *thread;
sr_fd_event_t event;
int fd;
};
static struct sysrepo_thread *frr_sr_fd_lookup(sr_fd_event_t event, int fd)
{
struct sysrepo_thread *sr_thread;
struct listnode *node;
for (ALL_LIST_ELEMENTS_RO(sysrepo_threads, node, sr_thread)) {
if (sr_thread->event == event && sr_thread->fd == fd)
return sr_thread;
}
return NULL;
}
static void frr_sr_fd_add(int event, int fd)
{
struct sysrepo_thread *sr_thread;
if (frr_sr_fd_lookup(event, fd) != NULL)
return;
sr_thread = XCALLOC(MTYPE_SYSREPO, sizeof(*sr_thread));
sr_thread->event = event;
sr_thread->fd = fd;
listnode_add(sysrepo_threads, sr_thread);
switch (event) {
case SR_FD_INPUT_READY:
thread_add_read(master, frr_sr_read_cb, NULL, fd,
&sr_thread->thread);
break;
case SR_FD_OUTPUT_READY:
thread_add_write(master, frr_sr_write_cb, NULL, fd,
&sr_thread->thread);
break;
default:
return;
}
}
static void frr_sr_fd_free(struct sysrepo_thread *sr_thread)
{
THREAD_OFF(sr_thread->thread);
XFREE(MTYPE_SYSREPO, sr_thread);
}
static void frr_sr_fd_del(int event, int fd)
{
struct sysrepo_thread *sr_thread;
sr_thread = frr_sr_fd_lookup(event, fd);
if (!sr_thread)
return;
listnode_delete(sysrepo_threads, sr_thread);
frr_sr_fd_free(sr_thread);
}
static void frr_sr_fd_update(sr_fd_change_t *fd_change_set,
size_t fd_change_set_cnt)
{
for (size_t i = 0; i < fd_change_set_cnt; i++) {
int fd = fd_change_set[i].fd;
int event = fd_change_set[i].events;
if (event != SR_FD_INPUT_READY && event != SR_FD_OUTPUT_READY)
continue;
switch (fd_change_set[i].action) {
case SR_FD_START_WATCHING:
frr_sr_fd_add(event, fd);
break;
case SR_FD_STOP_WATCHING:
frr_sr_fd_del(event, fd);
break;
default:
break;
}
}
}
static int frr_sr_read_cb(struct thread *thread) static int frr_sr_read_cb(struct thread *thread)
{ {
sr_subscription_ctx_t *sr_subscription = THREAD_ARG(thread);
int fd = THREAD_FD(thread); int fd = THREAD_FD(thread);
sr_fd_change_t *fd_change_set = NULL;
size_t fd_change_set_cnt = 0;
int ret; int ret;
ret = sr_fd_event_process(fd, SR_FD_INPUT_READY, &fd_change_set, ret = sr_process_events(sr_subscription, session, NULL);
&fd_change_set_cnt);
if (ret != SR_ERR_OK) { if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s", flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
__func__, sr_strerror(ret)); __func__, sr_strerror(ret));
@ -666,31 +554,7 @@ static int frr_sr_read_cb(struct thread *thread)
} }
thread = NULL; thread = NULL;
thread_add_read(master, frr_sr_read_cb, NULL, fd, &thread); thread_add_read(master, frr_sr_read_cb, sr_subscription, fd, &thread);
frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
free(fd_change_set);
return 0;
}
static int frr_sr_write_cb(struct thread *thread)
{
int fd = THREAD_FD(thread);
sr_fd_change_t *fd_change_set = NULL;
size_t fd_change_set_cnt = 0;
int ret;
ret = sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &fd_change_set,
&fd_change_set_cnt);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
__func__, sr_strerror(ret));
return -1;
}
frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
free(fd_change_set);
return 0; return 0;
} }
@ -700,8 +564,8 @@ static void frr_sr_subscribe_config(struct yang_module *module)
int ret; int ret;
ret = sr_module_change_subscribe( ret = sr_module_change_subscribe(
session, module->name, frr_sr_config_change_cb, NULL, 0, session, module->name, NULL, frr_sr_config_change_cb, NULL, 0,
SR_SUBSCR_DEFAULT | SR_SUBSCR_EV_ENABLED, SR_SUBSCR_DEFAULT | SR_SUBSCR_ENABLED | SR_SUBSCR_NO_THREAD,
&module->sr_subscription); &module->sr_subscription);
if (ret != SR_ERR_OK) if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s", flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s",
@ -725,11 +589,11 @@ static int frr_sr_subscribe_state(const struct lys_node *snode, void *arg)
DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__, DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__,
nb_node->xpath); nb_node->xpath);
ret = sr_dp_get_items_subscribe( ret = sr_oper_get_items_subscribe(
session, nb_node->xpath, frr_sr_state_cb, NULL, session, snode->module->name, nb_node->xpath, frr_sr_state_cb,
SR_SUBSCR_CTX_REUSE, &module->sr_subscription); NULL, SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
if (ret != SR_ERR_OK) if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_dp_get_items_subscribe(): %s", flog_err(EC_LIB_LIBSYSREPO, "sr_oper_get_items_subscribe(): %s",
sr_strerror(ret)); sr_strerror(ret));
return YANG_ITER_CONTINUE; return YANG_ITER_CONTINUE;
@ -750,7 +614,7 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
nb_node->xpath); nb_node->xpath);
ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb, ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
NULL, SR_SUBSCR_CTX_REUSE, NULL, 0, SR_SUBSCR_CTX_REUSE,
&module->sr_subscription); &module->sr_subscription);
if (ret != SR_ERR_OK) if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s", flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s",
@ -759,30 +623,6 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
return YANG_ITER_CONTINUE; return YANG_ITER_CONTINUE;
} }
static int frr_sr_subscribe_action(const struct lys_node *snode, void *arg)
{
struct yang_module *module = arg;
struct nb_node *nb_node;
int ret;
if (snode->nodetype != LYS_ACTION)
return YANG_ITER_CONTINUE;
nb_node = snode->priv;
DEBUGD(&nb_dbg_client_sysrepo, "%s: providing action to '%s'", __func__,
nb_node->xpath);
ret = sr_action_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
NULL, SR_SUBSCR_CTX_REUSE,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_action_subscribe(): %s",
sr_strerror(ret));
return YANG_ITER_CONTINUE;
}
/* CLI commands. */ /* CLI commands. */
DEFUN (debug_nb_sr, DEFUN (debug_nb_sr,
debug_nb_sr_cmd, debug_nb_sr_cmd,
@ -830,22 +670,13 @@ static void frr_sr_cli_init(void)
} }
/* FRR's Sysrepo initialization. */ /* FRR's Sysrepo initialization. */
static int frr_sr_init(const char *program_name) static int frr_sr_init(void)
{ {
struct yang_module *module; struct yang_module *module;
int sysrepo_fd, ret; int ret;
sysrepo_threads = list_new();
ret = sr_fd_watcher_init(&sysrepo_fd, NULL);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_fd_watcher_init(): %s",
__func__, sr_strerror(ret));
goto cleanup;
}
/* Connect to Sysrepo. */ /* Connect to Sysrepo. */
ret = sr_connect(program_name, SR_CONN_DEFAULT, &connection); ret = sr_connect(SR_CONN_DEFAULT, &connection);
if (ret != SR_ERR_OK) { if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__, flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__,
sr_strerror(ret)); sr_strerror(ret));
@ -853,8 +684,7 @@ static int frr_sr_init(const char *program_name)
} }
/* Start session. */ /* Start session. */
ret = sr_session_start(connection, SR_DS_RUNNING, SR_SESS_DEFAULT, ret = sr_session_start(connection, SR_DS_RUNNING, &session);
&session);
if (ret != SR_ERR_OK) { if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s", flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s",
__func__, sr_strerror(ret)); __func__, sr_strerror(ret));
@ -863,19 +693,28 @@ static int frr_sr_init(const char *program_name)
/* Perform subscriptions. */ /* Perform subscriptions. */
RB_FOREACH (module, yang_modules, &yang_modules) { RB_FOREACH (module, yang_modules, &yang_modules) {
int event_pipe;
frr_sr_subscribe_config(module); frr_sr_subscribe_config(module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_state, yang_snodes_iterate_module(module->info, frr_sr_subscribe_state,
0, module); 0, module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc, yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc,
0, module); 0, module);
yang_snodes_iterate_module(module->info,
frr_sr_subscribe_action, 0, module); /* Watch subscriptions. */
ret = sr_get_event_pipe(module->sr_subscription, &event_pipe);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT,
"%s: sr_get_event_pipe(): %s", __func__,
sr_strerror(ret));
goto cleanup;
}
thread_add_read(master, frr_sr_read_cb, module->sr_subscription,
event_pipe, &module->sr_thread);
} }
hook_register(nb_notification_send, frr_sr_notification_send); hook_register(nb_notification_send, frr_sr_notification_send);
frr_sr_fd_add(SR_FD_INPUT_READY, sysrepo_fd);
return 0; return 0;
cleanup: cleanup:
@ -891,7 +730,8 @@ static int frr_sr_finish(void)
RB_FOREACH (module, yang_modules, &yang_modules) { RB_FOREACH (module, yang_modules, &yang_modules) {
if (!module->sr_subscription) if (!module->sr_subscription)
continue; continue;
sr_unsubscribe(session, module->sr_subscription); sr_unsubscribe(module->sr_subscription);
THREAD_OFF(module->sr_thread);
} }
if (session) if (session)
@ -899,10 +739,6 @@ static int frr_sr_finish(void)
if (connection) if (connection)
sr_disconnect(connection); sr_disconnect(connection);
sysrepo_threads->del = (void (*)(void *))frr_sr_fd_free;
list_delete(&sysrepo_threads);
sr_fd_watcher_cleanup();
return 0; return 0;
} }
@ -910,7 +746,7 @@ static int frr_sr_module_late_init(struct thread_master *tm)
{ {
master = tm; master = tm;
if (frr_sr_init(frr_get_progname()) < 0) { if (frr_sr_init() < 0) {
flog_err(EC_LIB_SYSREPO_INIT, flog_err(EC_LIB_SYSREPO_INIT,
"failed to initialize the Sysrepo module"); "failed to initialize the Sysrepo module");
return -1; return -1;

View File

@ -63,6 +63,7 @@ struct yang_module {
#endif #endif
#ifdef HAVE_SYSREPO #ifdef HAVE_SYSREPO
sr_subscription_ctx_t *sr_subscription; sr_subscription_ctx_t *sr_subscription;
struct thread *sr_thread;
#endif #endif
}; };
RB_HEAD(yang_modules, yang_module); RB_HEAD(yang_modules, yang_module);