zebra: some more I/O optimizations

* Separate flush task from write task, so we can continue adding to the
  write buffer while it's waiting to flush
* Handle write errors sooner rather than later
* Only schedule a process job if we have packets to process
* Tweak zserv_process_messages so it no longer reschedules itself, relying
  on zserv_read() to reschedule it in all the proper cases

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2018-04-25 18:45:21 -04:00
parent e1de21d760
commit 822167e704
2 changed files with 42 additions and 32 deletions

View File

@ -170,6 +170,7 @@ static void zserv_client_close(struct zserv *client)
{ {
THREAD_OFF(client->t_read); THREAD_OFF(client->t_read);
THREAD_OFF(client->t_write); THREAD_OFF(client->t_write);
THREAD_OFF(client->t_flush);
zserv_event(client, ZSERV_HANDLE_CLOSE); zserv_event(client, ZSERV_HANDLE_CLOSE);
} }
@ -177,7 +178,6 @@ static int zserv_flush_data(struct thread *thread)
{ {
struct zserv *client = THREAD_ARG(thread); struct zserv *client = THREAD_ARG(thread);
client->t_write = NULL;
switch (buffer_flush_available(client->wb, client->sock)) { switch (buffer_flush_available(client->wb, client->sock)) {
case BUFFER_ERROR: case BUFFER_ERROR:
zlog_warn( zlog_warn(
@ -217,6 +217,7 @@ static int zserv_write(struct thread *thread)
uint32_t wcmd; uint32_t wcmd;
int writerv = BUFFER_EMPTY; int writerv = BUFFER_EMPTY;
struct stream_fifo *cache = stream_fifo_new(); struct stream_fifo *cache = stream_fifo_new();
bool ok = true;
pthread_mutex_lock(&client->obuf_mtx); pthread_mutex_lock(&client->obuf_mtx);
{ {
@ -226,7 +227,7 @@ static int zserv_write(struct thread *thread)
} }
pthread_mutex_unlock(&client->obuf_mtx); pthread_mutex_unlock(&client->obuf_mtx);
while (cache->head) { while (stream_fifo_head(cache) && ok) {
msg = stream_fifo_pop(cache); msg = stream_fifo_pop(cache);
stream_set_getp(msg, 0); stream_set_getp(msg, 0);
@ -234,26 +235,29 @@ static int zserv_write(struct thread *thread)
writerv = buffer_write(client->wb, client->sock, writerv = buffer_write(client->wb, client->sock,
STREAM_DATA(msg), stream_get_endp(msg)); STREAM_DATA(msg), stream_get_endp(msg));
switch (writerv) {
case BUFFER_ERROR:
zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
__func__, zebra_route_string(client->proto),
client->sock);
zlog_warn("%s: closing connection to %s", __func__,
zebra_route_string(client->proto));
zserv_client_close(client);
ok = false;
break;
/* continue writing */
case BUFFER_PENDING:
case BUFFER_EMPTY:
break;
}
stream_free(msg); stream_free(msg);
} }
stream_fifo_free(cache); if (ok && writerv == BUFFER_PENDING)
switch (writerv) {
case BUFFER_ERROR:
zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
__func__, zebra_route_string(client->proto),
client->sock);
zlog_warn("%s: closing connection to %s", __func__,
zebra_route_string(client->proto));
zserv_client_close(client);
break;
case BUFFER_PENDING:
zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA); zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
break;
case BUFFER_EMPTY: stream_fifo_free(cache);
break;
}
atomic_store_explicit(&client->last_write_cmd, wcmd, atomic_store_explicit(&client->last_write_cmd, wcmd,
memory_order_relaxed); memory_order_relaxed);
@ -411,14 +415,15 @@ static int zserv_read(struct thread *thread)
stream_fifo_pop(cache)); stream_fifo_pop(cache));
} }
pthread_mutex_unlock(&client->ibuf_mtx); pthread_mutex_unlock(&client->ibuf_mtx);
/* Schedule job to process those packets */
zserv_event(client, ZSERV_PROCESS_MESSAGES);
} }
if (IS_ZEBRA_DEBUG_PACKET) if (IS_ZEBRA_DEBUG_PACKET)
zlog_debug("Read %d packets", p2p_orig - p2p); zlog_debug("Read %d packets", p2p_orig - p2p);
/* Schedule job to process those packets */
zserv_event(client, ZSERV_PROCESS_MESSAGES);
/* Reschedule ourselves */ /* Reschedule ourselves */
zserv_client_event(client, ZSERV_CLIENT_READ); zserv_client_event(client, ZSERV_CLIENT_READ);
@ -446,7 +451,7 @@ static void zserv_client_event(struct zserv *client,
break; break;
case ZSERV_CLIENT_FLUSH_DATA: case ZSERV_CLIENT_FLUSH_DATA:
thread_add_write(client->pthread->master, zserv_flush_data, thread_add_write(client->pthread->master, zserv_flush_data,
client, client->sock, &client->t_write); client, client->sock, &client->t_flush);
break; break;
} }
} }
@ -464,8 +469,11 @@ static void zserv_client_event(struct zserv *client,
* with the message is executed. This proceeds until there are no more messages, * with the message is executed. This proceeds until there are no more messages,
* an error occurs, or the processing limit is reached. * an error occurs, or the processing limit is reached.
* *
* This task reschedules itself if it cannot process everything on the input * The client's I/O thread can push at most zebrad.packets_to_process messages
* queue in one run. * onto the input buffer before notifying us there are packets to read. As long
* as we always process zebrad.packets_to_process messages here, then we can
* rely on the read thread to handle queuing this task enough times to process
* everything on the input queue.
*/ */
static int zserv_process_messages(struct thread *thread) static int zserv_process_messages(struct thread *thread)
{ {
@ -477,19 +485,19 @@ static int zserv_process_messages(struct thread *thread)
pthread_mutex_lock(&client->ibuf_mtx); pthread_mutex_lock(&client->ibuf_mtx);
{ {
for (uint32_t i = p2p - 1; i && client->ibuf_fifo->head; --i) uint32_t i;
stream_fifo_push(cache, for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
stream_fifo_pop(client->ibuf_fifo)); ++i) {
msg = stream_fifo_pop(client->ibuf_fifo);
stream_fifo_push(cache, msg);
}
if (client->ibuf_fifo->head) msg = NULL;
zserv_event(client, ZSERV_PROCESS_MESSAGES);
} }
pthread_mutex_unlock(&client->ibuf_mtx); pthread_mutex_unlock(&client->ibuf_mtx);
while (p2p--) { while (stream_fifo_head(cache)) {
msg = stream_fifo_pop(cache); msg = stream_fifo_pop(cache);
if (!msg)
break;
zserv_handle_commands(client, msg); zserv_handle_commands(client, msg);
stream_free(msg); stream_free(msg);
} }
@ -614,6 +622,7 @@ static int zserv_handle_client_close(struct thread *thread)
*/ */
assert(!client->t_read); assert(!client->t_read);
assert(!client->t_write); assert(!client->t_write);
assert(!client->t_flush);
/* synchronously stop thread */ /* synchronously stop thread */
frr_pthread_stop(client->pthread, NULL); frr_pthread_stop(client->pthread, NULL);

View File

@ -72,6 +72,7 @@ struct zserv {
/* Threads for read/write. */ /* Threads for read/write. */
struct thread *t_read; struct thread *t_read;
struct thread *t_write; struct thread *t_write;
struct thread *t_flush;
/* default routing table this client munges */ /* default routing table this client munges */
int rtm_table; int rtm_table;