From 0edcb505a347c8dde01c9217f627715273d0ed0f Mon Sep 17 00:00:00 2001 From: Chirag Shah Date: Wed, 22 Apr 2020 16:09:15 -0700 Subject: [PATCH 1/2] lib: use frr_pthread to spawn grpc pthread start grpc thread with frr_pthread library callbacks to integrate with rcu infrastructure. If a thread is created using native pthread callbacks and if zlog is used then it leads to crash. Signed-off-by: Chirag Shah --- lib/northbound_grpc.cpp | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 66bf05c1ab..fd4101d679 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -28,6 +28,7 @@ #include "lib_errors.h" #include "northbound.h" #include "northbound_db.h" +#include "frr_pthread.h" #include #include @@ -36,6 +37,8 @@ #define GRPC_DEFAULT_PORT 50051 +static void *grpc_pthread_start(void *arg); + /* * NOTE: we can't use the FRR debugging infrastructure here since it uses * atomics and C++ has a different atomics API. Enable gRPC debugging @@ -43,7 +46,13 @@ */ static bool nb_dbg_client_grpc = 1; -static pthread_t grpc_pthread; +static struct frr_pthread *fpt; + +/* Default frr_pthread attributes */ +static const struct frr_pthread_attr attr = { + .start = grpc_pthread_start, + .stop = NULL, +}; class NorthboundImpl final : public frr::Northbound::Service { @@ -844,10 +853,13 @@ class NorthboundImpl final : public frr::Northbound::Service static void *grpc_pthread_start(void *arg) { - unsigned long *port = static_cast(arg); + struct frr_pthread *fpt = static_cast(arg); + unsigned long *port = static_cast(fpt->data); NorthboundImpl service; std::stringstream server_address; + frr_pthread_set_name(fpt); + server_address << "0.0.0.0:" << *port; grpc::ServerBuilder builder; @@ -867,19 +879,24 @@ static void *grpc_pthread_start(void *arg) static int frr_grpc_init(unsigned long *port) { + fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); + fpt->data = static_cast(port); + /* Create a pthread for gRPC since it runs its own event loop. */ - if (pthread_create(&grpc_pthread, NULL, grpc_pthread_start, port)) { + if (frr_pthread_run(fpt, NULL) < 0) { flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s", __func__, safe_strerror(errno)); return -1; } - pthread_detach(grpc_pthread); + pthread_detach(fpt->thread); return 0; } static int frr_grpc_finish(void) { + if (fpt) + frr_pthread_destroy(fpt); // TODO: cancel the gRPC pthreads gracefully. return 0; From ecf9fb30b75370c1354d07ae215c4950e5131dd8 Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Wed, 6 May 2020 19:36:19 -0400 Subject: [PATCH 2/2] lib: convert GRPC plugin to async Synchronous GRPC services are called from arbitrary threads. This makes access to anything outside the GRPC module unsafe. We need to convert the plugin to use the async model that allows us to control our own threads. Signed-off-by: Quentin Young --- lib/northbound_grpc.cpp | 1363 ++++++++++++++++++++++++++------------- 1 file changed, 917 insertions(+), 446 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index fd4101d679..96a11d67bd 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -54,12 +54,70 @@ static const struct frr_pthread_attr attr = { .stop = NULL, }; -class NorthboundImpl final : public frr::Northbound::Service +enum CallStatus { CREATE, PROCESS, FINISH }; + +/* Thanks gooble */ +class RpcStateBase +{ + public: + virtual void doCallback() = 0; +}; + +class NorthboundImpl; + +template class RpcState : RpcStateBase +{ + public: + RpcState(NorthboundImpl *svc, + void (NorthboundImpl::*cb)(RpcState *)) + : callback(cb), responder(&ctx), async_responder(&ctx), + service(svc){}; + + void doCallback() override + { + (service->*callback)(this); + } + + grpc::ServerContext ctx; + Q request; + S response; + grpc::ServerAsyncResponseWriter responder; + grpc::ServerAsyncWriter async_responder; + + NorthboundImpl *service; + void (NorthboundImpl::*callback)(RpcState *); + + void *context; + CallStatus state = CREATE; +}; + +#define REQUEST_RPC(NAME) \ + do { \ + auto _rpcState = \ + new RpcState( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +#define REQUEST_RPC_STREAMING(NAME) \ + do { \ + auto _rpcState = \ + new RpcState( \ + this, &NorthboundImpl::Handle##NAME); \ + _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ + &_rpcState->async_responder, _cq, _cq, \ + _rpcState); \ + } while (0) + +class NorthboundImpl { public: NorthboundImpl(void) { _nextCandidateId = 0; + _service = new frr::Northbound::AsyncService(); } ~NorthboundImpl(void) @@ -70,61 +128,136 @@ class NorthboundImpl final : public frr::Northbound::Service delete_candidate(&it->second); } - grpc::Status - GetCapabilities(grpc::ServerContext *context, - frr::GetCapabilitiesRequest const *request, - frr::GetCapabilitiesResponse *response) override + void Run(unsigned long port) + { + grpc::ServerBuilder builder; + std::stringstream server_address; + + server_address << "0.0.0.0:" << port; + + builder.AddListeningPort(server_address.str(), + grpc::InsecureServerCredentials()); + builder.RegisterService(_service); + + auto cq = builder.AddCompletionQueue(); + _cq = cq.get(); + auto _server = builder.BuildAndStart(); + + /* Schedule all RPC handlers */ + REQUEST_RPC(GetCapabilities); + REQUEST_RPC(CreateCandidate); + REQUEST_RPC(DeleteCandidate); + REQUEST_RPC(UpdateCandidate); + REQUEST_RPC(EditCandidate); + REQUEST_RPC(LoadToCandidate); + REQUEST_RPC(Commit); + REQUEST_RPC(GetTransaction); + REQUEST_RPC(LockConfig); + REQUEST_RPC(UnlockConfig); + REQUEST_RPC(Execute); + REQUEST_RPC_STREAMING(Get); + REQUEST_RPC_STREAMING(ListTransactions); + + zlog_notice("gRPC server listening on %s", + server_address.str().c_str()); + + /* Process inbound RPCs */ + void *tag; + bool ok; + while (true) { + _cq->Next(&tag, &ok); + GPR_ASSERT(ok); + static_cast(tag)->doCallback(); + tag = nullptr; + } + } + + void HandleGetCapabilities(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC GetCapabilities()"); - // Response: string frr_version = 1; - response->set_frr_version(FRR_VERSION); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetCapabilities); + tag->state = PROCESS; + } + case PROCESS: { - // Response: bool rollback_support = 2; + // Response: string frr_version = 1; + tag->response.set_frr_version(FRR_VERSION); + + // Response: bool rollback_support = 2; #ifdef HAVE_CONFIG_ROLLBACKS - response->set_rollback_support(true); + tag->response.set_rollback_support(true); #else - response->set_rollback_support(false); + tag->response.set_rollback_support(false); #endif - // Response: repeated ModuleData supported_modules = 3; - struct yang_module *module; - RB_FOREACH (module, yang_modules, &yang_modules) { - auto m = response->add_supported_modules(); + // Response: repeated ModuleData supported_modules = 3; + struct yang_module *module; + RB_FOREACH (module, yang_modules, &yang_modules) { + auto m = tag->response.add_supported_modules(); - m->set_name(module->name); - if (module->info->rev_size) - m->set_revision(module->info->rev[0].date); - m->set_organization(module->info->org); + m->set_name(module->name); + if (module->info->rev_size) + m->set_revision( + module->info->rev[0].date); + m->set_organization(module->info->org); + } + + // Response: repeated Encoding supported_encodings = 4; + tag->response.add_supported_encodings(frr::JSON); + tag->response.add_supported_encodings(frr::XML); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; } - - // Response: repeated Encoding supported_encodings = 4; - response->add_supported_encodings(frr::JSON); - response->add_supported_encodings(frr::XML); - - return grpc::Status::OK; } - grpc::Status Get(grpc::ServerContext *context, - frr::GetRequest const *request, - grpc::ServerWriter *writer) override + void HandleGet(RpcState *tag) { - // Request: DataType type = 1; - int type = request->type(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); + switch (tag->state) { + case CREATE: { + auto mypaths = new std::list(); + tag->context = mypaths; + auto paths = tag->request.path(); + for (const std::string &path : paths) { + mypaths->push_back(std::string(path)); + } + REQUEST_RPC_STREAMING(Get); + tag->state = PROCESS; + } + case PROCESS: { + // Request: DataType type = 1; + int type = tag->request.type(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", + type, encoding, with_defaults); + + auto mypaths = static_cast *>( + tag->context); + + if (mypaths->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", - type, encoding, with_defaults); - // Request: repeated string path = 4; - auto paths = request->path(); - for (const std::string &path : paths) { frr::GetResponse response; grpc::Status status; @@ -133,391 +266,715 @@ class NorthboundImpl final : public frr::Northbound::Service // Response: DataTree data = 2; auto *data = response.mutable_data(); - data->set_encoding(request->encoding()); - status = get_path(data, path, type, + data->set_encoding(tag->request.encoding()); + status = get_path(data, mypaths->back().c_str(), type, encoding2lyd_format(encoding), with_defaults); // Something went wrong... - if (!status.ok()) - return status; + if (!status.ok()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), status, + tag); + tag->state = FINISH; + return; + } - writer->Write(response); + mypaths->pop_back(); + + tag->async_responder.Write(response, tag); + + break; } + case FINISH: + if (nb_dbg_client_grpc) + zlog_debug("received RPC Get() end"); - if (nb_dbg_client_grpc) - zlog_debug("received RPC Get() end"); - - return grpc::Status::OK; + delete static_cast *>( + tag->context); + delete tag; + } } - grpc::Status - CreateCandidate(grpc::ServerContext *context, - frr::CreateCandidateRequest const *request, - frr::CreateCandidateResponse *response) override + void HandleCreateCandidate(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC CreateCandidate()"); - struct candidate *candidate = create_candidate(); - if (!candidate) - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Can't create candidate configuration"); - - // Response: uint32 candidate_id = 1; - response->set_candidate_id(candidate->id); - - return grpc::Status::OK; - } - - grpc::Status - DeleteCandidate(grpc::ServerContext *context, - frr::DeleteCandidateRequest const *request, - frr::DeleteCandidateResponse *response) override - { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC DeleteCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - delete_candidate(candidate); - - return grpc::Status::OK; - } - - grpc::Status - UpdateCandidate(grpc::ServerContext *context, - frr::UpdateCandidateRequest const *request, - frr::UpdateCandidateResponse *response) override - { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC UpdateCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "candidate is in the middle of a transaction"); - - if (nb_candidate_update(candidate->config) != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"); - - return grpc::Status::OK; - } - - grpc::Status - EditCandidate(grpc::ServerContext *context, - frr::EditCandidateRequest const *request, - frr::EditCandidateResponse *response) override - { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC EditCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); - - // Create a copy of the candidate. For consistency, we need to - // ensure that either all changes are accepted or none are (in - // the event of an error). - struct nb_config *candidate_tmp = - nb_config_dup(candidate->config); - - auto pvs = request->update(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), - pv.value()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to update \"" + pv.path() - + "\""); - } + switch (tag->state) { + case CREATE: { + REQUEST_RPC(CreateCandidate); + tag->state = PROCESS; } - - pvs = request->delete_(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) - != 0) { - nb_config_free(candidate_tmp); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to remove \"" + pv.path() - + "\""); + case PROCESS: { + struct candidate *candidate = create_candidate(); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Can't create candidate configuration"), + tag); + } else { + tag->response.set_candidate_id(candidate->id); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); } + + tag->state = FINISH; + + break; + } + case FINISH: + delete tag; } - - // No errors, accept all changes. - nb_config_replace(candidate->config, candidate_tmp, false); - - return grpc::Status::OK; } - grpc::Status - LoadToCandidate(grpc::ServerContext *context, - frr::LoadToCandidateRequest const *request, - frr::LoadToCandidateResponse *response) override + void HandleDeleteCandidate(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: LoadType type = 2; - int load_type = request->type(); - // Request: DataTree config = 3; - auto config = request->config(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(DeleteCandidate); + tag->state = PROCESS; + } + case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC LoadToCandidate(candidate_id: %u)", - candidate_id); + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC DeleteCandidate(candidate_id: %u)", + candidate_id); - struct lyd_node *dnode = dnode_from_data_tree(&config, true); - if (!dnode) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"); + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } else { + delete_candidate(candidate); + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + tag->state = FINISH; + return; + } + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } - struct nb_config *loaded_config = nb_config_new(dnode); + void HandleUpdateCandidate(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UpdateCandidate); + tag->state = PROCESS; + } + case PROCESS: { - if (load_type == frr::LoadToCandidateRequest::REPLACE) - nb_config_replace(candidate->config, loaded_config, + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC UpdateCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + else if (candidate->transaction) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + else if (nb_candidate_update(candidate->config) + != NB_OK) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"), + tag); + + else + tag->responder.Finish(tag->response, + grpc::Status::OK, tag); + + tag->state = FINISH; + + break; + } + case FINISH: + delete tag; + } + } + + void HandleEditCandidate(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(EditCandidate); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC EditCandidate(candidate_id: %u)", + candidate_id); + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + break; + } + + struct nb_config *candidate_tmp = + nb_config_dup(candidate->config); + + auto pvs = tag->request.update(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_edit(candidate_tmp->dnode, + pv.path(), pv.value()) + != 0) { + nb_config_free(candidate_tmp); + + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to update \"" + + pv.path() + + "\""), + tag); + + tag->state = FINISH; + return; + } + } + + pvs = tag->request.delete_(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_delete(candidate_tmp->dnode, + pv.path()) + != 0) { + nb_config_free(candidate_tmp); + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + INVALID_ARGUMENT, + "Failed to remove \"" + + pv.path() + + "\""), + tag); + tag->state = FINISH; + return; + } + } + + // No errors, accept all changes. + nb_config_replace(candidate->config, candidate_tmp, false); - else if (nb_config_merge(candidate->config, loaded_config, - false) - != NB_OK) - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"); - return grpc::Status::OK; + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + + tag->state = FINISH; + + break; + } + case FINISH: + delete tag; + } } - grpc::Status Commit(grpc::ServerContext *context, - frr::CommitRequest const *request, - frr::CommitResponse *response) override + void HandleLoadToCandidate(RpcState *tag) { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = request->candidate_id(); - // Request: Phase phase = 2; - int phase = request->phase(); - // Request: string comment = 3; - const std::string comment = request->comment(); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LoadToCandidate); + tag->state = PROCESS; + } + case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC Commit(candidate_id: %u)", - candidate_id); + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); - // Find candidate configuration. - struct candidate *candidate = get_candidate(candidate_id); - if (!candidate) - return grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC LoadToCandidate(candidate_id: %u)", + candidate_id); - int ret = NB_OK; - uint32_t transaction_id = 0; + // Request: LoadType type = 2; + int load_type = tag->request.type(); + // Request: DataTree config = 3; + auto config = tag->request.config(); - // Check for misuse of the two-phase commit protocol. - switch (phase) { - case frr::CommitRequest::PREPARE: - case frr::CommitRequest::ALL: - if (candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "pending transaction in progress"); - break; - case frr::CommitRequest::ABORT: - case frr::CommitRequest::APPLY: - if (!candidate->transaction) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "no transaction in progress"); - break; - default: + + struct candidate *candidate = + get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } + + struct lyd_node *dnode = + dnode_from_data_tree(&config, true); + if (!dnode) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"), + tag); + tag->state = FINISH; + return; + } + + struct nb_config *loaded_config = nb_config_new(dnode); + + if (load_type == frr::LoadToCandidateRequest::REPLACE) + nb_config_replace(candidate->config, + loaded_config, false); + else if (nb_config_merge(candidate->config, + loaded_config, false) + != NB_OK) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"), + tag); + tag->state = FINISH; + return; + } + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; break; } - - // Execute the user request. - switch (phase) { - case frr::CommitRequest::VALIDATE: - ret = nb_candidate_validate(candidate->config); - break; - case frr::CommitRequest::PREPARE: - ret = nb_candidate_commit_prepare( - candidate->config, NB_CLIENT_GRPC, NULL, - comment.c_str(), &candidate->transaction); - break; - case frr::CommitRequest::ABORT: - nb_candidate_commit_abort(candidate->transaction); - break; - case frr::CommitRequest::APPLY: - nb_candidate_commit_apply(candidate->transaction, true, - &transaction_id); - break; - case frr::CommitRequest::ALL: - ret = nb_candidate_commit( - candidate->config, NB_CLIENT_GRPC, NULL, true, - comment.c_str(), &transaction_id); - break; + case FINISH: + delete tag; } - - // Map northbound error codes to gRPC error codes. - switch (ret) { - case NB_ERR_NO_CHANGES: - return grpc::Status( - grpc::StatusCode::ABORTED, - "No configuration changes detected"); - case NB_ERR_LOCKED: - return grpc::Status( - grpc::StatusCode::UNAVAILABLE, - "There's already a transaction in progress"); - case NB_ERR_VALIDATION: - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Validation error"); - case NB_ERR_RESOURCE: - return grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - "Failed do allocate resources"); - case NB_ERR: - return grpc::Status(grpc::StatusCode::INTERNAL, - "Internal error"); - default: - break; - } - - // Response: uint32 transaction_id = 1; - if (transaction_id) - response->set_transaction_id(transaction_id); - - return grpc::Status::OK; } - grpc::Status - ListTransactions(grpc::ServerContext *context, - frr::ListTransactionsRequest const *request, - grpc::ServerWriter - *writer) override + void + HandleCommit(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Commit); + tag->state = PROCESS; + } + case PROCESS: { + + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC Commit(candidate_id: %u)", + candidate_id); + + // Request: Phase phase = 2; + int phase = tag->request.phase(); + // Request: string comment = 3; + const std::string comment = tag->request.comment(); + + // Find candidate configuration. + struct candidate *candidate = + get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } + + int ret = NB_OK; + uint32_t transaction_id = 0; + + // Check for misuse of the two-phase commit protocol. + switch (phase) { + case frr::CommitRequest::PREPARE: + case frr::CommitRequest::ALL: + if (candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + tag->state = FINISH; + return; + } + break; + case frr::CommitRequest::ABORT: + case frr::CommitRequest::APPLY: + if (!candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "no transaction in progress"), + tag); + tag->state = FINISH; + return; + } + break; + default: + break; + } + + + // Execute the user request. + switch (phase) { + case frr::CommitRequest::VALIDATE: + ret = nb_candidate_validate(candidate->config); + break; + case frr::CommitRequest::PREPARE: + ret = nb_candidate_commit_prepare( + candidate->config, NB_CLIENT_GRPC, NULL, + comment.c_str(), + &candidate->transaction); + break; + case frr::CommitRequest::ABORT: + nb_candidate_commit_abort( + candidate->transaction); + break; + case frr::CommitRequest::APPLY: + nb_candidate_commit_apply( + candidate->transaction, true, + &transaction_id); + break; + case frr::CommitRequest::ALL: + ret = nb_candidate_commit( + candidate->config, NB_CLIENT_GRPC, NULL, + true, comment.c_str(), &transaction_id); + break; + } + + // Map northbound error codes to gRPC error codes. + switch (ret) { + case NB_ERR_NO_CHANGES: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::ABORTED, + "No configuration changes detected"), + tag); + tag->state = FINISH; + return; + case NB_ERR_LOCKED: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::UNAVAILABLE, + "There's already a transaction in progress"), + tag); + tag->state = FINISH; + return; + case NB_ERR_VALIDATION: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Validation error"), + tag); + tag->state = FINISH; + return; + case NB_ERR_RESOURCE: + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + RESOURCE_EXHAUSTED, + "Failed do allocate resources"), + tag); + tag->state = FINISH; + return; + case NB_ERR: + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Internal error"), + tag); + tag->state = FINISH; + return; + default: + break; + } + + // Response: uint32 transaction_id = 1; + if (transaction_id) + tag->response.set_transaction_id( + transaction_id); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + + break; + } + case FINISH: + delete tag; + } + } + + void + HandleListTransactions(RpcState *tag) { if (nb_dbg_client_grpc) zlog_debug("received RPC ListTransactions()"); - nb_db_transactions_iterate(list_transactions_cb, writer); - - return grpc::Status::OK; - } - - grpc::Status - GetTransaction(grpc::ServerContext *context, - frr::GetTransactionRequest const *request, - frr::GetTransactionResponse *response) override - { - struct nb_config *nb_config; - - // Request: uint32 transaction_id = 1; - uint32_t transaction_id = request->transaction_id(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = request->encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = request->with_defaults(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC GetTransaction(transaction_id: %u, encoding: %u)", - transaction_id, encoding); - - // Load configuration from the transactions database. - nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Transaction not found"); - - // Response: DataTree config = 1; - auto config = response->mutable_config(); - config->set_encoding(encoding); - - // Dump data using the requested format. - if (data_tree_from_dnode(config, nb_config->dnode, - encoding2lyd_format(encoding), - with_defaults) - != 0) { - nb_config_free(nb_config); - return grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC_STREAMING(ListTransactions); + tag->context = new std::list>(); + nb_db_transactions_iterate(list_transactions_cb, + tag->context); + tag->state = PROCESS; } + case PROCESS: { + auto list = static_cast> *>( + tag->context); + if (list->empty()) { + tag->async_responder.Finish(grpc::Status::OK, + tag); + tag->state = FINISH; + return; + } + auto item = list->back(); - nb_config_free(nb_config); - return grpc::Status::OK; + frr::ListTransactionsResponse response; + + // Response: uint32 id = 1; + response.set_id(std::get<0>(item)); + + // Response: string client = 2; + response.set_client(std::get<1>(item).c_str()); + + // Response: string date = 3; + response.set_date(std::get<2>(item).c_str()); + + // Response: string comment = 4; + response.set_comment(std::get<3>(item).c_str()); + + list->pop_back(); + + tag->async_responder.Write(response, tag); + break; + } + case FINISH: + delete static_cast> *>( + tag->context); + delete tag; + } } - grpc::Status LockConfig(grpc::ServerContext *context, - frr::LockConfigRequest const *request, - frr::LockConfigResponse *response) override + void HandleGetTransaction(RpcState *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC LockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(GetTransaction); + tag->state = PROCESS; + } + case PROCESS: { + // Request: uint32 transaction_id = 1; + uint32_t transaction_id = tag->request.transaction_id(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "running configuration is locked already"); + if (nb_dbg_client_grpc) + zlog_debug( + "received RPC GetTransaction(transaction_id: %u, encoding: %u)", + transaction_id, encoding); - return grpc::Status::OK; + struct nb_config *nb_config; + + // Load configuration from the transactions database. + nb_config = nb_db_transaction_load(transaction_id); + if (!nb_config) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Transaction not found"), + tag); + tag->state = FINISH; + return; + } + + // Response: DataTree config = 1; + auto config = tag->response.mutable_config(); + config->set_encoding(encoding); + + // Dump data using the requested format. + if (data_tree_from_dnode(config, nb_config->dnode, + encoding2lyd_format(encoding), + with_defaults) + != 0) { + nb_config_free(nb_config); + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"), + tag); + tag->state = FINISH; + return; + } + + nb_config_free(nb_config); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status UnlockConfig(grpc::ServerContext *context, - frr::UnlockConfigRequest const *request, - frr::UnlockConfigResponse *response) override + void HandleLockConfig( + RpcState *tag) { - if (nb_dbg_client_grpc) - zlog_debug("received RPC UnlockConfig()"); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(LockConfig); + tag->state = PROCESS; + } + case PROCESS: { + auto rpcState = new RpcState( + this, &NorthboundImpl::HandleLockConfig); + _service->RequestLockConfig( + &rpcState->ctx, &rpcState->request, + &rpcState->responder, _cq, _cq, rpcState); - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) - return grpc::Status( - grpc::StatusCode::FAILED_PRECONDITION, - "failed to unlock the running configuration"); + if (nb_dbg_client_grpc) + zlog_debug("received RPC LockConfig()"); - return grpc::Status::OK; + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "running configuration is locked already"), + tag); + tag->state = FINISH; + return; + } + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } } - grpc::Status Execute(grpc::ServerContext *context, - frr::ExecuteRequest const *request, - frr::ExecuteResponse *response) override + void HandleUnlockConfig(RpcState *tag) + { + switch (tag->state) { + case CREATE: { + REQUEST_RPC(UnlockConfig); + tag->state = PROCESS; + } + case PROCESS: { + + if (nb_dbg_client_grpc) + zlog_debug("received RPC UnlockConfig()"); + + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode:: + FAILED_PRECONDITION, + "failed to unlock the running configuration"), + tag); + tag->state = FINISH; + return; + } + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; + } + case FINISH: + delete tag; + } + } + + void + HandleExecute(RpcState *tag) { struct nb_node *nb_node; struct list *input_list; @@ -526,61 +983,100 @@ class NorthboundImpl final : public frr::Northbound::Service struct yang_data *data; const char *xpath; - // Request: string path = 1; - xpath = request->path().c_str(); - - if (nb_dbg_client_grpc) - zlog_debug("received RPC Execute(path: \"%s\")", xpath); - - if (request->path().empty()) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Data path is empty"); - - nb_node = nb_node_find(xpath); - if (!nb_node) - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "Unknown data path"); - - input_list = yang_data_list_new(); - output_list = yang_data_list_new(); - - // Read input parameters. - auto input = request->input(); - for (const frr::PathValue &pv : input) { - // Request: repeated PathValue input = 2; - data = yang_data_new(pv.path().c_str(), - pv.value().c_str()); - listnode_add(input_list, data); + switch (tag->state) { + case CREATE: { + REQUEST_RPC(Execute); + tag->state = PROCESS; } + case PROCESS: { - // Execute callback registered for this XPath. - if (nb_callback_rpc(nb_node, xpath, input_list, output_list) - != NB_OK) { - flog_warn(EC_LIB_NB_CB_RPC, - "%s: rpc callback failed: %s", __func__, - xpath); + // Request: string path = 1; + xpath = tag->request.path().c_str(); + + if (nb_dbg_client_grpc) + zlog_debug("received RPC Execute(path: \"%s\")", + xpath); + + if (tag->request.path().empty()) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Data path is empty"), + tag); + tag->state = FINISH; + return; + } + + nb_node = nb_node_find(xpath); + if (!nb_node) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode:: + INVALID_ARGUMENT, + "Unknown data path"), + tag); + tag->state = FINISH; + return; + } + + input_list = yang_data_list_new(); + output_list = yang_data_list_new(); + + // Read input parameters. + auto input = tag->request.input(); + for (const frr::PathValue &pv : input) { + // Request: repeated PathValue input = 2; + data = yang_data_new(pv.path().c_str(), + pv.value().c_str()); + listnode_add(input_list, data); + } + + // Execute callback registered for this XPath. + if (nb_callback_rpc(nb_node, xpath, input_list, + output_list) + != NB_OK) { + flog_warn(EC_LIB_NB_CB_RPC, + "%s: rpc callback failed: %s", + __func__, xpath); + list_delete(&input_list); + list_delete(&output_list); + + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "RPC failed"), + tag); + tag->state = FINISH; + return; + } + + // Process output parameters. + for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { + // Response: repeated PathValue output = 1; + frr::PathValue *pv = tag->response.add_output(); + pv->set_path(data->xpath); + pv->set_value(data->value); + } + + // Release memory. list_delete(&input_list); list_delete(&output_list); - return grpc::Status(grpc::StatusCode::INTERNAL, - "RPC failed"); + + tag->responder.Finish(tag->response, grpc::Status::OK, + tag); + tag->state = FINISH; + break; } - - // Process output parameters. - for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { - // Response: repeated PathValue output = 1; - frr::PathValue *pv = response->add_output(); - pv->set_path(data->xpath); - pv->set_value(data->value); + case FINISH: + delete tag; } - - // Release memory. - list_delete(&input_list); - list_delete(&output_list); - - return grpc::Status::OK; } private: + frr::Northbound::AsyncService *_service; + grpc::ServerCompletionQueue *_cq; + struct candidate { uint32_t id; struct nb_config *config; @@ -649,24 +1145,12 @@ class NorthboundImpl final : public frr::Northbound::Service const char *client_name, const char *date, const char *comment) { - grpc::ServerWriter *writer = - static_cast *>(arg); - frr::ListTransactionsResponse response; - // Response: uint32 id = 1; - response.set_id(transaction_id); - - // Response: string client = 2; - response.set_client(client_name); - - // Response: string date = 3; - response.set_date(date); - - // Response: string comment = 4; - response.set_comment(comment); - - writer->Write(response); + auto list = static_cast> *>(arg); + list->push_back(std::make_tuple( + transaction_id, std::string(client_name), + std::string(date), std::string(comment))); } static int data_tree_from_dnode(frr::DataTree *dt, @@ -855,24 +1339,11 @@ static void *grpc_pthread_start(void *arg) { struct frr_pthread *fpt = static_cast(arg); unsigned long *port = static_cast(fpt->data); - NorthboundImpl service; - std::stringstream server_address; frr_pthread_set_name(fpt); - server_address << "0.0.0.0:" << *port; - - grpc::ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr server(builder.BuildAndStart()); - - zlog_notice("gRPC server listening on %s", - server_address.str().c_str()); - - server->Wait(); + NorthboundImpl nb; + nb.Run(*port); return NULL; }