lib: improve error handling for datastore notifications

Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
Christian Hopps 2025-01-17 21:21:33 +00:00
parent 1f1d166288
commit 597d79a89e

View File

@ -480,87 +480,96 @@ static struct op_changes_group *op_changes_group_next(void)
/* Query for changes and notify */ /* Query for changes and notify */
/* ---------------------------- */ /* ---------------------------- */
static void timer_walk_abort(struct nb_notif_walk_args *args);
static void timer_walk_continue(struct event *event); static void timer_walk_continue(struct event *event);
static void timer_walk_done(struct nb_notif_walk_args *args);
static struct op_change *__next_change(struct op_changes_group *group)
{
struct op_change *next = RB_NEXT(op_changes, group->cur_change);
/* Remove and free current so retry works */
RB_REMOVE(op_changes, group->cur_changes, group->cur_change);
op_change_free(group->cur_change);
return next;
}
static struct op_changes_group *__next_group(struct op_changes_group *group)
{
__dbg("done with oper-path collection for group");
op_changes_group_free(group);
return op_changes_group_next();
}
static enum nb_error oper_walk_done(const struct lyd_node *tree, void *arg, enum nb_error ret) static enum nb_error oper_walk_done(const struct lyd_node *tree, void *arg, enum nb_error ret)
{ {
struct nb_notif_walk_args *args = arg; struct nb_notif_walk_args *args = arg;
struct op_changes_group *group = args->group; struct op_changes_group *group = args->group;
const char *path = group->cur_change->path; const char *path = group->cur_change->path;
const char *op = group->cur_changes == &group->adds ? "add" : "delete";
/* we don't send batches when yielding as we need completed edit in any patch */ /* we don't send batches when yielding as we need completed edit in any patch */
assert(ret != NB_YIELD); assert(ret != NB_YIELD);
nb_notif_walk = NULL;
if (ret == NB_ERR_NOT_FOUND) { if (ret == NB_ERR_NOT_FOUND) {
__dbg("Path not found while walking oper tree: %s", path); __dbg("Path not found while walking oper tree: %s", path);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args); ret = NB_OK;
return ret; } else if (ret != NB_OK) {
}
/* Something else went wrong with the walk */
if (ret != NB_OK) {
error: error:
__log_err("Error notifying for datastore change on path: %s: %s", path, __log_err("Error notifying for datastore path: %s: %s", path, nb_err_name(ret));
nb_err_name(ret));
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
/* XXX Need to inform mgmtd/front-ends things are out-of-sync */
return ret;
}
__dbg("done with oper-path collection for %s path: %s", op, path); timer_walk_abort(args);
goto done;
} else {
__dbg("Done with oper-path collection for path: %s", path);
/* Do we need this? */ /* Do we need this? */
while (tree->parent) while (tree->parent)
tree = lyd_parent(tree); tree = lyd_parent(tree);
/* Send the add (replace) notification */ /* Send the add (replace) notification */
if (mgmt_be_send_ds_replace_notification(path, tree)) { if (mgmt_be_send_ds_replace_notification(path, tree)) {
ret = NB_ERR; __log_err("Error sending notification message for path: %s", path);
goto error; ret = NB_ERR;
goto error;
}
} }
/* /*
* Advance to next change (either dels or adds or both). * Advance to next change.
*/ */
group->cur_change = RB_NEXT(op_changes, group->cur_change); group->cur_change = __next_change(group);
if (!group->cur_change) { if (!group->cur_change) {
__dbg("done with oper-path collection for group"); args->group = __next_group(group);
op_changes_group_free(group); if (!args->group) {
timer_walk_done(args);
group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path collection for notification");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
goto done; goto done;
} }
} }
/* Run next walk after giving other events a shot to run */
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, 0, &nb_notif_timer); event_add_timer_msec(nb_notif_master, timer_walk_continue, args, 0, &nb_notif_timer);
done: done:
/* Done with current walk and scheduled next one if there is more */ /* Done with current walk and scheduled next one if there is more */
nb_notif_walk = NULL; nb_notif_walk = NULL;
return NB_OK; return ret;
} }
static LY_ERR nb_notify_delete_changes(struct nb_notif_walk_args *args) static int nb_notify_delete_changes(struct nb_notif_walk_args *args)
{ {
struct op_changes_group *group = args->group; struct op_changes_group *group = args->group;
LY_ERR err;
group->cur_change = RB_MIN(op_changes, group->cur_changes); group->cur_change = RB_MIN(op_changes, group->cur_changes);
while (group->cur_change) { while (group->cur_change) {
err = mgmt_be_send_ds_delete_notification(group->cur_change->path); if (mgmt_be_send_ds_delete_notification(group->cur_change->path)) {
assert(err == LY_SUCCESS); /* XXX */ __log_err("Error sending delete notification message for path: %s",
group->cur_change->path);
group->cur_change = RB_NEXT(op_changes, group->cur_change); return 1;
}
group->cur_change = __next_change(group);
} }
return 0;
return LY_SUCCESS;
} }
static void timer_walk_continue(struct event *event) static void timer_walk_continue(struct event *event)
@ -568,15 +577,17 @@ static void timer_walk_continue(struct event *event)
struct nb_notif_walk_args *args = EVENT_ARG(event); struct nb_notif_walk_args *args = EVENT_ARG(event);
struct op_changes_group *group = args->group; struct op_changes_group *group = args->group;
const char *path; const char *path;
LY_ERR err; int ret;
/* /*
* Notify about deletes until we have add changes to collect. * Notify about deletes until we have add changes to collect.
*/ */
while (group->cur_changes == &group->dels) { while (group->cur_changes == &group->dels) {
err = nb_notify_delete_changes(args); ret = nb_notify_delete_changes(args);
assert(err == LY_SUCCESS); /* XXX */ if (ret) {
assert(!group->cur_change); /* we send all the deletes in one message */ timer_walk_abort(args);
return;
}
/* after deletes advance to adds */ /* after deletes advance to adds */
group->cur_changes = &group->adds; group->cur_changes = &group->adds;
@ -584,14 +595,9 @@ static void timer_walk_continue(struct event *event)
if (group->cur_change) if (group->cur_change)
break; break;
__dbg("done with oper-path change group"); args->group = __next_group(group);
op_changes_group_free(group); if (!args->group) {
timer_walk_done(args);
group = op_changes_group_next();
args->group = group;
if (!group) {
__dbg("done with ALL oper-path changes");
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
return; return;
} }
} }
@ -621,6 +627,22 @@ static void timer_walk_start(struct event *event)
timer_walk_continue(event); timer_walk_continue(event);
} }
static void timer_walk_abort(struct nb_notif_walk_args *args)
{
__dbg("Failed notifying datastore changes, will retry");
__dbg("oper-state notify setting retry timer to fire in: %d msec ", NB_NOTIF_TIMER_MSEC);
event_add_timer_msec(nb_notif_master, timer_walk_continue, args, NB_NOTIF_TIMER_MSEC,
&nb_notif_timer);
}
static void timer_walk_done(struct nb_notif_walk_args *args)
{
__dbg("Finished notifying for all datastore changes");
assert(!args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}
static void nb_notif_set_walk_timer(void) static void nb_notif_set_walk_timer(void)
{ {
if (nb_notif_walk) { if (nb_notif_walk) {
@ -659,19 +681,23 @@ void nb_notif_init(struct event_loop *tm)
void nb_notif_terminate(void) void nb_notif_terminate(void)
{ {
struct nb_notif_walk_args *args; struct nb_notif_walk_args *args = nb_notif_timer ? EVENT_ARG(nb_notif_timer) : NULL;
struct op_changes_group *group; struct op_changes_group *group;
__dbg("terminating: timer: %p timer arg: %p walk %p", nb_notif_timer, args, nb_notif_walk);
EVENT_OFF(nb_notif_timer); EVENT_OFF(nb_notif_timer);
if (nb_notif_walk) { if (nb_notif_walk) {
nb_oper_cancel_walk(nb_notif_walk); /* Grab walk args from walk if active. */
/* need to free the group that's in the walk */
args = nb_oper_walk_finish_arg(nb_notif_walk); args = nb_oper_walk_finish_arg(nb_notif_walk);
if (args) nb_oper_cancel_walk(nb_notif_walk);
op_changes_group_free(args->group);
nb_notif_walk = NULL; nb_notif_walk = NULL;
} }
if (args) {
op_changes_group_free(args->group);
XFREE(MTYPE_NB_NOTIF_WALK_ARGS, args);
}
while ((group = op_changes_group_next())) while ((group = op_changes_group_next()))
op_changes_group_free(group); op_changes_group_free(group);