tests: add ZeroMQ test

Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
This commit is contained in:
David Lamparter 2017-08-24 18:13:31 +02:00
parent b6116506ec
commit a0b974def7
3 changed files with 221 additions and 0 deletions

1
tests/.gitignore vendored
View File

@ -45,4 +45,5 @@ __pycache__
/lib/test_timer_correctness
/lib/test_timer_performance
/lib/test_ttable
/lib/test_zmq
/ospf6d/test_lsdb

View File

@ -72,6 +72,12 @@ check_PROGRAMS = \
$(TESTS_OSPF6D) \
# end
if ZEROMQ
check_PROGRAMS += \
lib/test_zmq \
# end
endif
../vtysh/vtysh_cmd.c:
$(MAKE) -C ../vtysh vtysh_cmd.c
@ -112,6 +118,7 @@ lib_test_timer_correctness_SOURCES = lib/test_timer_correctness.c \
lib_test_timer_performance_SOURCES = lib/test_timer_performance.c \
helpers/c/prng.c
lib_test_ttable_SOURCES = lib/test_ttable.c
lib_test_zmq_SOURCES = lib/test_zmq.c
lib_cli_test_cli_SOURCES = lib/cli/test_cli.c lib/cli/common_cli.c
lib_cli_test_commands_SOURCES = lib/cli/test_commands_defun.c \
lib/cli/test_commands.c \
@ -147,6 +154,7 @@ lib_test_table_LDADD = $(ALL_TESTS_LDADD) -lm
lib_test_timer_correctness_LDADD = $(ALL_TESTS_LDADD)
lib_test_timer_performance_LDADD = $(ALL_TESTS_LDADD)
lib_test_ttable_LDADD = $(ALL_TESTS_LDADD)
lib_test_zmq_LDADD = $(ALL_TESTS_LDADD) ../lib/libfrrzmq.la
lib_cli_test_cli_LDADD = $(ALL_TESTS_LDADD)
lib_cli_test_commands_LDADD = $(ALL_TESTS_LDADD)
bgpd_test_aspath_LDADD = $(BGP_TEST_LDADD)

212
tests/lib/test_zmq.c Normal file
View File

@ -0,0 +1,212 @@
/*
* ZeroMQ event test
* Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; see the file COPYING; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <zebra.h>
#include "memory.h"
#include "sigevent.h"
#include "frr_zmq.h"
DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
static struct thread_master *master;
static void msg_buf_free(void *data, void *hint)
{
XFREE(MTYPE_TESTBUF, data);
}
static void run_client(int syncfd)
{
int i, j;
char buf[32];
char dummy;
void *zmqctx = NULL;
void *zmqsock;
read(syncfd, &dummy, 1);
zmqctx = zmq_ctx_new();
zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
perror("zmq_connect");
exit(1);
}
/* single-part */
for (i = 0; i < 8; i++) {
snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
i, 'a' + i, 'b' + i, 'c' + i);
printf("client send: %s\n", buf);
fflush(stdout);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("client recv: %s\n", buf);
}
/* multipart */
for (i = 2; i < 5; i++) {
int more;
printf("---\n");
for (j = 1; j <= i; j++) {
zmq_msg_t part;
char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(dyn, 32, "part %d/%d", j, i);
printf("client send: %s\n", dyn);
fflush(stdout);
zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
msg_buf_free, NULL);
zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
}
zmq_msg_t part;
do {
char *data;
zmq_msg_recv(&part, zmqsock, 0);
data = zmq_msg_data(&part);
more = zmq_msg_more(&part);
printf("client recv (more: %d): %s\n", more, data);
} while (more);
zmq_msg_close(&part);
}
zmq_close(zmqsock);
zmq_ctx_term(zmqctx);
}
static struct frrzmq_cb *cb;
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
unsigned partnum)
{
int more = zmq_msg_more(msg);
char *in = zmq_msg_data(msg);
size_t i;
zmq_msg_t reply;
char *out;
printf("server recv part %u (more: %d): %s\n", partnum, more, in);
fflush(stdout);
/* REQ-REP doesn't allow sending a reply here */
if (more)
return;
out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
for (i = 0; i < strlen(in); i++)
out[i] = toupper(in[i]);
out[i] = '\0';
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
out = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(out, 32, "msg# was %u", partnum);
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, 0);
}
static void serverfn(void *arg, void *zmqsock)
{
static int num = 0;
char buf[32];
size_t i;
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("server recv: %s\n", buf);
fflush(stdout);
for (i = 0; i < strlen(buf); i++)
buf[i] = toupper(buf[i]);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
if (++num < 4)
return;
/* change to multipart callback */
frrzmq_thread_cancel(cb);
cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
}
static void sigchld(void)
{
printf("child exited.\n");
frrzmq_thread_cancel(cb);
}
static struct quagga_signal_t sigs[] = {
{
.signal = SIGCHLD,
.handler = sigchld,
},
};
static void run_server(int syncfd)
{
void *zmqsock;
char dummy = 0;
struct thread t;
master = thread_master_create(NULL);
signal_init(master, array_size(sigs), sigs);
frrzmq_init();
zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
if (zmq_bind(zmqsock, "tcp://*:17171")) {
perror("zmq_bind");
exit(1);
}
cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
write(syncfd, &dummy, sizeof(dummy));
while (thread_fetch(master, &t))
thread_call(&t);
zmq_close(zmqsock);
frrzmq_finish();
thread_master_free(master);
log_memstats_stderr("test");
}
int main(void)
{
int syncpipe[2];
pid_t child;
if (pipe(syncpipe)) {
perror("pipe");
exit(1);
}
child = fork();
if (child < 0) {
perror("fork");
exit(1);
} else if (child == 0) {
run_client(syncpipe[0]);
exit(0);
}
run_server(syncpipe[1]);
exit(0);
}