diff --git a/lib/frr_zmq.c b/lib/frr_zmq.c index 05f0fce5fc..ce52848a25 100644 --- a/lib/frr_zmq.c +++ b/lib/frr_zmq.c @@ -135,8 +135,8 @@ static int frrzmq_read_msg(struct thread *t) if (read) frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); - _thread_add_read_write(t->xref, t->master, frrzmq_read_msg, cbp, - cb->fd, &cb->read.thread); + thread_add_read(t->master, frrzmq_read_msg, cbp, + cb->fd, &cb->read.thread); return 0; out_err: @@ -191,11 +191,11 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref, if (events & ZMQ_POLLIN) { thread_cancel(&cb->read.thread); - _thread_add_event(xref, master, frrzmq_read_msg, cbp, fd, + thread_add_event(master, frrzmq_read_msg, cbp, fd, &cb->read.thread); } else - _thread_add_read_write(xref, master, frrzmq_read_msg, cbp, fd, - &cb->read.thread); + thread_add_read(master, frrzmq_read_msg, cbp, fd, + &cb->read.thread); return 0; } @@ -241,8 +241,8 @@ static int frrzmq_write_msg(struct thread *t) if (written) frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); - _thread_add_read_write(t->xref, t->master, frrzmq_write_msg, cbp, - cb->fd, &cb->write.thread); + thread_add_write(t->master, frrzmq_write_msg, cbp, + cb->fd, &cb->write.thread); return 0; out_err: @@ -297,8 +297,8 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref, _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd, &cb->write.thread); } else - _thread_add_read_write(xref, master, frrzmq_write_msg, cbp, fd, - &cb->write.thread); + thread_add_write(master, frrzmq_write_msg, cbp, fd, + &cb->write.thread); return 0; } @@ -310,7 +310,7 @@ void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) thread_cancel(&core->thread); if ((*cb)->read.cancelled && !(*cb)->read.thread - && (*cb)->write.cancelled && (*cb)->write.thread) + && (*cb)->write.cancelled && !(*cb)->write.thread) XFREE(MTYPE_ZEROMQ_CB, *cb); } @@ -330,12 +330,16 @@ void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, len = sizeof(events); if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) return; - if (events & event && core->thread && !core->cancelled) { + if ((events & event) && core->thread && !core->cancelled) { struct thread_master *tm = core->thread->master; + thread_cancel(&core->thread); - thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg - : frrzmq_write_msg), - cbp, cb->fd, &core->thread); + if (event == ZMQ_POLLIN) + thread_add_event(tm, frrzmq_read_msg, + cbp, cb->fd, &core->thread); + else + thread_add_event(tm, frrzmq_write_msg, + cbp, cb->fd, &core->thread); } }