diff --git a/Makefile.am b/Makefile.am index da0b4490..34f0407b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -49,7 +49,7 @@ MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \ dist_doc_DATA = LICENSE INSTALL README.recovery SECURITY AUTHORS SUBDIRS = include common_lib lib exec tools test cts pkgconfig \ - man init conf qdevices + man init conf qdevices vqsim coverity: rm -rf cov diff --git a/configure.ac b/configure.ac index fbec166e..f46b83b7 100644 --- a/configure.ac +++ b/configure.ac @@ -204,6 +204,7 @@ AC_CONFIG_FILES([Makefile tools/Makefile conf/Makefile qdevices/Makefile + vqsim/Makefile Doxyfile conf/logrotate/Makefile conf/tmpfiles.d/Makefile]) @@ -418,6 +419,11 @@ AC_ARG_ENABLE([qnetd], [ enable_qnetd="no" ]) AM_CONDITIONAL(BUILD_QNETD, test x$enable_qnetd = xyes) +AC_ARG_ENABLE([vqsim], + [ --enable-vqsim : Quorum simulator support ],, + [ enable_vqsim="no" ]) +AM_CONDITIONAL(BUILD_VQSIM, test x$enable_vqsim = xyes) + # *FLAGS handling goes here ENV_CFLAGS="$CFLAGS" @@ -454,6 +460,14 @@ if test "x${enable_testagents}" = xyes; then WITH_LIST="$WITH_LIST --with testagents" fi +if test "x${enable_rdma}" = xyes; then + PKG_CHECK_MODULES([rdmacm],[rdmacm]) + PKG_CHECK_MODULES([ibverbs],[ibverbs]) + AC_DEFINE_UNQUOTED([HAVE_RDMA], 1, [have rdmacm]) + PACKAGE_FEATURES="$PACKAGE_FEATURES rdma" + WITH_LIST="$WITH_LIST --with rdma" +fi + if test "x${enable_monitoring}" = xyes; then PKG_CHECK_MODULES([statgrab], [libstatgrab]) PKG_CHECK_MODULES([statgrabge090], [libstatgrab >= 0.90], @@ -490,6 +504,16 @@ fi if test "x${enable_qdevices}" = xyes; then PACKAGE_FEATURES="$PACKAGE_FEATURES qdevices" fi + +if test "x${enable_vqsim}" = xyes; then + vqsim_readline=no + AC_CHECK_HEADERS([readline/readline.h readline/history.h], + [], + AC_MSG_WARN([vqsim will lack readline support])) + PACKAGE_FEATURES="$PACKAGE_FEATURES vqsim" +fi +AM_CONDITIONAL(VQSIM_READLINE, [test "x${ac_cv_header_readline_readline_h}" = xyes]) + if test "x${enable_qnetd}" = xyes; then PACKAGE_FEATURES="$PACKAGE_FEATURES qnetd" fi diff --git a/vqsim/Makefile.am b/vqsim/Makefile.am new file mode 100644 index 00000000..7eccdd8b --- /dev/null +++ b/vqsim/Makefile.am @@ -0,0 +1,47 @@ +# +# Copyright (c) 2009 Red Hat, Inc. +# +# Authors: Andrew Beekhof +# Steven Dake (sdake@redhat.com) +# +# This software licensed under BSD license, the text of which follows: +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# - Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# - Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# - Neither the name of the MontaVista Software, Inc. nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +# THE POSSIBILITY OF SUCH DAMAGE. + +MAINTAINERCLEANFILES = Makefile.in + +noinst_PROGRAMS = vqsim + +vqsim_LDADD = $(top_builddir)/common_lib/libcorosync_common.la \ + ../exec/corosync-votequorum.o ../exec/corosync-icmap.o ../exec/corosync-logsys.o \ + ../exec/corosync-coroparse.o ../exec/corosync-logconfig.o \ + $(LIBQB_LIBS) +if VQSIM_READLINE +vqsim_LDADD += -lreadline +endif + +vqsim_DEPENDENCIES = $(top_builddir)/common_lib/libcorosync_common.la + +vqsim_SOURCES = vqmain.c parser.c vq_object.c vqsim_vq_engine.c diff --git a/vqsim/parser.c b/vqsim/parser.c new file mode 100644 index 00000000..a7954d14 --- /dev/null +++ b/vqsim/parser.c @@ -0,0 +1,332 @@ +/* Parses the interactive commands */ + +#include + +#include +#include +#include +#include +#include +#ifdef HAVE_READLINE_HISTORY_H +#include +#endif + +#include +#include "vqsim.h" + +static void do_usage(void) +{ + printf(" All node IDs in the cluster are unique and belong to a numbered 'partition' (default=0)\n"); + printf("\n"); + printf("up [:][[,] ...] [[:][...]] [...]\n"); + printf(" bring node(s) online in the specified partition(s)\n"); + printf("down ,[...]\n"); + printf(" send nodes offline (shut them down)\n"); + printf("move/split [:][[,] ...] [[:][...]] [...]\n"); + printf(" Move nodes from one partition to another (netsplit)\n"); + printf(" here is the partition to move the nodes to\n"); + printf("join [] ... \n"); + printf(" Join partitions together (reverse of a netsplit)\n"); + printf("qdevice on|off [:][[,] ...] [[:][...]] [...]\n"); + printf(" Enable quorum device in specified nodes\n"); + printf("autofence on|off\n"); + printf(" automatically 'down' nodes on inquorate side on netsplit\n"); + printf("show Show current nodes status\n"); + printf("exit\n\n"); +} + + +typedef void (*cmd_routine_t)(int argc, char **argv); + +static void run_up_cmd(int argc, char **argv); +static void run_down_cmd(int argc, char **argv); +static void run_join_cmd(int argc, char **argv); +static void run_move_cmd(int argc, char **argv); +static void run_exit_cmd(int argc, char **argv); +static void run_show_cmd(int argc, char **argv); +static void run_autofence_cmd(int argc, char **argv); +static void run_qdevice_cmd(int argc, char **argv); + +static struct cmd_list_struct { + const char *cmd; + int min_args; + cmd_routine_t cmd_runner; +} cmd_list[] = { + { "up", 1, run_up_cmd}, + { "down", 1, run_down_cmd}, + { "move", 2, run_move_cmd}, + { "split", 2, run_move_cmd}, + { "join", 2, run_join_cmd}, + { "autofence", 1, run_autofence_cmd}, + { "qdevice", 1, run_qdevice_cmd}, + { "show", 0, run_show_cmd}, + { "exit", 0, run_exit_cmd}, + { "quit", 0, run_exit_cmd}, + { "q", 0, run_exit_cmd}, +}; +static int num_cmds = (sizeof(cmd_list)) / sizeof(struct cmd_list_struct); +#define MAX_ARGS 1024 + +/* Takes a :[[,]...] list and return it + as a partition and a list of nodes. + Returns 0 if successful, -1 if not +*/ +static int parse_partition_nodelist(char *string, int *partition, int *num_nodes, int **retnodes) +{ + int i; + int nodecount; + int len; + int last_comma; + char *nodeptr; + int *nodes; + char *colonptr = strchr(string, ':'); + + if (colonptr) { + *colonptr = '\0'; + nodeptr = colonptr+1; + *partition = atoi(string); + } + else { + /* Default to partition 0 */ + *partition = 0; + nodeptr = string; + } + + /* Count the number of commas and allocate space for the nodes */ + nodecount = 0; + for (i=0; i MAX_NODES) { + return -1; + } + + nodes = malloc(sizeof(int) * nodecount); + if (!nodes) { + return -1; + } + + nodecount = 0; + last_comma = 0; + len = strlen(nodeptr); + for (i=0; i<=len; i++) { + if (nodeptr[i] == ',' || nodeptr[i] == '\0') { + + nodeptr[i] = '\0'; + nodes[nodecount++] = atoi(&nodeptr[last_comma]); + last_comma = i+1; + } + } + + *num_nodes = nodecount; + *retnodes = nodes; + + return 0; +} + +void parse_input_command(char *rl_cmd) +{ + int i; + int argc = 0; + int valid_cmd = 0; + char *argv[MAX_ARGS]; + int last_arg_start = 0; + int last_was_space = 0; + int len; + char *cmd; + + /* ^D quits */ + if (rl_cmd == NULL) { + run_exit_cmd(0, NULL); + } + + cmd = strdup(rl_cmd); + + /* Split cmd up into args + * destroying the original string mwahahahaha + */ + + len = strlen(cmd); + + /* Span leading spaces */ + for (i=0; cmd[i] == ' '; i++) + ; + last_arg_start = i; + + for (; i<=len; i++) { + if (cmd[i] == ' ' || cmd[i] == '\0') { + + /* Allow multiple spaces */ + if (last_was_space) { + continue; + } + + cmd[i] = '\0'; + last_was_space = 1; + + argv[argc] = &cmd[last_arg_start]; + argc++; + } + else { + if (last_was_space) { + last_arg_start = i; + } + last_was_space = 0; + } + } + + /* Ignore null commands */ + if (strlen(argv[0]) == 0) { + free(cmd); + return; + } +#ifdef HAVE_READLINE_HISTORY_H + add_history(rl_cmd); +#endif + + /* Dispatch command */ + for (i=0; i +#include +#include +#include + +#include "../exec/votequorum.h" +#include "vqsim.h" + +struct vq_instance +{ + int nodeid; + int vq_socket; + pid_t pid; +}; + +vq_object_t vq_create_instance(qb_loop_t *poll_loop, int nodeid) +{ + struct vq_instance *instance = malloc(sizeof(struct vq_instance)); + if (!instance) { + return NULL; + } + + instance->nodeid = nodeid; + + if (fork_new_instance(nodeid, &instance->vq_socket, &instance->pid)) { + free(instance); + return NULL; + } + + return instance; +} + +pid_t vq_get_pid(vq_object_t instance) +{ + struct vq_instance *vqi = instance; + return vqi->pid; +} + +void vq_quit(vq_object_t instance) +{ + struct vq_instance *vqi = instance; + struct vqsim_msg_header msg; + int res; + + msg.type = VQMSG_QUIT; + msg.from_nodeid = 0; + msg.param = 0; + + res = write(vqi->vq_socket, &msg, sizeof(msg)); + if (res <= 0) { + perror("Quit write failed"); + } +} + +int vq_quit_if_inquorate(vq_object_t instance) +{ + struct vq_instance *vqi = instance; + struct vqsim_msg_header msg; + int res; + + msg.type = VQMSG_QUORUMQUIT; + msg.from_nodeid = 0; + msg.param = 0; + + res = write(vqi->vq_socket, &msg, sizeof(msg)); + if (res <= 0) { + perror("Quit write failed"); + } + return 0; +} + +int vq_set_nodelist(vq_object_t instance, struct memb_ring_id *ring_id, int *nodeids, int nodeids_entries) +{ + struct vq_instance *vqi = instance; + char msgbuf[sizeof(int)*nodeids_entries + sizeof(struct vqsim_sync_msg)]; + struct vqsim_sync_msg *msg = (void*)msgbuf; + int res; + + msg->header.type = VQMSG_SYNC; + msg->header.from_nodeid = 0; + msg->header.param = 0; + msg->view_list_entries = nodeids_entries; + memcpy(&msg->view_list, nodeids, nodeids_entries*sizeof(int)); + memcpy(&msg->ring_id, ring_id, sizeof(struct memb_ring_id)); + + res = write(vqi->vq_socket, msgbuf, sizeof(msgbuf)); + if (res <= 0) { + perror("Sync write failed"); + return -1; + } + return 0; +} + +int vq_set_qdevice(vq_object_t instance, struct memb_ring_id *ring_id, int onoff) +{ + struct vq_instance *vqi = instance; + struct vqsim_msg_header msg; + int res; + + msg.type = VQMSG_QDEVICE; + msg.from_nodeid = 0; + msg.param = onoff; + res = write(vqi->vq_socket, &msg, sizeof(msg)); + if (res <= 0) { + perror("qdevice register write failed"); + return -1; + } + return 0; +} + +int vq_get_parent_fd(vq_object_t instance) +{ + struct vq_instance *vqi = instance; + + return vqi->vq_socket; +} diff --git a/vqsim/vqmain.c b/vqsim/vqmain.c new file mode 100644 index 00000000..ce4d3236 --- /dev/null +++ b/vqsim/vqmain.c @@ -0,0 +1,733 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_READLINE_READLINE_H +#include +#else +#include /* isatty */ +#endif + +#include "../exec/votequorum.h" +#include "../exec/service.h" +#include +#include + +#include "icmap.h" +#include "vqsim.h" + +/* Easier than including the config file with a ton of conflicting dependencies */ +extern int coroparse_configparse (icmap_map_t config_map, const char **error_string); +extern int corosync_log_config_read (const char **error_string); + +/* One of these per partition */ +struct vq_partition { + TAILQ_HEAD(, vq_node) nodelist; + struct memb_ring_id ring_id; + int num; +}; + +/* One of these per node */ +struct vq_node { + vq_object_t instance; + unsigned int nodeid; + int fd; + struct vq_partition *partition; + TAILQ_ENTRY(vq_node) entries; + + /* Last status */ + int last_quorate; + struct memb_ring_id last_ring_id; + int last_view_list[MAX_NODES]; + int last_view_list_entries; +}; + +static struct vq_partition partitions[MAX_PARTITIONS]; +static qb_loop_t *poll_loop; +static int autofence; +static int check_for_quorum; +static FILE *output_file; +static int nosync; +static qb_loop_timer_handle kb_timer; +static ssize_t wait_count; +static ssize_t wait_count_to_unblock; + +static struct vq_node *find_by_pid(pid_t pid); +static void send_partition_to_nodes(struct vq_partition *partition, int newring); +static void start_kb_input(void); +static void start_kb_input_timeout(void *data); + +#ifndef HAVE_READLINE_READLINE_H +#define INPUT_BUF_SIZE 1024 +static char input_buf[INPUT_BUF_SIZE]; +static size_t input_buf_term = 0; +static int is_tty; +#endif + +/* Tell all non-quorate nodes to quit */ +static void force_fence(void) +{ + int i; + struct vq_node *vqn; + + for (i=0; iinstance); + } + } +} + +/* Save quorum state from the incoming message */ +static void save_quorum_state(struct vq_node *node, struct vqsim_quorum_msg *qmsg) +{ + node->last_quorate = qmsg->quorate; + memcpy(&node->last_ring_id, &qmsg->ring_id, sizeof(struct memb_ring_id)); + memcpy(node->last_view_list, qmsg->view_list, sizeof(int) * qmsg->view_list_entries); + node->last_view_list_entries = qmsg->view_list_entries; + + /* If at least one node is quorate and autofence is enabled, then fence everyone who is not quorate */ + if (check_for_quorum && qmsg->quorate & autofence) { + check_for_quorum = 0; + force_fence(); + } +} + +/* Print current node state */ +static void print_quorum_state(struct vq_node *node) +{ + int i; + + if (node->last_quorate < 0) { + fprintf(output_file, "%d:%02d: q=UNINITIALIZED\n", + node->partition->num, node->nodeid); + return; + } + + fprintf(output_file, "%d:%02d: q=%d ring=[%d/%lld] ", node->partition->num, node->nodeid, node->last_quorate, + node->last_ring_id.rep.nodeid, node->last_ring_id.seq); + fprintf(output_file, "nodes=["); + for (i = 0; i < node->last_view_list_entries; i++) { + if (i) { + fprintf(output_file, " "); + } + fprintf(output_file, "%d", node->last_view_list[i]); + } + fprintf(output_file, "]\n"); + +} + +static void propogate_vq_message(struct vq_node *vqn, const char *msg, int len) +{ + struct vq_node *other_vqn; + + /* Send it to everyone in that node's partition (including itself) */ + TAILQ_FOREACH(other_vqn, &vqn->partition->nodelist, entries) { + write(other_vqn->fd, msg, len); + } +} + +static int vq_parent_read_fn(int32_t fd, int32_t revents, void *data) +{ + char msgbuf[8192]; + int msglen; + struct vqsim_msg_header *msg; + struct vqsim_quorum_msg *qmsg; + struct vq_node *vqn = data; + + if (revents == POLLIN) { + msglen = read(fd, msgbuf, sizeof(msgbuf)); + if (msglen < 0) { + perror("read failed"); + } + + if (msglen > 0) { + msg = (void*)msgbuf; + switch (msg->type) { + case VQMSG_QUORUM: + if (!nosync && --wait_count_to_unblock <= 0) + qb_loop_timer_del(poll_loop, kb_timer); + qmsg = (void*)msgbuf; + save_quorum_state(vqn, qmsg); + print_quorum_state(vqn); + if (!nosync && wait_count_to_unblock <= 0) + start_kb_input(); + break; + case VQMSG_EXEC: + /* Message from votequorum, pass around the partition */ + propogate_vq_message(vqn, msgbuf, msglen); + break; + case VQMSG_QUIT: + case VQMSG_SYNC: + case VQMSG_QDEVICE: + case VQMSG_QUORUMQUIT: + /* not used here */ + break; + } + } + } + if (revents == POLLERR) { + fprintf(stderr, "pollerr on %d\n", vqn->nodeid); + } + return 0; +} + + +static int read_corosync_conf(void) +{ + int res; + const char *error_string; + + int err = icmap_init(); + if (!err) { + fprintf(stderr, "icmap_init failed\n"); + } + + /* Load corosync.conf */ + logsys_format_set(NULL); + res = coroparse_configparse(icmap_get_global_map(), &error_string); + if (res == -1) { + log_printf (LOGSYS_LEVEL_INFO, "Error loading corosyc.conf %s", error_string); + return -1; + } + else { + res = corosync_log_config_read (&error_string); + if (res < 0) { + log_printf (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string); + syslog (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string); + } + else { + logsys_config_apply(); + } + } + if (logsys_thread_start() != 0) { + log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread"); + return -1; + } + return 0; +} + +static void remove_node(struct vq_node *node) +{ + struct vq_partition *part; + part = node->partition; + + /* Remove from partition list */ + TAILQ_REMOVE(&part->nodelist, node, entries); + free(node); + + wait_count--; + + /* Rebuild quorum */ + send_partition_to_nodes(part, 1); +} + +static int32_t sigchld_handler(int32_t sig, void *data) +{ + pid_t pid; + int status; + struct vq_node *vqn; + const char *exit_status=""; + char text[132]; + + pid = wait(&status); + if (WIFEXITED(status)) { + vqn = find_by_pid(pid); + if (vqn) { + switch (WEXITSTATUS(status)) { + case 0: + exit_status = "(on request)"; + break; + case 1: + exit_status = "(autofenced)"; + break; + default: + sprintf(text, "(exit code %d)", WEXITSTATUS(status)); + break; + } + printf("%d:%02d Quit %s\n", vqn->partition->num, vqn->nodeid, exit_status); + + remove_node(vqn); + } + else { + fprintf(stderr, "Unknown child %d exited with status %d\n", pid, WEXITSTATUS(status)); + } + } + if (WIFSIGNALED(status)) { + vqn = find_by_pid(pid); + if (vqn) { + printf("%d:%02d exited on signal %d%s\n", vqn->partition->num, vqn->nodeid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":""); + remove_node(vqn); + } + else { + fprintf(stderr, "Unknown child %d exited with status %d%s\n", pid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":""); + } + } + return 0; +} + +static void send_partition_to_nodes(struct vq_partition *partition, int newring) +{ + struct vq_node *vqn; + int nodelist[MAX_NODES]; + int nodes = 0; + int first = 1; + + if (newring) { + /* Simulate corosync incrementing the seq by 4 for added authenticity */ + partition->ring_id.seq += 4; + } + + /* Build the node list */ + TAILQ_FOREACH(vqn, &partition->nodelist, entries) { + nodelist[nodes++] = vqn->nodeid; + if (first) { + partition->ring_id.rep.nodeid = vqn->nodeid; + first = 0; + } + } + + TAILQ_FOREACH(vqn, &partition->nodelist, entries) { + vq_set_nodelist(vqn->instance, &partition->ring_id, nodelist, nodes); + } +} + +static void init_partitions(void) +{ + int i; + + for (i=0; ilast_quorate = -1; /* mark "uninitialized" */ + newvq->instance = vq_create_instance(poll_loop, nodeid); + if (!newvq->instance) { + fprintf(stderr, + "ERR: could not create vq instance nodeid %d\n", + nodeid); + return (pid_t) -1; + } + newvq->partition = &partitions[partno]; + newvq->nodeid = nodeid; + newvq->fd = vq_get_parent_fd(newvq->instance); + TAILQ_INSERT_TAIL(&partitions[partno].nodelist, newvq, entries); + + if (qb_loop_poll_add(poll_loop, + QB_LOOP_MED, + newvq->fd, + POLLIN | POLLERR, + newvq, + vq_parent_read_fn)) { + perror("qb_loop_poll_add returned error"); + return (pid_t) -1; + } + + /* Send sync with all the nodes so far in it. */ + send_partition_to_nodes(&partitions[partno], 1); + return vq_get_pid(newvq->instance); + } + return (pid_t) -1; +} + +static size_t create_nodes_from_config(void) +{ + icmap_iter_t iter; + char tmp_key[ICMAP_KEYNAME_MAXLEN]; + uint32_t node_pos; + uint32_t nodeid; + const char *iter_key; + int res; + pid_t pid; + size_t ret = 0; + + init_partitions(); + + iter = icmap_iter_init("nodelist.node."); + while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) { + res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key); + if (res != 2) { + continue; + } + + if (strcmp(tmp_key, "ring0_addr") != 0) { + continue; + } + + snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos); + if (icmap_get_uint32(tmp_key, &nodeid) == CS_OK) { + pid = create_node(nodeid, 0); + if (pid == (pid_t) -1) { + fprintf(stderr, + "ERR: nodeid %d could not be spawned\n", + nodeid); + exit(1); + } + ret++; + } + + } + icmap_iter_finalize(iter); + + return ret; +} + +static struct vq_node *find_node(int nodeid) +{ + int i; + struct vq_node *vqn; + + for (i=0; inodeid == nodeid) { + return vqn; + } + } + } + return NULL; +} + +static struct vq_node *find_by_pid(pid_t pid) +{ + int i; + struct vq_node *vqn; + + for (i=0; iinstance) == pid) { + return vqn; + } + } + } + return NULL; +} + +/* Routines called from the parser */ +void cmd_start_new_node(int nodeid, int partition) +{ + struct vq_node *node; + + node = find_node(nodeid); + if (node) { + fprintf(stderr, "ERR: nodeid %d already exists in partition %d\n", nodeid, node->partition->num); + return; + } + qb_loop_poll_del(poll_loop, STDIN_FILENO); + create_node(nodeid, partition); + if (!nosync) { + /* Delay kb input handling by 0.25 second when we've just + added a node; expect that the delay will be cancelled + substantially earlier once it has reported its quorum info + (the delay is in fact a failsafe input enabler here) */ + qb_loop_timer_add(poll_loop, + QB_LOOP_MED, + 250000000, + NULL, + start_kb_input_timeout, + &kb_timer); + } +} + +void cmd_stop_all_nodes() +{ + int i; + struct vq_node *vqn; + + for (i=0; iinstance); + } + } +} + +void cmd_show_node_states() +{ + int i; + struct vq_node *vqn; + + for (i=0; iinstance); + + /* Node will be removed when the child process exits */ +} + +/* Move all nodes in 'nodelist' into partition 'partition' */ +void cmd_move_nodes(int partition, int num_nodes, int *nodelist) +{ + int i; + struct vq_node *node; + + for (i=0; ipartition->nodelist, node, entries); + + /* Add it to the new partition */ + TAILQ_INSERT_TAIL(&partitions[partition].nodelist, node, entries); + node->partition = &partitions[partition]; + } + else { + printf("ERR: node %d does not exist\n", nodelist[i]); + } + } +} + +/* Take all the nodes in part2 and join them to part1 */ +void cmd_join_partitions(int part1, int part2) +{ + struct vq_node *vqn; + + /* TAILQ_FOREACH is not delete safe *sigh* */ +retry: + TAILQ_FOREACH(vqn, &partitions[part2].nodelist, entries) { + + TAILQ_REMOVE(&vqn->partition->nodelist, vqn, entries); + TAILQ_INSERT_TAIL(&partitions[part1].nodelist, vqn, entries); + vqn->partition = &partitions[part1]; + + goto retry; + } +} + +void cmd_set_autofence(int onoff) +{ + autofence = onoff; + fprintf(output_file, "#autofence: %s\n", onoff?"on":"off"); +} + +void cmd_update_all_partitions(int newring) +{ + int i; + + check_for_quorum = 1; + for (i=0; iinstance, &node->partition->ring_id, onoff); + } +} + +/* ---------------------------------- */ + +#ifndef HAVE_READLINE_READLINE_H +static void dummy_read_char(void); + +static void dummy_read_char() +{ + int c, flush = 0; + + while (!flush) { + c = getchar(); + if (++input_buf_term >= INPUT_BUF_SIZE) { + if (c != '\n' && c != EOF) + fprintf(stderr, "User input overflows the limit: %zu\n", + (size_t) INPUT_BUF_SIZE); + input_buf[INPUT_BUF_SIZE - 1] = '\0'; + flush = 1; + } else if (c == '\n' || c == EOF) { + input_buf[input_buf_term - 1] = '\0'; + flush = 1; + } else { + input_buf[input_buf_term - 1] = c; + } + } + + parse_input_command((c == EOF) ? NULL : input_buf); + input_buf_term = 0; + + if (is_tty) { + printf("vqsim> "); + fflush(stdout); + } +} +#endif + +static int stdin_read_fn(int32_t fd, int32_t revents, void *data) +{ +#ifdef HAVE_READLINE_READLINE_H + /* Send it to readline */ + rl_callback_read_char(); +#else + dummy_read_char(); +#endif + return 0; +} + +static void start_kb_input(void) +{ + wait_count_to_unblock = 0; + +#ifdef HAVE_READLINE_READLINE_H + /* Readline will deal with completed lines when they arrive */ + rl_callback_handler_install("vqsim> ", parse_input_command); +#else + if (is_tty) { + printf("vqsim> "); + fflush(stdout); + } +#endif + + /* Send stdin to readline */ + if (qb_loop_poll_add(poll_loop, + QB_LOOP_MED, + STDIN_FILENO, + POLLIN | POLLERR, + NULL, + stdin_read_fn)) { + if (errno != EEXIST) { + perror("qb_loop_poll_add1 returned error"); + } + } +} + +static void start_kb_input_timeout(void *data) +{ +// fprintf(stderr, "Waiting for nodes to report status timed out\n"); + start_kb_input(); +} + +static void usage(char *program) +{ + printf("Usage:\n"); + printf("\n"); + printf("%s [-f ] [-o ]\n", program); + printf("\n"); + printf(" -f config file. defaults to /etc/corosync/corosync.conf\n"); + printf(" -o output file. defaults to stdout\n"); + printf(" -n no synchronization (on adding a node)\n"); + printf(" -h display this help text\n"); + printf("\n"); +} + +int main(int argc, char **argv) +{ + qb_loop_signal_handle sigchld_qb_handle; + int ch; + char *config_file_name = NULL; + char *output_file_name = NULL; + char envstring[PATH_MAX]; + + while ((ch = getopt (argc, argv, "f:o:nh")) != EOF) { + switch (ch) { + case 'f': + config_file_name = optarg; + break; + case 'o': + output_file_name = optarg; + break; + case 'n': + nosync = 1; + break; + default: + usage(argv[0]); + exit(0); + } + } + + if (config_file_name) { + sprintf(envstring, "COROSYNC_MAIN_CONFIG_FILE=%s", config_file_name); + putenv(envstring); + } + if (output_file_name) { + output_file = fopen(output_file_name, "w"); + if (!output_file) { + fprintf(stderr, "Unable to open %s for output: %s\n", output_file_name, strerror(errno)); + exit(-1); + } + } + else { + output_file = stdout; + } +#ifndef HAVE_READLINE_READLINE_H + is_tty = isatty(STDIN_FILENO); +#endif + + qb_log_filter_ctl(QB_LOG_SYSLOG, QB_LOG_FILTER_ADD, + QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG); + + qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, + QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG); + + poll_loop = qb_loop_create(); + + /* SIGCHLD handler to reap sub-processes and reconfigure the cluster */ + qb_loop_signal_add(poll_loop, + QB_LOOP_MED, + SIGCHLD, + NULL, + sigchld_handler, + &sigchld_qb_handle); + + /* Create a full cluster of nodes from corosync.conf */ + read_corosync_conf(); + if (create_nodes_from_config() && !nosync) { + /* Delay kb input handling by 1 second when we've just + added the nodes from corosync.conf; expect that + the delay will be cancelled substantially earlier + once they all have reported their quorum info + (the delay is in fact a failsafe input enabler here) */ + qb_loop_timer_add(poll_loop, + QB_LOOP_MED, + 1000000000, + NULL, + start_kb_input_timeout, + &kb_timer); + } else { + start_kb_input(); + } + + qb_loop_run(poll_loop); + return 0; +} diff --git a/vqsim/vqsim.h b/vqsim/vqsim.h new file mode 100644 index 00000000..0c999d74 --- /dev/null +++ b/vqsim/vqsim.h @@ -0,0 +1,77 @@ + +typedef enum {VQMSG_QUIT=1, + VQMSG_SYNC, /* set nodelist */ + VQMSG_QUORUM, /* quorum state of this 'node' */ + VQMSG_EXEC, /* message for exec_handler */ + VQMSG_QDEVICE, /* quorum device enable/disable */ + VQMSG_QUORUMQUIT, /* quit if you don't have quorum */ +} vqsim_msg_type_t; + +typedef struct vq_instance *vq_object_t; + +struct vqsim_msg_header +{ + vqsim_msg_type_t type; + int from_nodeid; + int param; +}; + +/* This is the sync sent from the controller process */ +struct vqsim_sync_msg +{ + struct vqsim_msg_header header; + struct memb_ring_id ring_id; + size_t view_list_entries; + unsigned int view_list[]; +}; + +/* This is just info sent from each VQ instance */ +struct vqsim_quorum_msg +{ + struct vqsim_msg_header header; + int quorate; + struct memb_ring_id ring_id; + size_t view_list_entries; + unsigned int view_list[]; +}; + +struct vqsim_exec_msg +{ + struct vqsim_msg_header header; + char execmsg[]; +}; + +struct vqsim_lib_msg +{ + struct vqsim_msg_header header; + char libmsg[]; +}; + +#define MAX_NODES 1024 +#define MAX_PARTITIONS 16 + +/* In vq_object.c */ +vq_object_t vq_create_instance(qb_loop_t *poll_loop, int nodeid); +void vq_quit(vq_object_t instance); +int vq_set_nodelist(vq_object_t instance, struct memb_ring_id *ring_id, int *nodeids, int nodeids_entries); +int vq_get_parent_fd(vq_object_t instance); +int vq_set_qdevice(vq_object_t instance, struct memb_ring_id *ring_id, int onoff); +int vq_quit_if_inquorate(vq_object_t instance); +pid_t vq_get_pid(vq_object_t instance); + +/* in vqsim_vq_engine.c - effectively the constructor */ +int fork_new_instance(int nodeid, int *vq_sock, pid_t *child_pid); + +/* In parser.c */ +void parse_input_command(char *cmd); + +/* These are in vqmain.c */ +void cmd_stop_node(int nodeid); +void cmd_stop_all_nodes(void); +void cmd_start_new_node(int nodeid, int partition); +void cmd_set_autofence(int onoff); +void cmd_move_nodes(int partition, int num_nodes, int *nodelist); +void cmd_join_partitions(int part1, int part2); +void cmd_update_all_partitions(int newring); +void cmd_qdevice_poll(int nodeid, int onoff); +void cmd_show_node_states(void); diff --git a/vqsim/vqsim_vq_engine.c b/vqsim/vqsim_vq_engine.c new file mode 100644 index 00000000..ee139d41 --- /dev/null +++ b/vqsim/vqsim_vq_engine.c @@ -0,0 +1,433 @@ + +/* This is the bit of VQSIM that runs in the forked process. + It represents a single votequorum instance or, if you like, + a 'node' in the cluster. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../exec/votequorum.h" +#include "../exec/service.h" +#include "../include/corosync/corotypes.h" +#include "../include/corosync/votequorum.h" +#include "../include/corosync/ipc_votequorum.h" +#include +#include + +#include "icmap.h" +#include "vqsim.h" + +#define QDEVICE_NAME "VQsim_qdevice" + +/* Static variables here are per-instance because we are forked */ +static struct corosync_service_engine *engine; +static int parent_socket; /* Our end of the socket */ +static char buffer[8192]; +static int our_nodeid; +static char *private_data; +static qb_loop_t *poll_loop; +static qb_loop_timer_handle sync_timer; +static qb_loop_timer_handle qdevice_timer; +static int we_are_quorate; +static void *fake_conn = (void*)1; +static cs_error_t last_lib_error; +static struct memb_ring_id current_ring_id; +static int qdevice_registered; +static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT; + +/* 'Keep the compiler happy' time */ +char *get_run_dir(void); +int api_timer_add_duration ( + unsigned long long nanosec_duration, + void *data, + void (*timer_fn) (void *data), + corosync_timer_handle_t *handle); + +static void api_error_memory_failure(void) __attribute__((noreturn)); +static void api_error_memory_failure() +{ + fprintf(stderr, "Out of memory error\n"); + exit(-1); +} +static void api_timer_delete(corosync_timer_handle_t th) +{ + qb_loop_timer_del(poll_loop, th); +} + +int api_timer_add_duration ( + unsigned long long nanosec_duration, + void *data, + void (*timer_fn) (void *data), + corosync_timer_handle_t *handle) +{ + return qb_loop_timer_add(poll_loop, + QB_LOOP_MED, + nanosec_duration, + data, + timer_fn, + handle); +} + +static unsigned int api_totem_nodeid_get(void) +{ + return our_nodeid; +} + +static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type) +{ + struct vqsim_msg_header header; + struct iovec iovec[iovlen+1]; + int total = sizeof(header); + int res; + int i; + + header.type = VQMSG_EXEC; + header.from_nodeid = our_nodeid; + header.param = 0; + + iovec[0].iov_base = &header; + iovec[0].iov_len = sizeof(header); + for (i=0; ierror; + return 0; +} + +static struct corosync_api_v1 corosync_api = { + .error_memory_failure = api_error_memory_failure, + .timer_delete = api_timer_delete, + .timer_add_duration = api_timer_add_duration, + .totem_nodeid_get = api_totem_nodeid_get, + .totem_mcast = api_totem_mcast, + .ipc_private_data_get = api_ipc_private_data_get, + .ipc_response_send = api_ipc_response_send, +}; + +/* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/ +/* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */ + +static void start_qdevice_poll(int longwait); +static void start_sync_timer(void); + +/* Callback from Votequorum to tell us about the quorum state */ +static void quorum_fn(const unsigned int *view_list, + size_t view_list_entries, + int quorate, struct memb_ring_id *ring_id) +{ + char msgbuf[8192]; + int len; + struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf; + + we_are_quorate = quorate; + + /* Send back to parent */ + quorum_msg->header.type = VQMSG_QUORUM; + quorum_msg->header.from_nodeid = our_nodeid; + quorum_msg->header.param = 0; + quorum_msg->quorate = quorate; + memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id)); + quorum_msg->view_list_entries = view_list_entries; + + memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries); + + if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) { + perror("write (view list to parent) failed"); + } + memcpy(¤t_ring_id, ring_id, sizeof(*ring_id)); +} + +char *corosync_service_link_and_init(struct corosync_api_v1 *api, + struct default_service *service_engine) +{ + /* dummy */ + return NULL; +} + +/* For votequorum */ +char *get_run_dir() +{ + static char cwd_buffer[PATH_MAX]; + + return getcwd(cwd_buffer, PATH_MAX); +} + +static int load_quorum_instance(struct corosync_api_v1 *api) +{ + const char *error_string; + int res; + + error_string = votequorum_init(api, quorum_fn); + if (error_string) { + fprintf(stderr, "Votequorum init failed: %s\n", error_string); + return -1; + } + + engine = votequorum_get_service_engine_ver0(); + error_string = engine->exec_init_fn(api); + if (error_string) { + fprintf(stderr, "votequorum exec init failed: %s\n", error_string); + return -1; + } + + private_data = malloc(engine->private_data_size); + if (!private_data) { + perror("Malloc in child failed"); + return -1; + } + + res = engine->lib_init_fn(fake_conn); + + return res; +} + +static void sync_dispatch_fn(void *data) +{ + if (engine->sync_process()) { + start_sync_timer(); + } + else { + engine->sync_activate(); + } +} + +static void start_sync_timer() +{ + qb_loop_timer_add(poll_loop, + QB_LOOP_MED, + 10000000, + NULL, + sync_dispatch_fn, + &sync_timer); +} + +static void send_sync(char *buf, int len) +{ + struct vqsim_sync_msg *msg = (void*)buf; + + /* Votequorum doesn't use the transitional node list :-) */ + engine->sync_init(NULL, 0, + msg->view_list, msg->view_list_entries, + &msg->ring_id); + + start_sync_timer(); +} + +static void send_exec_msg(char *buf, int len) +{ + struct vqsim_exec_msg *execmsg = (void*)buf; + struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg; + + engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid); +} + +static int send_lib_msg(int type, void *msg) +{ + /* Clear this as not all lib functions return a response immediately */ + last_lib_error = CS_OK; + + engine->lib_engine[type].lib_handler_fn(fake_conn, msg); + + return last_lib_error; +} + +static int poll_qdevice(int onoff) +{ + struct req_lib_votequorum_qdevice_poll pollmsg; + int res; + + pollmsg.cast_vote = onoff; + pollmsg.ring_id.nodeid = current_ring_id.rep.nodeid; + pollmsg.ring_id.seq = current_ring_id.seq; + strcpy(pollmsg.name, QDEVICE_NAME); + + res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg); + if (res != CS_OK) { + fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, res); + } + return res; +} + +static void qdevice_dispatch_fn(void *data) +{ + if (poll_qdevice(1) == CS_OK) { + start_qdevice_poll(0); + } +} + +static void start_qdevice_poll(int longwait) +{ + unsigned long long timeout; + + timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */ + if (longwait) { + timeout *= 2; + } + + qb_loop_timer_add(poll_loop, + QB_LOOP_MED, + timeout, + NULL, + qdevice_dispatch_fn, + &qdevice_timer); +} + +static void stop_qdevice_poll(void) +{ + qb_loop_timer_del(poll_loop, qdevice_timer); + qdevice_timer = 0; +} + +static void do_qdevice(int onoff) +{ + int res; + + if (onoff) { + if (!qdevice_registered) { + struct req_lib_votequorum_qdevice_register regmsg; + + strcpy(regmsg.name, QDEVICE_NAME); + if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, ®msg)) == CS_OK) { + qdevice_registered = 1; + start_qdevice_poll(1); + } + else { + fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res); + } + } + else { + if (!qdevice_timer) { + start_qdevice_poll(0); + } + } + } + else { + poll_qdevice(0); + stop_qdevice_poll(); + } +} + + +/* From controller */ +static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data) +{ + struct vqsim_msg_header *header = (void*)buffer; + int len; + + len = read(fd, buffer, sizeof(buffer)); + if (len > 0) { + /* Check header and route */ + switch (header->type) { + case VQMSG_QUIT: + exit(0); + break; + case VQMSG_EXEC: /* For votequorum exec messages */ + send_exec_msg(buffer, len); + break; + case VQMSG_SYNC: + send_sync(buffer, len); + break; + case VQMSG_QDEVICE: + do_qdevice(header->param); + break; + case VQMSG_QUORUMQUIT: + if (!we_are_quorate) { + exit(1); + } + break; + case VQMSG_QUORUM: + /* not used here */ + break; + } + } + return 0; +} + +static void initial_sync(int nodeid) +{ + unsigned int trans_list[1] = {nodeid}; + unsigned int member_list[1] = {nodeid}; + struct memb_ring_id ring_id; + + ring_id.rep.nodeid = our_nodeid; + ring_id.seq = 1; + + /* cluster with just us in it */ + engine->sync_init(trans_list, 1, + member_list, 1, + &ring_id); + start_sync_timer(); +} + +/* Return pipe FDs & child PID if sucessful */ +int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid) +{ + int pipes[2]; + pid_t pid; + + if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) { + return -1; + } + parent_socket = pipes[0]; + + switch ( (pid=fork()) ) { + case -1: + perror("fork failed"); + return -1; + case 0: + /* child process - continue below */ + break; + default: + /* parent process */ + *vq_sock = pipes[1]; + *childpid = pid; + return 0; + } + + our_nodeid = nodeid; + poll_loop = qb_loop_create(); + + if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) { + qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT; + } + + load_quorum_instance(&corosync_api); + + qb_loop_poll_add(poll_loop, + QB_LOOP_MED, + parent_socket, + POLLIN, + NULL, + parent_pipe_read_fn); + + /* Start it up! */ + initial_sync(nodeid); + qb_loop_run(poll_loop); + + return 0; +}