zebra: multithreaded zserv

Handle each zclient in its own thread.
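In the new model each client connection gets a dedicated pthread that owns all
socket I/O for that connection; parsed messages are handed to the main pthread
through mutex-protected FIFOs. A minimal sketch of that handoff, illustrative
only and not FRR code (the queue and wakeup below are hypothetical stand-ins
for stream_fifo and thread_add_event()):

#include <pthread.h>

/* Tiny intrusive queue standing in for FRR's stream_fifo. */
struct msg {
	struct msg *next;
};

struct client {
	pthread_mutex_t ibuf_mtx; /* guards ibuf_head */
	struct msg *ibuf_head;    /* producer: client pthread; consumer: main */
};

/* Hypothetical stand-in for thread_add_event(zebrad.master, ...). */
static void wake_main_thread(struct client *c)
{
	(void)c; /* a real implementation schedules a task on the main loop */
}

/* Runs on the client pthread: hand one parsed message to the main thread. */
void client_enqueue(struct client *c, struct msg *m)
{
	pthread_mutex_lock(&c->ibuf_mtx);
	m->next = c->ibuf_head; /* LIFO for brevity; FRR uses a FIFO */
	c->ibuf_head = m;
	pthread_mutex_unlock(&c->ibuf_mtx);

	/* notify outside the lock to keep the critical section short */
	wake_main_thread(c);
}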

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>

Author: Quentin Young
Date:   2018-03-14 00:49:34 -04:00
Parent: 68542a6da6
Commit: 329e35dab8

3 changed files with 166 additions and 73 deletions

zebra/main.c

@@ -37,6 +37,7 @@
 #include "logicalrouter.h"
 #include "libfrr.h"
 #include "routemap.h"
+#include "frr_pthread.h"

 #include "zebra/rib.h"
 #include "zebra/zserv.h"
@@ -378,6 +379,8 @@ int main(int argc, char **argv)
 	/* Needed for BSD routing socket. */
 	pid = getpid();

+	frr_pthread_init();
+
 	/* This must be done only after locking pidfile (bug #403). */
 	zebra_zserv_socket_init(zserv_path);

zebra/zserv.c

@@ -72,22 +72,43 @@ static void zebra_event(struct zserv *client, enum event event);

 int zebra_server_send_message(struct zserv *client, struct stream *msg)
 {
-	stream_fifo_push(client->obuf_fifo, msg);
-	zebra_event(client, ZEBRA_WRITE);
+	pthread_mutex_lock(&client->obuf_mtx);
+	{
+		stream_fifo_push(client->obuf_fifo, msg);
+		zebra_event(client, ZEBRA_WRITE);
+	}
+	pthread_mutex_unlock(&client->obuf_mtx);
+
 	return 0;
 }

+/* Lifecycle ---------------------------------------------------------------- */
+
 /* Hooks for client connect / disconnect */
 DEFINE_HOOK(zapi_client_connect, (struct zserv *client), (client));
 DEFINE_KOOH(zapi_client_close, (struct zserv *client), (client));

-/* free zebra client information. */
+/*
+ * Deinitialize zebra client.
+ *
+ * - Deregister and deinitialize related internal resources
+ * - Gracefully close socket
+ * - Free associated resources
+ * - Free client structure
+ *
+ * This does *not* take any action on the struct thread * fields. These are
+ * managed by the owning pthread and any tasks associated with them must have
+ * been stopped prior to invoking this function.
+ */
 static void zebra_client_free(struct zserv *client)
 {
 	hook_call(zapi_client_close, client);

+	/*
+	 * Ensure these have been nulled. This does not equate to the
+	 * associated task(s) being scheduled or unscheduled on the client
+	 * pthread's threadmaster.
+	 */
+	assert(!client->t_read);
+	assert(!client->t_write);
+
 	/* Close file descriptor. */
 	if (client->sock) {
 		unsigned long nroutes;
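The bare braces following pthread_mutex_lock() above are purely stylistic:
they indent the critical section so its extent is obvious at a glance. The
idiom in isolation, with hypothetical names:

#include <pthread.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static int shared_counter; /* hypothetical state shared across pthreads */

void bump(void)
{
	pthread_mutex_lock(&mtx);
	{
		/* everything in this block runs while holding mtx */
		shared_counter++;
	}
	pthread_mutex_unlock(&mtx);
}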
@@ -113,13 +134,9 @@ static void zebra_client_free(struct zserv *client)
 	if (client->wb)
 		buffer_free(client->wb);

-	/* Release threads. */
-	if (client->t_read)
-		thread_cancel(client->t_read);
-	if (client->t_write)
-		thread_cancel(client->t_write);
-	if (client->t_suicide)
-		thread_cancel(client->t_suicide);
+	/* Free buffer mutexes */
+	pthread_mutex_destroy(&client->obuf_mtx);
+	pthread_mutex_destroy(&client->ibuf_mtx);

 	/* Free bitmaps. */
 	for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
@@ -134,12 +151,37 @@ static void zebra_client_free(struct zserv *client)
 }

 /*
- * Called from client thread to terminate itself.
+ * Finish closing a client.
+ *
+ * This task is scheduled by a ZAPI client pthread on the main pthread when it
+ * wants to stop itself. When this executes, the client connection should
+ * already have been closed. This task's responsibility is to gracefully
+ * terminate the client pthread, update relevant internal datastructures and
+ * free any resources allocated by the main thread.
 */
+static int zebra_client_handle_close(struct thread *thread)
+{
+	struct zserv *client = THREAD_ARG(thread);
+	frr_pthread_stop(client->pthread, NULL);
+	listnode_delete(zebrad.client_list, client);
+	zebra_client_free(client);
+	return 0;
+}
+
+/*
+ * Gracefully shut down a client connection.
+ *
+ * Cancel any pending tasks on the client's pthread, then schedule a task on
+ * the main pthread that will shut down the calling pthread.
+ *
+ * Must be called from the client pthread, never from the main pthread.
+ */
 static void zebra_client_close(struct zserv *client)
 {
-	listnode_delete(zebrad.client_list, client);
-	zebra_client_free(client);
+	THREAD_OFF(client->t_read);
+	THREAD_OFF(client->t_write);
+	thread_add_event(zebrad.master, zebra_client_handle_close, client, 0,
+			 NULL);
 }

 /* Make new client. */
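The two functions above form a cross-thread handshake: a pthread cannot join
itself, so the client pthread cancels its own tasks and schedules the actual
stop and free on the main pthread. A plain-pthreads sketch of the same shape,
all names hypothetical (it assumes the worker's loop exits once "closing" is
observed):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

struct worker {
	pthread_t thread;
	atomic_bool closing; /* set by the worker, consumed by main */
};

/* Step 1, on the worker (cf. zebra_client_close): request own shutdown. */
void worker_request_close(struct worker *w)
{
	atomic_store(&w->closing, true);
	/* a real event loop would also wake the main pthread here */
}

/* Step 2, on main (cf. zebra_client_handle_close): stop and reap it. */
void main_reap_worker(struct worker *w)
{
	pthread_join(w->thread, NULL); /* safe: main is not w->thread */
	/* free main-thread-owned resources for this worker here */
}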
@@ -157,6 +199,8 @@ static void zebra_client_create(int sock)
 	client->obuf_fifo = stream_fifo_new();
 	client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
 	client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+	pthread_mutex_init(&client->ibuf_mtx, NULL);
+	pthread_mutex_init(&client->obuf_mtx, NULL);
 	client->wb = buffer_new(0);

 	/* Set table number. */
@@ -177,21 +221,23 @@ static void zebra_client_create(int sock)
 	/* Add this client to linked list. */
 	listnode_add(zebrad.client_list, client);
-	zebra_vrf_update_all(client);
+
+	struct frr_pthread_attr zclient_pthr_attrs = {
+		.id = frr_pthread_get_id(),
+		.start = frr_pthread_attr_default.start,
+		.stop = frr_pthread_attr_default.stop
+	};
+	client->pthread = frr_pthread_new(&zclient_pthr_attrs,
+					  "Zebra API client thread");
+
+	hook_call(zapi_client_connect, client);
+	zebra_vrf_update_all(client);

 	/* start read loop */
 	zebra_event(client, ZEBRA_READ);
-}
-
-static int zserv_delayed_close(struct thread *thread)
-{
-	struct zserv *client = THREAD_ARG(thread);
-
-	/* call callbacks */
-	hook_call(zapi_client_connect, client);
-	client->t_suicide = NULL;
-	zebra_client_close(client);
-	return 0;
+
+	/* start pthread */
+	frr_pthread_run(client->pthread, NULL);
 }
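frr_pthread wraps a POSIX thread around its own threadmaster event loop; the
default start/stop attributes used above run that loop and wind it down
cleanly. A rough plain-pthreads analogue of the lifecycle, hypothetical names
throughout (a real loop blocks on I/O rather than spinning):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

struct fpt {
	pthread_t thread;
	atomic_bool running;
};

static void *fpt_loop(void *arg)
{
	struct fpt *p = arg;

	while (atomic_load(&p->running)) {
		/* fetch and run one scheduled task: I/O, timers, events */
	}
	return NULL;
}

/* cf. frr_pthread_run(): start the event loop on its own thread */
int fpt_run(struct fpt *p)
{
	atomic_store(&p->running, true);
	return pthread_create(&p->thread, NULL, fpt_loop, p);
}

/* cf. frr_pthread_stop(): ask the loop to exit, then join it */
int fpt_stop(struct fpt *p)
{
	atomic_store(&p->running, false);
	return pthread_join(p->thread, NULL);
}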
 /*
@@ -225,10 +271,6 @@ static int zserv_flush_data(struct thread *thread)
 	struct zserv *client = THREAD_ARG(thread);

 	client->t_write = NULL;
-	if (client->t_suicide) {
-		zebra_client_close(client);
-		return -1;
-	}
 	switch (buffer_flush_available(client->wb, client->sock)) {
 	case BUFFER_ERROR:
 		zlog_warn(
@@ -239,7 +281,7 @@ static int zserv_flush_data(struct thread *thread)
 		break;
 	case BUFFER_PENDING:
 		client->t_write = NULL;
-		thread_add_write(zebrad.master, zserv_flush_data, client,
+		thread_add_write(client->pthread->master, zserv_flush_data, client,
 				 client->sock, &client->t_write);
 		break;
 	case BUFFER_EMPTY:
@@ -260,13 +302,15 @@ static int zserv_write(struct thread *thread)
 	struct stream *msg;
 	int writerv;

-	if (client->t_suicide)
-		return -1;
-
 	if (client->is_synchronous)
 		return 0;

-	msg = stream_fifo_pop(client->obuf_fifo);
+	pthread_mutex_lock(&client->obuf_mtx);
+	{
+		msg = stream_fifo_pop(client->obuf_fifo);
+	}
+	pthread_mutex_unlock(&client->obuf_mtx);

 	stream_set_getp(msg, 0);
 	client->last_write_cmd = stream_getw_from(msg, 6);
@@ -277,30 +321,27 @@ static int zserv_write(struct thread *thread)
 	switch (writerv) {
 	case BUFFER_ERROR:
-		zlog_warn(
-			"%s: buffer_write failed to zserv client fd %d, closing",
-			__func__, client->sock);
-		/*
-		 * Schedule a delayed close since many of the functions that
-		 * call this one do not check the return code. They do not
-		 * allow for the possibility that an I/O error may have caused
-		 * the client to be deleted.
-		 */
-		client->t_suicide = NULL;
-		thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
-				 &client->t_suicide);
+		zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
+			  __func__, zebra_route_string(client->proto),
+			  client->sock);
+		zlog_warn("%s: closing connection to %s", __func__,
+			  zebra_route_string(client->proto));
+		zebra_client_close(client);
 		return -1;
+	case BUFFER_EMPTY:
+		THREAD_OFF(client->t_write);
+		break;
 	case BUFFER_PENDING:
-		thread_add_write(zebrad.master, zserv_flush_data, client,
-				 client->sock, &client->t_write);
+		thread_add_write(client->pthread->master, zserv_flush_data,
+				 client, client->sock, &client->t_write);
 		break;
-	case BUFFER_EMPTY:
-		break;
 	}

-	if (client->obuf_fifo->count)
-		zebra_event(client, ZEBRA_WRITE);
+	pthread_mutex_lock(&client->obuf_mtx);
+	{
+		if (client->obuf_fifo->count)
+			zebra_event(client, ZEBRA_WRITE);
+	}
+	pthread_mutex_unlock(&client->obuf_mtx);

 	client->last_write_time = monotime(NULL);

 	return 0;
@@ -326,6 +367,18 @@ static void zserv_write_incoming(struct stream *orig, uint16_t command)
 }
 #endif

+/*
+ * Read and process messages from a client.
+ *
+ * This task runs on the main pthread. It is scheduled by client pthreads when
+ * they have new messages available on their input queues. The client is
+ * passed as the task argument.
+ *
+ * Each message is popped off the client's input queue and the action
+ * associated with the message is executed. This proceeds until there are no
+ * more messages, an error occurs, or the processing limit is reached. In the
+ * last case, this task reschedules itself.
+ */
 static int zserv_process_messages(struct thread *thread)
 {
 	struct zserv *client = THREAD_ARG(thread);
@@ -334,8 +387,14 @@ static int zserv_process_messages(struct thread *thread)
 	struct stream *msg;
 	bool hdrvalid;
+	int p2p = zebrad.packets_to_process;

 	do {
-		msg = stream_fifo_pop(client->ibuf_fifo);
+		pthread_mutex_lock(&client->ibuf_mtx);
+		{
+			msg = stream_fifo_pop(client->ibuf_fifo);
+		}
+		pthread_mutex_unlock(&client->ibuf_mtx);

 		/* break if out of messages */
 		if (!msg)
@@ -363,32 +422,56 @@
 		/* process commands */
 		zserv_handle_commands(client, &hdr, msg, zvrf);
-	} while (msg);
+	} while (msg && --p2p);
+
+	/* reschedule self if necessary */
+	pthread_mutex_lock(&client->ibuf_mtx);
+	{
+		if (client->ibuf_fifo->count)
+			thread_add_event(zebrad.master, &zserv_process_messages,
+					 client, 0, NULL);
+	}
+	pthread_mutex_unlock(&client->ibuf_mtx);

 	return 0;
 }
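The --p2p bound above keeps one busy client from monopolizing the main
pthread: at most packets_to_process messages are handled per invocation, and
the task re-queues itself if work remains. The same pattern in isolation,
with hypothetical names:

#include <pthread.h>

struct msg {
	struct msg *next;
};

struct client {
	pthread_mutex_t ibuf_mtx;
	struct msg *ibuf_head;
};

/* Stand-ins for zserv_handle_commands() and thread_add_event(). */
static void handle_msg(struct msg *m) { (void)m; }
static void reschedule_self(struct client *c) { (void)c; }

void process_messages(struct client *c, int limit)
{
	struct msg *m;

	do {
		/* hold the lock only to pop, never while processing */
		pthread_mutex_lock(&c->ibuf_mtx);
		m = c->ibuf_head;
		if (m)
			c->ibuf_head = m->next;
		pthread_mutex_unlock(&c->ibuf_mtx);

		if (!m)
			return; /* queue drained */
		handle_msg(m);
	} while (--limit);

	/* budget exhausted: give other tasks a turn, then resume */
	pthread_mutex_lock(&c->ibuf_mtx);
	if (c->ibuf_head)
		reschedule_self(c);
	pthread_mutex_unlock(&c->ibuf_mtx);
}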
-/* Handler of zebra service request. */
+/*
+ * Read and process data from a client socket.
+ *
+ * The responsibilities here are to read raw data from the client socket,
+ * validate the header, encapsulate it into a single stream object, push it
+ * onto the input queue and then notify the main thread that there is new
+ * data available.
+ *
+ * This function first looks for any data in the client structure's working
+ * input buffer. If data is present, it is assumed that reading stopped in a
+ * previous invocation of this task and needs to be resumed to finish a
+ * message. Otherwise, the socket data stream is assumed to be at the
+ * beginning of a new ZAPI message (specifically at the header). The header is
+ * read and validated. If the header passes validation, the length field in
+ * the header is used to compute the total length of the message. That much
+ * data is read (but not inspected), appended to the header, placed into a
+ * stream and pushed onto the client's input queue. A task is then scheduled
+ * on the main thread to process the client's input queue. Finally, if all of
+ * this was successful, this task reschedules itself.
+ *
+ * Any failure in any of these actions is handled by terminating the client.
+ */
 static int zserv_read(struct thread *thread)
 {
 	int sock;
 	struct zserv *client;
 	size_t already;
 #if defined(HANDLE_ZAPI_FUZZING)
-	int packets = 1;
+	int p2p = 1;
 #else
-	int packets = zebrad.packets_to_process;
+	int p2p = zebrad.packets_to_process;
 #endif

 	/* Get thread data. Reset reading thread because I'm running. */
 	sock = THREAD_FD(thread);
 	client = THREAD_ARG(thread);

-	if (client->t_suicide) {
-		zebra_client_close(client);
-		return -1;
-	}
-
-	while (packets) {
+	while (p2p--) {
 		struct zmsghdr hdr;
 		ssize_t nb;
 		bool hdrvalid;
@@ -486,18 +569,18 @@
 		stream_set_getp(client->ibuf_work, 0);
 		struct stream *msg = stream_dup(client->ibuf_work);
-		stream_fifo_push(client->ibuf_fifo, msg);
+		pthread_mutex_lock(&client->ibuf_mtx);
+		{
+			stream_fifo_push(client->ibuf_fifo, msg);
+		}
+		pthread_mutex_unlock(&client->ibuf_mtx);

-		if (client->t_suicide)
-			goto zread_fail;
-
-		--packets;
 		stream_reset(client->ibuf_work);
 	}

 	if (IS_ZEBRA_DEBUG_PACKET)
 		zlog_debug("Read %d packets",
-			   zebrad.packets_to_process - packets);
+			   zebrad.packets_to_process - p2p);

 	/* Schedule job to process those packets */
 	thread_add_event(zebrad.master, &zserv_process_messages, client, 0,
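A compact illustration of the resumable read described in the comment before
zserv_read(): a fixed-size header is completed first, then a length-delimited
body, with the byte count carried across invocations so a short read can be
resumed. Simplified and hypothetical; the real ZAPI header carries marker,
version, VRF and command fields and is validated by a dedicated routine:

#include <errno.h>
#include <stdint.h>
#include <unistd.h>

#define HDR_SIZE 4 /* hypothetical: 2-byte length, 2-byte command */

struct rdstate {
	uint8_t buf[4096];
	size_t already; /* progress carried across invocations */
};

/* Returns frame length when complete, 0 to come back later, -1 on error. */
ssize_t read_frame(int sock, struct rdstate *rs)
{
	/* finish the header first */
	while (rs->already < HDR_SIZE) {
		ssize_t nb = read(sock, rs->buf + rs->already,
				  HDR_SIZE - rs->already);
		if (nb < 0)
			return (errno == EAGAIN) ? 0 : -1;
		if (nb == 0)
			return -1; /* peer closed the connection */
		rs->already += (size_t)nb;
	}

	/* the length field covers the entire frame, header included */
	size_t len = ((size_t)rs->buf[0] << 8) | rs->buf[1];
	if (len < HDR_SIZE || len > sizeof(rs->buf))
		return -1; /* header failed validation */

	/* read the body; may stop short and resume on the next call */
	while (rs->already < len) {
		ssize_t nb = read(sock, rs->buf + rs->already,
				  len - rs->already);
		if (nb < 0)
			return (errno == EAGAIN) ? 0 : -1;
		if (nb == 0)
			return -1;
		rs->already += (size_t)nb;
	}

	rs->already = 0; /* done; reset for the next frame */
	return (ssize_t)len;
}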
@@ -517,16 +600,18 @@ static void zebra_event(struct zserv *client, enum event event)
 {
 	switch (event) {
 	case ZEBRA_READ:
-		thread_add_read(zebrad.master, zserv_read, client, client->sock,
-				&client->t_read);
+		thread_add_read(client->pthread->master, zserv_read, client,
+				client->sock, &client->t_read);
 		break;
 	case ZEBRA_WRITE:
-		thread_add_write(zebrad.master, zserv_write, client,
+		thread_add_write(client->pthread->master, zserv_write, client,
 				 client->sock, &client->t_write);
 		break;
 	}
 }

+/* Main thread lifecycle ----------------------------------------------------*/
+
 /* Accept code of zebra server socket. */
 static int zebra_accept(struct thread *thread)
 {

zebra/zserv.h

@@ -50,11 +50,16 @@
 /* Client structure. */
 struct zserv {
+	/* Client pthread */
+	struct frr_pthread *pthread;
+
 	/* Client file descriptor. */
 	int sock;

 	/* Input/output buffer to the client. */
+	pthread_mutex_t ibuf_mtx;
 	struct stream_fifo *ibuf_fifo;
+	pthread_mutex_t obuf_mtx;
 	struct stream_fifo *obuf_fifo;

 	/* Private I/O buffers */
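A mutex per direction keeps the read and write paths from contending with
each other. Each pthread_mutex_init() in zebra_client_create() is paired with
exactly one pthread_mutex_destroy() in zebra_client_free(); the pairing in
miniature, hypothetical names:

#include <pthread.h>
#include <stddef.h>

struct conn {
	pthread_mutex_t ibuf_mtx;
	pthread_mutex_t obuf_mtx;
};

void conn_init(struct conn *c)
{
	pthread_mutex_init(&c->ibuf_mtx, NULL);
	pthread_mutex_init(&c->obuf_mtx, NULL);
}

void conn_fini(struct conn *c)
{
	/* only valid once no thread can still take either lock */
	pthread_mutex_destroy(&c->ibuf_mtx);
	pthread_mutex_destroy(&c->obuf_mtx);
}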