Merge pull request #6960 from opensourcerouting/sysrepo-plugin-update

lib: adapt plugin to use new Sysrepo version
This commit is contained in:
Donald Sharp 2020-08-20 20:11:15 -04:00 committed by GitHub
commit 731a536e36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 239 deletions

View File

@ -37,13 +37,11 @@ DEFINE_MTYPE_STATIC(LIB, SYSREPO, "Sysrepo module")
static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"};
static struct thread_master *master;
static struct list *sysrepo_threads;
static sr_session_ctx_t *session;
static sr_conn_ctx_t *connection;
static struct nb_transaction *transaction;
static int frr_sr_read_cb(struct thread *thread);
static int frr_sr_write_cb(struct thread *thread);
static int frr_sr_finish(void);
/* Convert FRR YANG data value to sysrepo YANG data value. */
@ -236,25 +234,23 @@ static int frr_sr_process_change(struct nb_config *candidate,
return NB_OK;
}
static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
const char *module_name,
bool startup_config)
static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session,
const char *module_name,
bool startup_config)
{
sr_change_iter_t *it;
int ret;
sr_change_oper_t sr_op;
sr_val_t *sr_old_val, *sr_new_val;
char xpath[XPATH_MAXLEN];
struct nb_context context = {};
struct nb_config *candidate;
char errmsg[BUFSIZ] = {0};
snprintf(xpath, sizeof(xpath), "/%s:*", module_name);
ret = sr_get_changes_iter(session, xpath, &it);
ret = sr_get_changes_iter(session, "//*", &it);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO,
"%s: sr_get_changes_iter() failed for xpath %s",
__func__, xpath);
"%s: sr_get_changes_iter() failed for \"%s\"",
__func__, module_name);
return ret;
}
@ -307,12 +303,14 @@ static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
__func__, nb_err_name(ret), errmsg);
}
if (!transaction)
nb_config_free(candidate);
/* Map northbound return code to sysrepo return code. */
switch (ret) {
case NB_OK:
return SR_ERR_OK;
case NB_ERR_NO_CHANGES:
nb_config_free(candidate);
return SR_ERR_OK;
case NB_ERR_LOCKED:
return SR_ERR_LOCKED;
@ -356,22 +354,23 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session,
/* Callback for changes in the running configuration. */
static int frr_sr_config_change_cb(sr_session_ctx_t *session,
const char *module_name,
sr_notif_event_t sr_ev, void *private_ctx)
const char *module_name, const char *xpath,
sr_event_t sr_ev, uint32_t request_id,
void *private_data)
{
switch (sr_ev) {
case SR_EV_ENABLED:
return frr_sr_config_change_cb_verify(session, module_name,
true);
case SR_EV_VERIFY:
return frr_sr_config_change_cb_verify(session, module_name,
false);
case SR_EV_APPLY:
return frr_sr_config_change_cb_prepare(session, module_name,
true);
case SR_EV_CHANGE:
return frr_sr_config_change_cb_prepare(session, module_name,
false);
case SR_EV_DONE:
return frr_sr_config_change_cb_apply(session, module_name);
case SR_EV_ABORT:
return frr_sr_config_change_cb_abort(session, module_name);
default:
flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u",
flog_err(EC_LIB_LIBSYSREPO, "%s: unexpected sysrepo event: %u",
__func__, sr_ev);
return SR_ERR_INTERNAL;
}
@ -381,70 +380,49 @@ static int frr_sr_state_data_iter_cb(const struct lys_node *snode,
struct yang_translator *translator,
struct yang_data *data, void *arg)
{
struct list *elements = arg;
struct lyd_node *dnode = arg;
listnode_add(elements, data);
ly_errno = 0;
dnode = lyd_new_path(dnode, ly_native_ctx, data->xpath, data->value, 0,
LYD_PATH_OPT_UPDATE);
if (!dnode && ly_errno) {
flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed",
__func__);
yang_data_free(data);
return NB_ERR;
}
yang_data_free(data);
return NB_OK;
}
/* Callback for state retrieval. */
static int frr_sr_state_cb(const char *xpath, sr_val_t **values,
size_t *values_cnt, uint64_t request_id,
const char *original_xpath, void *private_ctx)
static int frr_sr_state_cb(sr_session_ctx_t *session, const char *module_name,
const char *xpath, const char *request_xpath,
uint32_t request_id, struct lyd_node **parent,
void *private_ctx)
{
struct list *elements;
struct yang_data *data;
struct listnode *node;
sr_val_t *v;
int ret, count, i = 0;
struct lyd_node *dnode;
elements = yang_data_list_new();
if (nb_oper_data_iterate(xpath, NULL, NB_OPER_DATA_ITER_NORECURSE,
frr_sr_state_data_iter_cb, elements)
dnode = *parent;
if (nb_oper_data_iterate(request_xpath, NULL, 0,
frr_sr_state_data_iter_cb, dnode)
!= NB_OK) {
flog_warn(EC_LIB_NB_OPERATIONAL_DATA,
"%s: failed to obtain operational data [xpath %s]",
__func__, xpath);
goto exit;
return SR_ERR_INTERNAL;
}
if (list_isempty(elements))
goto exit;
count = listcount(elements);
ret = sr_new_values(count, &v);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_new_values(): %s", __func__,
sr_strerror(ret));
goto exit;
}
for (ALL_LIST_ELEMENTS_RO(elements, node, data)) {
if (yang_data_frr2sr(data, &v[i++]) != 0) {
flog_err(EC_LIB_SYSREPO_DATA_CONVERT,
"%s: failed to convert data to sysrepo format",
__func__);
}
}
*values = v;
*values_cnt = count;
list_delete(&elements);
return SR_ERR_OK;
exit:
list_delete(&elements);
*values = NULL;
*values_cnt = 0;
*parent = dnode;
return SR_ERR_OK;
}
static int frr_sr_config_rpc_cb(const char *xpath, const sr_val_t *sr_input,
const size_t input_cnt, sr_val_t **sr_output,
static int frr_sr_config_rpc_cb(sr_session_ctx_t *session, const char *xpath,
const sr_val_t *sr_input,
const size_t input_cnt, sr_event_t sr_ev,
uint32_t request_id, sr_val_t **sr_output,
size_t *sr_output_cnt, void *private_ctx)
{
struct nb_node *nb_node;
@ -551,8 +529,7 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
}
}
ret = sr_event_notif_send(session, xpath, values, values_cnt,
SR_EV_NOTIF_DEFAULT);
ret = sr_event_notif_send(session, xpath, values, values_cnt);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO,
"%s: sr_event_notif_send() failed for xpath %s",
@ -563,102 +540,13 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
return NB_OK;
}
/* Code to integrate the sysrepo client into FRR main event loop. */
struct sysrepo_thread {
struct thread *thread;
sr_fd_event_t event;
int fd;
};
static struct sysrepo_thread *frr_sr_fd_lookup(sr_fd_event_t event, int fd)
{
struct sysrepo_thread *sr_thread;
struct listnode *node;
for (ALL_LIST_ELEMENTS_RO(sysrepo_threads, node, sr_thread)) {
if (sr_thread->event == event && sr_thread->fd == fd)
return sr_thread;
}
return NULL;
}
static void frr_sr_fd_add(int event, int fd)
{
struct sysrepo_thread *sr_thread;
if (frr_sr_fd_lookup(event, fd) != NULL)
return;
sr_thread = XCALLOC(MTYPE_SYSREPO, sizeof(*sr_thread));
sr_thread->event = event;
sr_thread->fd = fd;
listnode_add(sysrepo_threads, sr_thread);
switch (event) {
case SR_FD_INPUT_READY:
thread_add_read(master, frr_sr_read_cb, NULL, fd,
&sr_thread->thread);
break;
case SR_FD_OUTPUT_READY:
thread_add_write(master, frr_sr_write_cb, NULL, fd,
&sr_thread->thread);
break;
default:
return;
}
}
static void frr_sr_fd_free(struct sysrepo_thread *sr_thread)
{
THREAD_OFF(sr_thread->thread);
XFREE(MTYPE_SYSREPO, sr_thread);
}
static void frr_sr_fd_del(int event, int fd)
{
struct sysrepo_thread *sr_thread;
sr_thread = frr_sr_fd_lookup(event, fd);
if (!sr_thread)
return;
listnode_delete(sysrepo_threads, sr_thread);
frr_sr_fd_free(sr_thread);
}
static void frr_sr_fd_update(sr_fd_change_t *fd_change_set,
size_t fd_change_set_cnt)
{
for (size_t i = 0; i < fd_change_set_cnt; i++) {
int fd = fd_change_set[i].fd;
int event = fd_change_set[i].events;
if (event != SR_FD_INPUT_READY && event != SR_FD_OUTPUT_READY)
continue;
switch (fd_change_set[i].action) {
case SR_FD_START_WATCHING:
frr_sr_fd_add(event, fd);
break;
case SR_FD_STOP_WATCHING:
frr_sr_fd_del(event, fd);
break;
default:
break;
}
}
}
static int frr_sr_read_cb(struct thread *thread)
{
sr_subscription_ctx_t *sr_subscription = THREAD_ARG(thread);
int fd = THREAD_FD(thread);
sr_fd_change_t *fd_change_set = NULL;
size_t fd_change_set_cnt = 0;
int ret;
ret = sr_fd_event_process(fd, SR_FD_INPUT_READY, &fd_change_set,
&fd_change_set_cnt);
ret = sr_process_events(sr_subscription, session, NULL);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
__func__, sr_strerror(ret));
@ -666,31 +554,7 @@ static int frr_sr_read_cb(struct thread *thread)
}
thread = NULL;
thread_add_read(master, frr_sr_read_cb, NULL, fd, &thread);
frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
free(fd_change_set);
return 0;
}
static int frr_sr_write_cb(struct thread *thread)
{
int fd = THREAD_FD(thread);
sr_fd_change_t *fd_change_set = NULL;
size_t fd_change_set_cnt = 0;
int ret;
ret = sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &fd_change_set,
&fd_change_set_cnt);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
__func__, sr_strerror(ret));
return -1;
}
frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
free(fd_change_set);
thread_add_read(master, frr_sr_read_cb, sr_subscription, fd, &thread);
return 0;
}
@ -700,8 +564,8 @@ static void frr_sr_subscribe_config(struct yang_module *module)
int ret;
ret = sr_module_change_subscribe(
session, module->name, frr_sr_config_change_cb, NULL, 0,
SR_SUBSCR_DEFAULT | SR_SUBSCR_EV_ENABLED,
session, module->name, NULL, frr_sr_config_change_cb, NULL, 0,
SR_SUBSCR_DEFAULT | SR_SUBSCR_ENABLED | SR_SUBSCR_NO_THREAD,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s",
@ -725,11 +589,11 @@ static int frr_sr_subscribe_state(const struct lys_node *snode, void *arg)
DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__,
nb_node->xpath);
ret = sr_dp_get_items_subscribe(
session, nb_node->xpath, frr_sr_state_cb, NULL,
SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
ret = sr_oper_get_items_subscribe(
session, snode->module->name, nb_node->xpath, frr_sr_state_cb,
NULL, SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_dp_get_items_subscribe(): %s",
flog_err(EC_LIB_LIBSYSREPO, "sr_oper_get_items_subscribe(): %s",
sr_strerror(ret));
return YANG_ITER_CONTINUE;
@ -750,7 +614,7 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
nb_node->xpath);
ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
NULL, SR_SUBSCR_CTX_REUSE,
NULL, 0, SR_SUBSCR_CTX_REUSE,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s",
@ -759,30 +623,6 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
return YANG_ITER_CONTINUE;
}
static int frr_sr_subscribe_action(const struct lys_node *snode, void *arg)
{
struct yang_module *module = arg;
struct nb_node *nb_node;
int ret;
if (snode->nodetype != LYS_ACTION)
return YANG_ITER_CONTINUE;
nb_node = snode->priv;
DEBUGD(&nb_dbg_client_sysrepo, "%s: providing action to '%s'", __func__,
nb_node->xpath);
ret = sr_action_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
NULL, SR_SUBSCR_CTX_REUSE,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_action_subscribe(): %s",
sr_strerror(ret));
return YANG_ITER_CONTINUE;
}
/* CLI commands. */
DEFUN (debug_nb_sr,
debug_nb_sr_cmd,
@ -830,22 +670,13 @@ static void frr_sr_cli_init(void)
}
/* FRR's Sysrepo initialization. */
static int frr_sr_init(const char *program_name)
static int frr_sr_init(void)
{
struct yang_module *module;
int sysrepo_fd, ret;
sysrepo_threads = list_new();
ret = sr_fd_watcher_init(&sysrepo_fd, NULL);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_fd_watcher_init(): %s",
__func__, sr_strerror(ret));
goto cleanup;
}
int ret;
/* Connect to Sysrepo. */
ret = sr_connect(program_name, SR_CONN_DEFAULT, &connection);
ret = sr_connect(SR_CONN_DEFAULT, &connection);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__,
sr_strerror(ret));
@ -853,8 +684,7 @@ static int frr_sr_init(const char *program_name)
}
/* Start session. */
ret = sr_session_start(connection, SR_DS_RUNNING, SR_SESS_DEFAULT,
&session);
ret = sr_session_start(connection, SR_DS_RUNNING, &session);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s",
__func__, sr_strerror(ret));
@ -863,19 +693,28 @@ static int frr_sr_init(const char *program_name)
/* Perform subscriptions. */
RB_FOREACH (module, yang_modules, &yang_modules) {
int event_pipe;
frr_sr_subscribe_config(module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_state,
0, module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc,
0, module);
yang_snodes_iterate_module(module->info,
frr_sr_subscribe_action, 0, module);
/* Watch subscriptions. */
ret = sr_get_event_pipe(module->sr_subscription, &event_pipe);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT,
"%s: sr_get_event_pipe(): %s", __func__,
sr_strerror(ret));
goto cleanup;
}
thread_add_read(master, frr_sr_read_cb, module->sr_subscription,
event_pipe, &module->sr_thread);
}
hook_register(nb_notification_send, frr_sr_notification_send);
frr_sr_fd_add(SR_FD_INPUT_READY, sysrepo_fd);
return 0;
cleanup:
@ -891,7 +730,8 @@ static int frr_sr_finish(void)
RB_FOREACH (module, yang_modules, &yang_modules) {
if (!module->sr_subscription)
continue;
sr_unsubscribe(session, module->sr_subscription);
sr_unsubscribe(module->sr_subscription);
THREAD_OFF(module->sr_thread);
}
if (session)
@ -899,10 +739,6 @@ static int frr_sr_finish(void)
if (connection)
sr_disconnect(connection);
sysrepo_threads->del = (void (*)(void *))frr_sr_fd_free;
list_delete(&sysrepo_threads);
sr_fd_watcher_cleanup();
return 0;
}
@ -910,7 +746,7 @@ static int frr_sr_module_late_init(struct thread_master *tm)
{
master = tm;
if (frr_sr_init(frr_get_progname()) < 0) {
if (frr_sr_init() < 0) {
flog_err(EC_LIB_SYSREPO_INIT,
"failed to initialize the Sysrepo module");
return -1;

View File

@ -63,6 +63,7 @@ struct yang_module {
#endif
#ifdef HAVE_SYSREPO
sr_subscription_ctx_t *sr_subscription;
struct thread *sr_thread;
#endif
};
RB_HEAD(yang_modules, yang_module);