mirror of
https://git.proxmox.com/git/mirror_frr
synced 2025-04-29 23:08:38 +00:00

This commit introduces the MGMT Backend Interface which can be used by back-end management client daemons like BGPd, Staticd, Zebra to connect with new FRR Management daemon (MGMTd) and utilize the new FRR Management Framework to let any Frontend clients to retrieve any operational data or manipulate any configuration data owned by the individual Backend daemon component. This commit includes the following functionalities in the changeset: 1. Add new Backend server for Backend daemons connect to. 2. Add a C-based Backend client library which can be used by daemons to communicate with MGMTd via the Backend interface. 3. Maintain a backend adapter for each connection from an appropriate Backend client to facilitate client requests and track one or more transactions initiated from Frontend client sessions that involves the backend client component. 4. Add the following commands to inspect various Backend client related information a. show mgmt backend-adapter all b. show mgmt backend-yang-xpath-registry c. show mgmt yang-xpath-subscription Co-authored-by: Pushpasis Sarkar <pushpasis@gmail.com> Co-authored-by: Abhinay Ramesh <rabhinay@vmware.com> Co-authored-by: Ujwal P <ujwalp@vmware.com> Signed-off-by: Yash Ranjan <ranjany@vmware.com>
1320 lines
36 KiB
C++
1320 lines
36 KiB
C++
// SPDX-License-Identifier: GPL-2.0-or-later
|
|
//
|
|
// Copyright (c) 2021-2022, LabN Consulting, L.L.C
|
|
// Copyright (C) 2019 NetDEF, Inc.
|
|
// Renato Westphal
|
|
//
|
|
|
|
#include <zebra.h>
|
|
#include <grpcpp/grpcpp.h>
|
|
#include "grpc/frr-northbound.grpc.pb.h"
|
|
|
|
#include "log.h"
|
|
#include "libfrr.h"
|
|
#include "lib/version.h"
|
|
#include "lib/thread.h"
|
|
#include "command.h"
|
|
#include "lib_errors.h"
|
|
#include "northbound.h"
|
|
#include "northbound_db.h"
|
|
#include "frr_pthread.h"
|
|
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <memory>
|
|
#include <string>
|
|
|
|
#define GRPC_DEFAULT_PORT 50051
|
|
|
|
|
|
// ------------------------------------------------------
|
|
// File Local Variables
|
|
// ------------------------------------------------------
|
|
|
|
/*
|
|
* NOTE: we can't use the FRR debugging infrastructure here since it uses
|
|
* atomics and C++ has a different atomics API. Enable gRPC debugging
|
|
* unconditionally until we figure out a way to solve this problem.
|
|
*/
|
|
static bool nb_dbg_client_grpc = 0;
|
|
|
|
static struct thread_master *main_master;
|
|
|
|
static struct frr_pthread *fpt;
|
|
|
|
static bool grpc_running;
|
|
|
|
#define grpc_debug(...) \
|
|
do { \
|
|
if (nb_dbg_client_grpc) \
|
|
zlog_debug(__VA_ARGS__); \
|
|
} while (0)
|
|
|
|
// ------------------------------------------------------
|
|
// New Types
|
|
// ------------------------------------------------------
|
|
|
|
enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
|
|
const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
|
|
|
|
struct candidate {
|
|
uint64_t id;
|
|
struct nb_config *config;
|
|
struct nb_transaction *transaction;
|
|
};
|
|
|
|
class Candidates
|
|
{
|
|
public:
|
|
~Candidates(void)
|
|
{
|
|
// Delete candidates.
|
|
for (auto it = _cdb.begin(); it != _cdb.end(); it++)
|
|
delete_candidate(it->first);
|
|
}
|
|
|
|
struct candidate *create_candidate(void)
|
|
{
|
|
uint64_t id = ++_next_id;
|
|
assert(id); // TODO: implement an algorithm for unique reusable
|
|
// IDs.
|
|
struct candidate *c = &_cdb[id];
|
|
c->id = id;
|
|
c->config = nb_config_dup(running_config);
|
|
c->transaction = NULL;
|
|
|
|
return c;
|
|
}
|
|
|
|
bool contains(uint64_t candidate_id)
|
|
{
|
|
return _cdb.count(candidate_id) > 0;
|
|
}
|
|
|
|
void delete_candidate(uint64_t candidate_id)
|
|
{
|
|
struct candidate *c = &_cdb[candidate_id];
|
|
char errmsg[BUFSIZ] = {0};
|
|
|
|
nb_config_free(c->config);
|
|
if (c->transaction)
|
|
nb_candidate_commit_abort(c->transaction, errmsg,
|
|
sizeof(errmsg));
|
|
_cdb.erase(c->id);
|
|
}
|
|
|
|
struct candidate *get_candidate(uint64_t id)
|
|
{
|
|
return _cdb.count(id) == 0 ? NULL : &_cdb[id];
|
|
}
|
|
|
|
private:
|
|
uint64_t _next_id = 0;
|
|
std::map<uint64_t, struct candidate> _cdb;
|
|
};
|
|
|
|
/*
|
|
* RpcStateBase is the common base class used to track a gRPC RPC.
|
|
*/
|
|
class RpcStateBase
|
|
{
|
|
public:
|
|
virtual void do_request(::frr::Northbound::AsyncService *service,
|
|
::grpc::ServerCompletionQueue *cq,
|
|
bool no_copy) = 0;
|
|
|
|
RpcStateBase(const char *name) : name(name){};
|
|
|
|
virtual ~RpcStateBase() = default;
|
|
|
|
CallState get_state() const
|
|
{
|
|
return state;
|
|
}
|
|
|
|
bool is_initial_process() const
|
|
{
|
|
/* Will always be true for Unary */
|
|
return entered_state == CREATE;
|
|
}
|
|
|
|
// Returns "more" status, if false caller can delete
|
|
bool run(frr::Northbound::AsyncService *service,
|
|
grpc::ServerCompletionQueue *cq)
|
|
{
|
|
/*
|
|
* We enter in either CREATE or MORE state, and transition to
|
|
* PROCESS state.
|
|
*/
|
|
this->entered_state = this->state;
|
|
this->state = PROCESS;
|
|
grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
|
|
call_states[this->entered_state],
|
|
call_states[this->state]);
|
|
/*
|
|
* We schedule the callback on the main pthread, and wait for
|
|
* the state to transition out of the PROCESS state. The new
|
|
* state will either be MORE or FINISH. It will always be FINISH
|
|
* for Unary RPCs.
|
|
*/
|
|
thread_add_event(main_master, c_callback, (void *)this, 0,
|
|
NULL);
|
|
|
|
pthread_mutex_lock(&this->cmux);
|
|
while (this->state == PROCESS)
|
|
pthread_cond_wait(&this->cond, &this->cmux);
|
|
pthread_mutex_unlock(&this->cmux);
|
|
|
|
grpc_debug("%s RPC in %s on grpc-io-thread", name,
|
|
call_states[this->state]);
|
|
|
|
if (this->state == FINISH) {
|
|
/*
|
|
* Server is done (FINISH) so prep to receive a new
|
|
* request of this type. We could do this earlier but
|
|
* that would mean we could be handling multiple same
|
|
* type requests in parallel without limit.
|
|
*/
|
|
this->do_request(service, cq, false);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
protected:
|
|
virtual CallState run_mainthread(struct thread *thread) = 0;
|
|
|
|
static void c_callback(struct thread *thread)
|
|
{
|
|
auto _tag = static_cast<RpcStateBase *>(THREAD_ARG(thread));
|
|
/*
|
|
* We hold the lock until the callback finishes and has updated
|
|
* _tag->state, then we signal done and release.
|
|
*/
|
|
pthread_mutex_lock(&_tag->cmux);
|
|
|
|
CallState enter_state = _tag->state;
|
|
grpc_debug("%s RPC: running %s on main thread", _tag->name,
|
|
call_states[enter_state]);
|
|
|
|
_tag->state = _tag->run_mainthread(thread);
|
|
|
|
grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
|
|
call_states[enter_state], call_states[_tag->state]);
|
|
|
|
pthread_cond_signal(&_tag->cond);
|
|
pthread_mutex_unlock(&_tag->cmux);
|
|
return;
|
|
}
|
|
|
|
grpc::ServerContext ctx;
|
|
pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
|
|
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
|
|
CallState state = CREATE;
|
|
CallState entered_state = CREATE;
|
|
|
|
public:
|
|
const char *name;
|
|
};
|
|
|
|
/*
|
|
* The UnaryRpcState class is used to track the execution of a Unary RPC.
|
|
*
|
|
* Template Args:
|
|
* Q - the request type for a given unary RPC
|
|
* S - the response type for a given unary RPC
|
|
*/
|
|
template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
|
|
{
|
|
public:
|
|
typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
|
|
::grpc::ServerContext *, Q *,
|
|
::grpc::ServerAsyncResponseWriter<S> *,
|
|
::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
|
|
void *);
|
|
|
|
UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
|
|
grpc::Status (*cb)(UnaryRpcState<Q, S> *),
|
|
const char *name)
|
|
: RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
|
|
responder(&ctx){};
|
|
|
|
void do_request(::frr::Northbound::AsyncService *service,
|
|
::grpc::ServerCompletionQueue *cq,
|
|
bool no_copy) override
|
|
{
|
|
grpc_debug("%s, posting a request for: %s", __func__, name);
|
|
auto copy = no_copy ? this
|
|
: new UnaryRpcState(cdb, requestf, callback,
|
|
name);
|
|
(service->*requestf)(©->ctx, ©->request,
|
|
©->responder, cq, cq, copy);
|
|
}
|
|
|
|
CallState run_mainthread(struct thread *thread) override
|
|
{
|
|
// Unary RPC are always finished, see "Unary" :)
|
|
grpc::Status status = this->callback(this);
|
|
responder.Finish(response, status, this);
|
|
return FINISH;
|
|
}
|
|
|
|
Candidates *cdb;
|
|
|
|
Q request;
|
|
S response;
|
|
grpc::ServerAsyncResponseWriter<S> responder;
|
|
|
|
grpc::Status (*callback)(UnaryRpcState<Q, S> *);
|
|
reqfunc_t requestf = NULL;
|
|
};
|
|
|
|
/*
|
|
* The StreamRpcState class is used to track the execution of a Streaming RPC.
|
|
*
|
|
* Template Args:
|
|
* Q - the request type for a given streaming RPC
|
|
* S - the response type for a given streaming RPC
|
|
* X - the type used to track the streaming state
|
|
*/
|
|
template <typename Q, typename S, typename X>
|
|
class StreamRpcState : public RpcStateBase
|
|
{
|
|
public:
|
|
typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
|
|
::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
|
|
::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
|
|
void *);
|
|
|
|
StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
|
|
const char *name)
|
|
: RpcStateBase(name), requestsf(rfunc), callback(cb),
|
|
async_responder(&ctx){};
|
|
|
|
void do_request(::frr::Northbound::AsyncService *service,
|
|
::grpc::ServerCompletionQueue *cq,
|
|
bool no_copy) override
|
|
{
|
|
grpc_debug("%s, posting a request for: %s", __func__, name);
|
|
auto copy =
|
|
no_copy ? this
|
|
: new StreamRpcState(requestsf, callback, name);
|
|
(service->*requestsf)(©->ctx, ©->request,
|
|
©->async_responder, cq, cq, copy);
|
|
}
|
|
|
|
CallState run_mainthread(struct thread *thread) override
|
|
{
|
|
if (this->callback(this))
|
|
return MORE;
|
|
else
|
|
return FINISH;
|
|
}
|
|
|
|
Q request;
|
|
S response;
|
|
grpc::ServerAsyncWriter<S> async_responder;
|
|
|
|
bool (*callback)(StreamRpcState<Q, S, X> *);
|
|
reqsfunc_t requestsf = NULL;
|
|
|
|
X context;
|
|
};
|
|
|
|
// ------------------------------------------------------
|
|
// Utility Functions
|
|
// ------------------------------------------------------
|
|
|
|
static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
|
|
{
|
|
switch (encoding) {
|
|
case frr::JSON:
|
|
return LYD_JSON;
|
|
case frr::XML:
|
|
return LYD_XML;
|
|
default:
|
|
flog_err(EC_LIB_DEVELOPMENT,
|
|
"%s: unknown data encoding format (%u)", __func__,
|
|
encoding);
|
|
exit(1);
|
|
}
|
|
}
|
|
|
|
static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
|
|
const char *value)
|
|
{
|
|
LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
|
|
LYD_NEW_PATH_UPDATE, &dnode);
|
|
if (err != LY_SUCCESS) {
|
|
flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
|
|
__func__, ly_errmsg(ly_native_ctx));
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
|
|
{
|
|
dnode = yang_dnode_get(dnode, path.c_str());
|
|
if (!dnode)
|
|
return -1;
|
|
|
|
lyd_free_tree(dnode);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
|
|
const struct lyd_node *dnode,
|
|
LYD_FORMAT lyd_format, bool with_defaults)
|
|
{
|
|
char *strp;
|
|
int options = 0;
|
|
|
|
SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
|
|
if (with_defaults)
|
|
SET_FLAG(options, LYD_PRINT_WD_ALL);
|
|
else
|
|
SET_FLAG(options, LYD_PRINT_WD_TRIM);
|
|
|
|
LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
|
|
if (err == LY_SUCCESS) {
|
|
if (strp) {
|
|
dt->set_data(strp);
|
|
free(strp);
|
|
}
|
|
}
|
|
return err;
|
|
}
|
|
|
|
static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
|
|
bool config_only)
|
|
{
|
|
struct lyd_node *dnode;
|
|
int options, opt2;
|
|
LY_ERR err;
|
|
|
|
if (config_only) {
|
|
options = LYD_PARSE_NO_STATE;
|
|
opt2 = LYD_VALIDATE_NO_STATE;
|
|
} else {
|
|
options = LYD_PARSE_STRICT;
|
|
opt2 = 0;
|
|
}
|
|
|
|
err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
|
|
encoding2lyd_format(dt->encoding()), options,
|
|
opt2, &dnode);
|
|
if (err != LY_SUCCESS) {
|
|
flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
|
|
__func__, ly_errmsg(ly_native_ctx));
|
|
}
|
|
return dnode;
|
|
}
|
|
|
|
static struct lyd_node *get_dnode_config(const std::string &path)
|
|
{
|
|
struct lyd_node *dnode;
|
|
|
|
if (!yang_dnode_exists(running_config->dnode,
|
|
path.empty() ? NULL : path.c_str()))
|
|
return NULL;
|
|
|
|
dnode = yang_dnode_get(running_config->dnode,
|
|
path.empty() ? NULL : path.c_str());
|
|
if (dnode)
|
|
dnode = yang_dnode_dup(dnode);
|
|
|
|
return dnode;
|
|
}
|
|
|
|
static int get_oper_data_cb(const struct lysc_node *snode,
|
|
struct yang_translator *translator,
|
|
struct yang_data *data, void *arg)
|
|
{
|
|
struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
|
|
int ret = yang_dnode_edit(dnode, data->xpath, data->value);
|
|
yang_data_free(data);
|
|
|
|
return (ret == 0) ? NB_OK : NB_ERR;
|
|
}
|
|
|
|
static struct lyd_node *get_dnode_state(const std::string &path)
|
|
{
|
|
struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
|
|
if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
|
|
!= NB_OK) {
|
|
yang_dnode_free(dnode);
|
|
return NULL;
|
|
}
|
|
|
|
return dnode;
|
|
}
|
|
|
|
static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
|
|
int type, LYD_FORMAT lyd_format,
|
|
bool with_defaults)
|
|
{
|
|
struct lyd_node *dnode_config = NULL;
|
|
struct lyd_node *dnode_state = NULL;
|
|
struct lyd_node *dnode_final;
|
|
|
|
// Configuration data.
|
|
if (type == frr::GetRequest_DataType_ALL
|
|
|| type == frr::GetRequest_DataType_CONFIG) {
|
|
dnode_config = get_dnode_config(path);
|
|
if (!dnode_config)
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Data path not found");
|
|
}
|
|
|
|
// Operational data.
|
|
if (type == frr::GetRequest_DataType_ALL
|
|
|| type == frr::GetRequest_DataType_STATE) {
|
|
dnode_state = get_dnode_state(path);
|
|
if (!dnode_state) {
|
|
if (dnode_config)
|
|
yang_dnode_free(dnode_config);
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Failed to fetch operational data");
|
|
}
|
|
}
|
|
|
|
switch (type) {
|
|
case frr::GetRequest_DataType_ALL:
|
|
//
|
|
// Combine configuration and state data into a single
|
|
// dnode.
|
|
//
|
|
if (lyd_merge_siblings(&dnode_state, dnode_config,
|
|
LYD_MERGE_DESTRUCT)
|
|
!= LY_SUCCESS) {
|
|
yang_dnode_free(dnode_state);
|
|
yang_dnode_free(dnode_config);
|
|
return grpc::Status(
|
|
grpc::StatusCode::INTERNAL,
|
|
"Failed to merge configuration and state data",
|
|
ly_errmsg(ly_native_ctx));
|
|
}
|
|
|
|
dnode_final = dnode_state;
|
|
break;
|
|
case frr::GetRequest_DataType_CONFIG:
|
|
dnode_final = dnode_config;
|
|
break;
|
|
case frr::GetRequest_DataType_STATE:
|
|
dnode_final = dnode_state;
|
|
break;
|
|
}
|
|
|
|
// Validate data to create implicit default nodes if necessary.
|
|
int validate_opts = 0;
|
|
if (type == frr::GetRequest_DataType_CONFIG)
|
|
validate_opts = LYD_VALIDATE_NO_STATE;
|
|
else
|
|
validate_opts = 0;
|
|
|
|
LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
|
|
validate_opts, NULL);
|
|
|
|
if (err)
|
|
flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
|
|
__func__, ly_errmsg(ly_native_ctx));
|
|
// Dump data using the requested format.
|
|
if (!err)
|
|
err = data_tree_from_dnode(dt, dnode_final, lyd_format,
|
|
with_defaults);
|
|
yang_dnode_free(dnode_final);
|
|
if (err)
|
|
return grpc::Status(grpc::StatusCode::INTERNAL,
|
|
"Failed to dump data");
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
|
|
// ------------------------------------------------------
|
|
// RPC Callback Functions: run on main thread
|
|
// ------------------------------------------------------
|
|
|
|
grpc::Status HandleUnaryGetCapabilities(
|
|
UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
// Response: string frr_version = 1;
|
|
tag->response.set_frr_version(FRR_VERSION);
|
|
|
|
// Response: bool rollback_support = 2;
|
|
#ifdef HAVE_CONFIG_ROLLBACKS
|
|
tag->response.set_rollback_support(true);
|
|
#else
|
|
tag->response.set_rollback_support(false);
|
|
#endif
|
|
// Response: repeated ModuleData supported_modules = 3;
|
|
struct yang_module *module;
|
|
RB_FOREACH (module, yang_modules, &yang_modules) {
|
|
auto m = tag->response.add_supported_modules();
|
|
|
|
m->set_name(module->name);
|
|
if (module->info->revision)
|
|
m->set_revision(module->info->revision);
|
|
m->set_organization(module->info->org);
|
|
}
|
|
|
|
// Response: repeated Encoding supported_encodings = 4;
|
|
tag->response.add_supported_encodings(frr::JSON);
|
|
tag->response.add_supported_encodings(frr::XML);
|
|
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
// Define the context variable type for this streaming handler
|
|
typedef std::list<std::string> GetContextType;
|
|
|
|
bool HandleStreamingGet(
|
|
StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
auto mypathps = &tag->context;
|
|
if (tag->is_initial_process()) {
|
|
// Fill our context container first time through
|
|
grpc_debug("%s: initialize streaming state", __func__);
|
|
auto paths = tag->request.path();
|
|
for (const std::string &path : paths) {
|
|
mypathps->push_back(std::string(path));
|
|
}
|
|
}
|
|
|
|
// Request: DataType type = 1;
|
|
int type = tag->request.type();
|
|
// Request: Encoding encoding = 2;
|
|
frr::Encoding encoding = tag->request.encoding();
|
|
// Request: bool with_defaults = 3;
|
|
bool with_defaults = tag->request.with_defaults();
|
|
|
|
if (mypathps->empty()) {
|
|
tag->async_responder.Finish(grpc::Status::OK, tag);
|
|
return false;
|
|
}
|
|
|
|
frr::GetResponse response;
|
|
grpc::Status status;
|
|
|
|
// Response: int64 timestamp = 1;
|
|
response.set_timestamp(time(NULL));
|
|
|
|
// Response: DataTree data = 2;
|
|
auto *data = response.mutable_data();
|
|
data->set_encoding(tag->request.encoding());
|
|
status = get_path(data, mypathps->back().c_str(), type,
|
|
encoding2lyd_format(encoding), with_defaults);
|
|
|
|
if (!status.ok()) {
|
|
tag->async_responder.WriteAndFinish(
|
|
response, grpc::WriteOptions(), status, tag);
|
|
return false;
|
|
}
|
|
|
|
mypathps->pop_back();
|
|
if (mypathps->empty()) {
|
|
tag->async_responder.WriteAndFinish(
|
|
response, grpc::WriteOptions(), grpc::Status::OK, tag);
|
|
return false;
|
|
} else {
|
|
tag->async_responder.Write(response, tag);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
grpc::Status HandleUnaryCreateCandidate(
|
|
UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
struct candidate *candidate = tag->cdb->create_candidate();
|
|
if (!candidate)
|
|
return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
|
|
"Can't create candidate configuration");
|
|
tag->response.set_candidate_id(candidate->id);
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryDeleteCandidate(
|
|
UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
uint32_t candidate_id = tag->request.candidate_id();
|
|
|
|
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
|
|
|
|
if (!tag->cdb->contains(candidate_id))
|
|
return grpc::Status(grpc::StatusCode::NOT_FOUND,
|
|
"candidate configuration not found");
|
|
tag->cdb->delete_candidate(candidate_id);
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryUpdateCandidate(
|
|
UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
uint32_t candidate_id = tag->request.candidate_id();
|
|
|
|
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
|
|
|
|
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
|
|
|
|
if (!candidate)
|
|
return grpc::Status(grpc::StatusCode::NOT_FOUND,
|
|
"candidate configuration not found");
|
|
if (candidate->transaction)
|
|
return grpc::Status(
|
|
grpc::StatusCode::FAILED_PRECONDITION,
|
|
"candidate is in the middle of a transaction");
|
|
if (nb_candidate_update(candidate->config) != NB_OK)
|
|
return grpc::Status(grpc::StatusCode::INTERNAL,
|
|
"failed to update candidate configuration");
|
|
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryEditCandidate(
|
|
UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
uint32_t candidate_id = tag->request.candidate_id();
|
|
|
|
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
|
|
|
|
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
|
|
if (!candidate)
|
|
return grpc::Status(grpc::StatusCode::NOT_FOUND,
|
|
"candidate configuration not found");
|
|
|
|
struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
|
|
|
|
auto pvs = tag->request.update();
|
|
for (const frr::PathValue &pv : pvs) {
|
|
if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
|
|
pv.value().c_str()) != 0) {
|
|
nb_config_free(candidate_tmp);
|
|
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Failed to update \"" + pv.path() +
|
|
"\"");
|
|
}
|
|
}
|
|
|
|
pvs = tag->request.delete_();
|
|
for (const frr::PathValue &pv : pvs) {
|
|
if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
|
|
nb_config_free(candidate_tmp);
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Failed to remove \"" + pv.path() +
|
|
"\"");
|
|
}
|
|
}
|
|
|
|
// No errors, accept all changes.
|
|
nb_config_replace(candidate->config, candidate_tmp, false);
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryLoadToCandidate(
|
|
UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
uint32_t candidate_id = tag->request.candidate_id();
|
|
|
|
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
|
|
|
|
// Request: LoadType type = 2;
|
|
int load_type = tag->request.type();
|
|
// Request: DataTree config = 3;
|
|
auto config = tag->request.config();
|
|
|
|
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
|
|
if (!candidate)
|
|
return grpc::Status(grpc::StatusCode::NOT_FOUND,
|
|
"candidate configuration not found");
|
|
|
|
struct lyd_node *dnode = dnode_from_data_tree(&config, true);
|
|
if (!dnode)
|
|
return grpc::Status(grpc::StatusCode::INTERNAL,
|
|
"Failed to parse the configuration");
|
|
|
|
struct nb_config *loaded_config = nb_config_new(dnode);
|
|
if (load_type == frr::LoadToCandidateRequest::REPLACE)
|
|
nb_config_replace(candidate->config, loaded_config, false);
|
|
else if (nb_config_merge(candidate->config, loaded_config, false) !=
|
|
NB_OK)
|
|
return grpc::Status(grpc::StatusCode::INTERNAL,
|
|
"Failed to merge the loaded configuration");
|
|
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status
|
|
HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
// Request: uint32 candidate_id = 1;
|
|
uint32_t candidate_id = tag->request.candidate_id();
|
|
|
|
grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
|
|
|
|
// Request: Phase phase = 2;
|
|
int phase = tag->request.phase();
|
|
// Request: string comment = 3;
|
|
const std::string comment = tag->request.comment();
|
|
|
|
// Find candidate configuration.
|
|
struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
|
|
if (!candidate)
|
|
return grpc::Status(grpc::StatusCode::NOT_FOUND,
|
|
"candidate configuration not found");
|
|
|
|
int ret = NB_OK;
|
|
uint32_t transaction_id = 0;
|
|
|
|
// Check for misuse of the two-phase commit protocol.
|
|
switch (phase) {
|
|
case frr::CommitRequest::PREPARE:
|
|
case frr::CommitRequest::ALL:
|
|
if (candidate->transaction)
|
|
return grpc::Status(
|
|
grpc::StatusCode::FAILED_PRECONDITION,
|
|
"candidate is in the middle of a transaction");
|
|
break;
|
|
case frr::CommitRequest::ABORT:
|
|
case frr::CommitRequest::APPLY:
|
|
if (!candidate->transaction)
|
|
return grpc::Status(
|
|
grpc::StatusCode::FAILED_PRECONDITION,
|
|
"no transaction in progress");
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
|
|
// Execute the user request.
|
|
struct nb_context context = {};
|
|
context.client = NB_CLIENT_GRPC;
|
|
char errmsg[BUFSIZ] = {0};
|
|
|
|
switch (phase) {
|
|
case frr::CommitRequest::VALIDATE:
|
|
grpc_debug("`-> Performing VALIDATE");
|
|
ret = nb_candidate_validate(&context, candidate->config, errmsg,
|
|
sizeof(errmsg));
|
|
break;
|
|
case frr::CommitRequest::PREPARE:
|
|
grpc_debug("`-> Performing PREPARE");
|
|
ret = nb_candidate_commit_prepare(
|
|
context, candidate->config, comment.c_str(),
|
|
&candidate->transaction, false, false, errmsg,
|
|
sizeof(errmsg));
|
|
break;
|
|
case frr::CommitRequest::ABORT:
|
|
grpc_debug("`-> Performing ABORT");
|
|
nb_candidate_commit_abort(candidate->transaction, errmsg,
|
|
sizeof(errmsg));
|
|
break;
|
|
case frr::CommitRequest::APPLY:
|
|
grpc_debug("`-> Performing APPLY");
|
|
nb_candidate_commit_apply(candidate->transaction, true,
|
|
&transaction_id, errmsg,
|
|
sizeof(errmsg));
|
|
break;
|
|
case frr::CommitRequest::ALL:
|
|
grpc_debug("`-> Performing ALL");
|
|
ret = nb_candidate_commit(context, candidate->config, true,
|
|
comment.c_str(), &transaction_id,
|
|
errmsg, sizeof(errmsg));
|
|
break;
|
|
}
|
|
|
|
// Map northbound error codes to gRPC status codes.
|
|
grpc::Status status;
|
|
switch (ret) {
|
|
case NB_OK:
|
|
status = grpc::Status::OK;
|
|
break;
|
|
case NB_ERR_NO_CHANGES:
|
|
status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
|
|
break;
|
|
case NB_ERR_LOCKED:
|
|
status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
|
|
break;
|
|
case NB_ERR_VALIDATION:
|
|
status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
errmsg);
|
|
break;
|
|
case NB_ERR_RESOURCE:
|
|
status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
|
|
errmsg);
|
|
break;
|
|
case NB_ERR:
|
|
default:
|
|
status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
|
|
break;
|
|
}
|
|
|
|
grpc_debug("`-> Result: %s (message: '%s')",
|
|
nb_err_name((enum nb_error)ret), errmsg);
|
|
|
|
if (ret == NB_OK) {
|
|
// Response: uint32 transaction_id = 1;
|
|
if (transaction_id)
|
|
tag->response.set_transaction_id(transaction_id);
|
|
}
|
|
if (strlen(errmsg) > 0)
|
|
tag->response.set_error_message(errmsg);
|
|
|
|
return status;
|
|
}
|
|
|
|
grpc::Status HandleUnaryLockConfig(
|
|
UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
if (nb_running_lock(NB_CLIENT_GRPC, NULL))
|
|
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
|
|
"running configuration is locked already");
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryUnlockConfig(
|
|
UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
|
|
return grpc::Status(
|
|
grpc::StatusCode::FAILED_PRECONDITION,
|
|
"failed to unlock the running configuration");
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
static void list_transactions_cb(void *arg, int transaction_id,
|
|
const char *client_name, const char *date,
|
|
const char *comment)
|
|
{
|
|
auto list = static_cast<std::list<
|
|
std::tuple<int, std::string, std::string, std::string>> *>(arg);
|
|
list->push_back(
|
|
std::make_tuple(transaction_id, std::string(client_name),
|
|
std::string(date), std::string(comment)));
|
|
}
|
|
|
|
// Define the context variable type for this streaming handler
|
|
typedef std::list<std::tuple<int, std::string, std::string, std::string>>
|
|
ListTransactionsContextType;
|
|
|
|
bool HandleStreamingListTransactions(
|
|
StreamRpcState<frr::ListTransactionsRequest,
|
|
frr::ListTransactionsResponse,
|
|
ListTransactionsContextType> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
auto list = &tag->context;
|
|
if (tag->is_initial_process()) {
|
|
grpc_debug("%s: initialize streaming state", __func__);
|
|
// Fill our context container first time through
|
|
nb_db_transactions_iterate(list_transactions_cb, list);
|
|
list->push_back(std::make_tuple(
|
|
0xFFFF, std::string("fake client"),
|
|
std::string("fake date"), std::string("fake comment")));
|
|
list->push_back(std::make_tuple(0xFFFE,
|
|
std::string("fake client2"),
|
|
std::string("fake date"),
|
|
std::string("fake comment2")));
|
|
}
|
|
|
|
if (list->empty()) {
|
|
tag->async_responder.Finish(grpc::Status::OK, tag);
|
|
return false;
|
|
}
|
|
|
|
auto item = list->back();
|
|
|
|
frr::ListTransactionsResponse response;
|
|
|
|
// Response: uint32 id = 1;
|
|
response.set_id(std::get<0>(item));
|
|
|
|
// Response: string client = 2;
|
|
response.set_client(std::get<1>(item).c_str());
|
|
|
|
// Response: string date = 3;
|
|
response.set_date(std::get<2>(item).c_str());
|
|
|
|
// Response: string comment = 4;
|
|
response.set_comment(std::get<3>(item).c_str());
|
|
|
|
list->pop_back();
|
|
if (list->empty()) {
|
|
tag->async_responder.WriteAndFinish(
|
|
response, grpc::WriteOptions(), grpc::Status::OK, tag);
|
|
return false;
|
|
} else {
|
|
tag->async_responder.Write(response, tag);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
grpc::Status HandleUnaryGetTransaction(
|
|
UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
|
|
*tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
// Request: uint32 transaction_id = 1;
|
|
uint32_t transaction_id = tag->request.transaction_id();
|
|
// Request: Encoding encoding = 2;
|
|
frr::Encoding encoding = tag->request.encoding();
|
|
// Request: bool with_defaults = 3;
|
|
bool with_defaults = tag->request.with_defaults();
|
|
|
|
grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
|
|
transaction_id, encoding);
|
|
|
|
struct nb_config *nb_config;
|
|
|
|
// Load configuration from the transactions database.
|
|
nb_config = nb_db_transaction_load(transaction_id);
|
|
if (!nb_config)
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Transaction not found");
|
|
|
|
// Response: DataTree config = 1;
|
|
auto config = tag->response.mutable_config();
|
|
config->set_encoding(encoding);
|
|
|
|
// Dump data using the requested format.
|
|
if (data_tree_from_dnode(config, nb_config->dnode,
|
|
encoding2lyd_format(encoding), with_defaults)
|
|
!= 0) {
|
|
nb_config_free(nb_config);
|
|
return grpc::Status(grpc::StatusCode::INTERNAL,
|
|
"Failed to dump data");
|
|
}
|
|
|
|
nb_config_free(nb_config);
|
|
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
grpc::Status HandleUnaryExecute(
|
|
UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
struct nb_node *nb_node;
|
|
struct list *input_list;
|
|
struct list *output_list;
|
|
struct listnode *node;
|
|
struct yang_data *data;
|
|
const char *xpath;
|
|
char errmsg[BUFSIZ] = {0};
|
|
|
|
// Request: string path = 1;
|
|
xpath = tag->request.path().c_str();
|
|
|
|
grpc_debug("%s(path: \"%s\")", __func__, xpath);
|
|
|
|
if (tag->request.path().empty())
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Data path is empty");
|
|
|
|
nb_node = nb_node_find(xpath);
|
|
if (!nb_node)
|
|
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
|
|
"Unknown data path");
|
|
|
|
input_list = yang_data_list_new();
|
|
output_list = yang_data_list_new();
|
|
|
|
// Read input parameters.
|
|
auto input = tag->request.input();
|
|
for (const frr::PathValue &pv : input) {
|
|
// Request: repeated PathValue input = 2;
|
|
data = yang_data_new(pv.path().c_str(), pv.value().c_str());
|
|
listnode_add(input_list, data);
|
|
}
|
|
|
|
// Execute callback registered for this XPath.
|
|
if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
|
|
sizeof(errmsg))
|
|
!= NB_OK) {
|
|
flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
|
|
__func__, xpath);
|
|
list_delete(&input_list);
|
|
list_delete(&output_list);
|
|
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
|
|
}
|
|
|
|
// Process output parameters.
|
|
for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
|
|
// Response: repeated PathValue output = 1;
|
|
frr::PathValue *pv = tag->response.add_output();
|
|
pv->set_path(data->xpath);
|
|
pv->set_value(data->value);
|
|
}
|
|
|
|
// Release memory.
|
|
list_delete(&input_list);
|
|
list_delete(&output_list);
|
|
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
// ------------------------------------------------------
|
|
// Thread Initialization and Run Functions
|
|
// ------------------------------------------------------
|
|
|
|
|
|
#define REQUEST_NEWRPC(NAME, cdb) \
|
|
do { \
|
|
auto _rpcState = new UnaryRpcState<frr::NAME##Request, \
|
|
frr::NAME##Response>( \
|
|
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
|
|
&HandleUnary##NAME, #NAME); \
|
|
_rpcState->do_request(&service, cq.get(), true); \
|
|
} while (0)
|
|
|
|
#define REQUEST_NEWRPC_STREAMING(NAME) \
|
|
do { \
|
|
auto _rpcState = new StreamRpcState<frr::NAME##Request, \
|
|
frr::NAME##Response, \
|
|
NAME##ContextType>( \
|
|
&frr::Northbound::AsyncService::Request##NAME, \
|
|
&HandleStreaming##NAME, #NAME); \
|
|
_rpcState->do_request(&service, cq.get(), true); \
|
|
} while (0)
|
|
|
|
struct grpc_pthread_attr {
|
|
struct frr_pthread_attr attr;
|
|
unsigned long port;
|
|
};
|
|
|
|
// Capture these objects so we can try to shut down cleanly
|
|
static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
|
|
static grpc::Server *s_server;
|
|
|
|
static void *grpc_pthread_start(void *arg)
|
|
{
|
|
struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
|
|
uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
|
|
|
|
Candidates candidates;
|
|
grpc::ServerBuilder builder;
|
|
std::stringstream server_address;
|
|
frr::Northbound::AsyncService service;
|
|
|
|
frr_pthread_set_name(fpt);
|
|
|
|
server_address << "0.0.0.0:" << port;
|
|
builder.AddListeningPort(server_address.str(),
|
|
grpc::InsecureServerCredentials());
|
|
builder.RegisterService(&service);
|
|
builder.AddChannelArgument(
|
|
GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
|
|
std::unique_ptr<grpc::ServerCompletionQueue> cq =
|
|
builder.AddCompletionQueue();
|
|
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
|
|
s_server = server.get();
|
|
|
|
pthread_mutex_lock(&s_server_lock); // Make coverity happy
|
|
grpc_running = true;
|
|
pthread_mutex_unlock(&s_server_lock); // Make coverity happy
|
|
|
|
/* Schedule unary RPC handlers */
|
|
REQUEST_NEWRPC(GetCapabilities, NULL);
|
|
REQUEST_NEWRPC(CreateCandidate, &candidates);
|
|
REQUEST_NEWRPC(DeleteCandidate, &candidates);
|
|
REQUEST_NEWRPC(UpdateCandidate, &candidates);
|
|
REQUEST_NEWRPC(EditCandidate, &candidates);
|
|
REQUEST_NEWRPC(LoadToCandidate, &candidates);
|
|
REQUEST_NEWRPC(Commit, &candidates);
|
|
REQUEST_NEWRPC(GetTransaction, NULL);
|
|
REQUEST_NEWRPC(LockConfig, NULL);
|
|
REQUEST_NEWRPC(UnlockConfig, NULL);
|
|
REQUEST_NEWRPC(Execute, NULL);
|
|
|
|
/* Schedule streaming RPC handlers */
|
|
REQUEST_NEWRPC_STREAMING(Get);
|
|
REQUEST_NEWRPC_STREAMING(ListTransactions);
|
|
|
|
zlog_notice("gRPC server listening on %s",
|
|
server_address.str().c_str());
|
|
|
|
/* Process inbound RPCs */
|
|
bool ok;
|
|
void *tag;
|
|
while (true) {
|
|
if (!cq->Next(&tag, &ok)) {
|
|
grpc_debug("%s: CQ empty exiting", __func__);
|
|
break;
|
|
}
|
|
|
|
grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
|
|
ok);
|
|
|
|
if (!ok) {
|
|
delete static_cast<RpcStateBase *>(tag);
|
|
break;
|
|
}
|
|
|
|
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
|
|
if (rpc->get_state() != FINISH)
|
|
rpc->run(&service, cq.get());
|
|
else {
|
|
grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
|
|
delete rpc;
|
|
}
|
|
}
|
|
|
|
/* This was probably done for us to get here, but let's be safe */
|
|
pthread_mutex_lock(&s_server_lock);
|
|
grpc_running = false;
|
|
if (s_server) {
|
|
grpc_debug("%s: shutdown server and CQ", __func__);
|
|
server->Shutdown();
|
|
s_server = NULL;
|
|
}
|
|
pthread_mutex_unlock(&s_server_lock);
|
|
|
|
grpc_debug("%s: shutting down CQ", __func__);
|
|
cq->Shutdown();
|
|
|
|
grpc_debug("%s: draining the CQ", __func__);
|
|
while (cq->Next(&tag, &ok)) {
|
|
grpc_debug("%s: drain tag %p", __func__, tag);
|
|
delete static_cast<RpcStateBase *>(tag);
|
|
}
|
|
|
|
zlog_info("%s: exiting from grpc pthread", __func__);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static int frr_grpc_init(uint port)
|
|
{
|
|
struct frr_pthread_attr attr = {
|
|
.start = grpc_pthread_start,
|
|
.stop = NULL,
|
|
};
|
|
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
|
|
fpt->data = reinterpret_cast<void *>((intptr_t)port);
|
|
|
|
/* Create a pthread for gRPC since it runs its own event loop. */
|
|
if (frr_pthread_run(fpt, NULL) < 0) {
|
|
flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
|
|
__func__, safe_strerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int frr_grpc_finish(void)
|
|
{
|
|
grpc_debug("%s: entered", __func__);
|
|
|
|
if (!fpt)
|
|
return 0;
|
|
|
|
/*
|
|
* Shut the server down here in main thread. This will cause the wait on
|
|
* the completion queue (cq.Next()) to exit and cleanup everything else.
|
|
*/
|
|
pthread_mutex_lock(&s_server_lock);
|
|
grpc_running = false;
|
|
if (s_server) {
|
|
grpc_debug("%s: shutdown server", __func__);
|
|
s_server->Shutdown();
|
|
s_server = NULL;
|
|
}
|
|
pthread_mutex_unlock(&s_server_lock);
|
|
|
|
grpc_debug("%s: joining and destroy grpc thread", __func__);
|
|
pthread_join(fpt->thread, NULL);
|
|
frr_pthread_destroy(fpt);
|
|
|
|
// Fix protobuf 'memory leaks' during shutdown.
|
|
// https://groups.google.com/g/protobuf/c/4y_EmQiCGgs
|
|
google::protobuf::ShutdownProtobufLibrary();
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* This is done this way because module_init and module_late_init are both
|
|
* called during daemon pre-fork initialization. Because the GRPC library
|
|
* spawns threads internally, we need to delay initializing it until after
|
|
* fork. This is done by scheduling this init function as an event task, since
|
|
* the event loop doesn't run until after fork.
|
|
*/
|
|
static void frr_grpc_module_very_late_init(struct thread *thread)
|
|
{
|
|
const char *args = THIS_MODULE->load_args;
|
|
uint port = GRPC_DEFAULT_PORT;
|
|
|
|
if (args) {
|
|
port = std::stoul(args);
|
|
if (port < 1024 || port > UINT16_MAX) {
|
|
flog_err(EC_LIB_GRPC_INIT,
|
|
"%s: port number must be between 1025 and %d",
|
|
__func__, UINT16_MAX);
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
if (frr_grpc_init(port) < 0)
|
|
goto error;
|
|
|
|
return;
|
|
|
|
error:
|
|
flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
|
|
}
|
|
|
|
static int frr_grpc_module_late_init(struct thread_master *tm)
|
|
{
|
|
main_master = tm;
|
|
hook_register(frr_fini, frr_grpc_finish);
|
|
thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
|
|
return 0;
|
|
}
|
|
|
|
static int frr_grpc_module_init(void)
|
|
{
|
|
hook_register(frr_late_init, frr_grpc_module_late_init);
|
|
|
|
return 0;
|
|
}
|
|
|
|
FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
|
|
.description = "FRR gRPC northbound module",
|
|
.init = frr_grpc_module_init, );
|