zebra: fix write task collision

Only one I/O task can be scheduled per file descriptor. Having two
separate tasks for buffer filling and buffer flushing was breaking that
invariant and causing messages to never be written.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2018-04-26 00:06:15 -04:00
parent 1f312c843d
commit 29bed51b74
2 changed files with 43 additions and 55 deletions

View File

@@ -76,8 +76,6 @@ enum zserv_client_event {
ZSERV_CLIENT_READ, ZSERV_CLIENT_READ,
/* Schedule a buffer write */ /* Schedule a buffer write */
ZSERV_CLIENT_WRITE, ZSERV_CLIENT_WRITE,
/* Schedule a buffer flush */
ZSERV_CLIENT_FLUSH_DATA,
}; };
/* /*
@@ -170,54 +168,50 @@ static void zserv_client_close(struct zserv *client)
{ {
THREAD_OFF(client->t_read); THREAD_OFF(client->t_read);
THREAD_OFF(client->t_write); THREAD_OFF(client->t_write);
THREAD_OFF(client->t_flush);
zserv_event(client, ZSERV_HANDLE_CLOSE); zserv_event(client, ZSERV_HANDLE_CLOSE);
} }
static int zserv_flush_data(struct thread *thread)
{
struct zserv *client = THREAD_ARG(thread);
switch (buffer_flush_available(client->wb, client->sock)) {
case BUFFER_ERROR:
zlog_warn(
"%s: buffer_flush_available failed on zserv client fd %d, closing",
__func__, client->sock);
zserv_client_close(client);
client = NULL;
break;
case BUFFER_PENDING:
zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
break;
case BUFFER_EMPTY:
break;
}
if (client)
client->last_write_time = monotime(NULL);
return 0;
}
/* /*
* Write all pending messages to client socket. * Write all pending messages to client socket.
* *
* Any messages queued with zserv_send_message() before this function executes * This function first attempts to flush any buffered data. If unsuccessful,
* will be pushed to the output buffer. The buffer will then take care of * the function reschedules itself and returns. If successful, it pops all
* writing chunks until it is empty. * available messages from the output queue and continues to write data
* directly to the socket until the socket would block. If the socket never
* blocks and all data is written, the function returns without rescheduling
* itself. If the socket ends up throwing EWOULDBLOCK, the remaining data is
* buffered and the function reschedules itself.
* *
* This function does not reschedule itself. As far as it is concerned it * The utility of the buffer is that it allows us to vastly reduce lock
* always writes all data. This saves us a mutex hit in thread_add_event at the * contention by allowing us to pop *all* messages off the output queue at once
* theoretical expense of buffer memory usage. In practice this should never be * instead of locking and unlocking each time we want to pop a single message
* an issue. * off the queue. The same thing could arguably be accomplished faster by
* allowing the main thread to write directly into the buffer instead of
* enqueuing packets onto an intermediary queue, but the intermediary queue
* allows us to expose information about input and output queues to the user in
* terms of number of packets rather than size of data.
*/ */
static int zserv_write(struct thread *thread) static int zserv_write(struct thread *thread)
{ {
struct zserv *client = THREAD_ARG(thread); struct zserv *client = THREAD_ARG(thread);
struct stream *msg; struct stream *msg;
uint32_t wcmd; uint32_t wcmd;
int writerv = BUFFER_EMPTY; int writerv;
struct stream_fifo *cache = stream_fifo_new(); struct stream_fifo *cache;
bool ok = true;
/* If we have any data pending, try to flush it first */
switch (buffer_flush_available(client->wb, client->sock)) {
case BUFFER_ERROR:
goto zwrite_fail;
case BUFFER_PENDING:
client->last_write_time = monotime(NULL);
zserv_client_event(client, ZSERV_CLIENT_WRITE);
return 0;
case BUFFER_EMPTY:
break;
}
cache = stream_fifo_new();
pthread_mutex_lock(&client->obuf_mtx); pthread_mutex_lock(&client->obuf_mtx);
{ {
@@ -227,7 +221,7 @@ static int zserv_write(struct thread *thread)
} }
pthread_mutex_unlock(&client->obuf_mtx); pthread_mutex_unlock(&client->obuf_mtx);
while (stream_fifo_head(cache) && ok) { while (stream_fifo_head(cache)) {
msg = stream_fifo_pop(cache); msg = stream_fifo_pop(cache);
stream_set_getp(msg, 0); stream_set_getp(msg, 0);
@@ -237,15 +231,9 @@ static int zserv_write(struct thread *thread)
switch (writerv) { switch (writerv) {
case BUFFER_ERROR: case BUFFER_ERROR:
zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]", stream_free(msg);
__func__, zebra_route_string(client->proto), stream_fifo_free(cache);
client->sock); goto zwrite_fail;
zlog_warn("%s: closing connection to %s", __func__,
zebra_route_string(client->proto));
zserv_client_close(client);
ok = false;
break;
/* continue writing */
case BUFFER_PENDING: case BUFFER_PENDING:
case BUFFER_EMPTY: case BUFFER_EMPTY:
break; break;
@@ -254,8 +242,8 @@ static int zserv_write(struct thread *thread)
stream_free(msg); stream_free(msg);
} }
if (ok && writerv == BUFFER_PENDING) if (!buffer_empty(client->wb))
zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA); zserv_client_event(client, ZSERV_CLIENT_WRITE);
stream_fifo_free(cache); stream_fifo_free(cache);
@@ -266,6 +254,12 @@ static int zserv_write(struct thread *thread)
(uint32_t)monotime(NULL), memory_order_relaxed); (uint32_t)monotime(NULL), memory_order_relaxed);
return 0; return 0;
zwrite_fail:
zlog_warn("%s: could not write to %s [fd = %d], closing.", __func__,
zebra_route_string(client->proto), client->sock);
zserv_client_close(client);
return 0;
} }
/* /*
@@ -449,10 +443,6 @@ static void zserv_client_event(struct zserv *client,
thread_add_write(client->pthread->master, zserv_write, client, thread_add_write(client->pthread->master, zserv_write, client,
client->sock, &client->t_write); client->sock, &client->t_write);
break; break;
case ZSERV_CLIENT_FLUSH_DATA:
thread_add_write(client->pthread->master, zserv_flush_data,
client, client->sock, &client->t_flush);
break;
} }
} }
@@ -622,7 +612,6 @@ static int zserv_handle_client_close(struct thread *thread)
*/ */
assert(!client->t_read); assert(!client->t_read);
assert(!client->t_write); assert(!client->t_write);
assert(!client->t_flush);
/* synchronously stop thread */ /* synchronously stop thread */
frr_pthread_stop(client->pthread, NULL); frr_pthread_stop(client->pthread, NULL);

View File

@@ -72,7 +72,6 @@ struct zserv {
/* Threads for read/write. */ /* Threads for read/write. */
struct thread *t_read; struct thread *t_read;
struct thread *t_write; struct thread *t_write;
struct thread *t_flush;
/* default routing table this client munges */ /* default routing table this client munges */
int rtm_table; int rtm_table;