diff --git a/conf/lenses/corosync.aug b/conf/lenses/corosync.aug index a94e6892..23458f34 100644 --- a/conf/lenses/corosync.aug +++ b/conf/lenses/corosync.aug @@ -89,7 +89,7 @@ let common_logging = |kv "logfile_priority" /alert|crit|debug|emerg|err|info|notice|warning/ |kv "syslog_priority" /alert|crit|debug|emerg|err|info|notice|warning/ |kv "syslog_facility" /daemon|local0|local1|local2|local3|local4|local5|local6|local7/ - |qstr /logfile|tags/ + |qstr /logfile|trace/ (* A logger_subsys subsection *) let logger_subsys = diff --git a/cts/agents/Makefile.am b/cts/agents/Makefile.am index 8a3278a9..3e1f4195 100644 --- a/cts/agents/Makefile.am +++ b/cts/agents/Makefile.am @@ -58,7 +58,7 @@ endif noinst_HEADERS = common_test_agent.h cpg_test_agent_SOURCES = cpg_test_agent.c common_test_agent.c -cpg_test_agent_LDADD = -lcpg -lcoroipcc ../../exec/coropoll.o ../../exec/crypto.o +cpg_test_agent_LDADD = -lcpg -lcfg -lcoroipcc ../../exec/coropoll.o ../../exec/crypto.o cpg_test_agent_LDFLAGS = -L../../lib -L. confdb_test_agent_SOURCES = confdb_test_agent.c common_test_agent.c diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c index 1b6a0671..e94cb6bc 100644 --- a/cts/agents/cpg_test_agent.c +++ b/cts/agents/cpg_test_agent.c @@ -50,6 +50,7 @@ #include #include #include +#include #include "../../exec/crypto.h" #include "common_test_agent.h" @@ -82,7 +83,9 @@ static char big_and_buf[HOW_BIG_AND_BUF]; static int32_t record_config_events_g = 0; static int32_t record_messages_g = 0; static cpg_handle_t cpg_handle = 0; +static corosync_cfg_handle_t cfg_handle = 0; static int32_t cpg_fd = -1; +static int32_t cfg_fd = -1; static struct list_head config_chg_log_head; static struct list_head msg_log_head; static pid_t my_pid; @@ -91,7 +94,8 @@ static int32_t my_seq; static int32_t use_zcb = 0; static int32_t my_msgs_to_send; static int32_t total_stored_msgs = 0; - +static int32_t in_cnchg = 0; +static int32_t pcmk_test = 0; static void send_some_more_messages (void * unused); @@ -171,6 +175,11 @@ static void delivery_callback ( err_status_string (status_buf, 20, status)); list_add_tail (&log_pt->list, &msg_log_head); total_stored_msgs++; + +// if ((total_stored_msgs % 100) == 0) { +// syslog (LOG_INFO, "%s(); %d", __func__, total_stored_msgs); +// } + } static void config_change_callback ( @@ -209,8 +218,27 @@ static void config_change_callback ( list_add_tail (&log_pt->list, &config_chg_log_head); } } + if (pcmk_test == 1) { + in_cnchg = 1; + send_some_more_messages (NULL); + in_cnchg = 0; + } } +static void my_shutdown_callback (corosync_cfg_handle_t handle, + corosync_cfg_shutdown_flags_t flags) +{ + syslog (LOG_CRIT, "%s flags:%d", __func__, flags); + if (flags & COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) { + corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES); + } +} + + +static corosync_cfg_callbacks_t cfg_callbacks = { + .corosync_cfg_shutdown_callback = my_shutdown_callback, + .corosync_cfg_state_track_callback = NULL, +}; static cpg_callbacks_t callbacks = { .cpg_deliver_fn = delivery_callback, .cpg_confchg_fn = config_change_callback, @@ -349,8 +377,10 @@ static void send_some_more_messages_zcb (void) if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ send_some_more_messages_later (); - syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.", - __func__); +// if (i > 0) { +// syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.", +// __func__, my_msgs_to_send); +// } goto free_buffer; } else if (res != CS_OK) { syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.", @@ -365,6 +395,13 @@ free_buffer: } +#define cs_repeat(counter, max, code) do { \ + code; \ + if(res == CS_ERR_TRY_AGAIN) { \ + counter++; \ + sleep(counter); \ + } \ + } while(res == CS_ERR_TRY_AGAIN && counter < max) static unsigned char buffer[200000]; static void send_some_more_messages_normal (void) @@ -377,6 +414,8 @@ static void send_some_more_messages_normal (void) hash_state sha1_hash; cs_error_t res; cpg_flow_control_state_t fc_state; + int retries = 0; + time_t before; if (cpg_fd < 0) return; @@ -402,29 +441,43 @@ static void send_some_more_messages_normal (void) iov[1].iov_base = buffer; for (i = 0; i < send_now; i++) { - - res = cpg_flow_control_state_get (cpg_handle, &fc_state); - if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { - /* lets do this later */ - send_some_more_messages_later (); - syslog (LOG_INFO, "%s() flow control enabled.", __func__); - return; - } - - res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); - if (res == CS_ERR_TRY_AGAIN) { - /* lets do this later */ - send_some_more_messages_later (); - syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.", - __func__); - return; - } else + if (in_cnchg && pcmk_test) { + retries = 0; + before = time(NULL); + cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1)); + if (retries > 20) { + syslog (LOG_ERR, "%s() -> cs_repeat: blocked for :%lu secs.", + __func__, (unsigned long)(time(NULL) - before)); + } if (res != CS_OK) { + syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d.", + __func__, res); + return; + } + } else { + res = cpg_flow_control_state_get (cpg_handle, &fc_state); + if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { + /* lets do this later */ + send_some_more_messages_later (); + syslog (LOG_INFO, "%s() flow control enabled.", __func__); + return; + } + + res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); + if (res == CS_ERR_TRY_AGAIN) { + /* lets do this later */ + send_some_more_messages_later (); + if (i > 0) { + syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.", + __func__, my_msgs_to_send); + } + return; + } else if (res != CS_OK) { syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.", __func__, res); exit (-2); } - + } my_msgs_to_send--; } } @@ -445,6 +498,7 @@ static void msg_blaster (int sock, char* num_to_send_str) my_pid = getpid(); use_zcb = 0; + total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); @@ -481,6 +535,7 @@ static void msg_blaster_zcb (int sock, char* num_to_send_str) my_pid = getpid(); use_zcb = 1; + total_stored_msgs = 0; cpg_local_get (cpg_handle, &my_nodeid); @@ -493,18 +548,52 @@ static void msg_blaster_zcb (int sock, char* num_to_send_str) send_some_more_messages_zcb (); } +static corosync_cfg_state_notification_t notification_buffer; + +static int cfg_dispatch_wrapper_fn (hdb_handle_t handle, + int fd, + int revents, + void *data) +{ + cs_error_t error; + if (revents & POLLHUP || revents & POLLERR) { + syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", __func__); + poll_dispatch_delete (ta_poll_handle_get(), cfg_fd); + close (cfg_fd); + cfg_fd = -1; + return -1; + } + error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL); + if (error == CS_ERR_LIBRARY) { + syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", __func__); + poll_dispatch_delete (ta_poll_handle_get(), cfg_fd); + close (cfg_fd); + cfg_fd = -1; + return -1; + } + return 0; +} static int cpg_dispatch_wrapper_fn (hdb_handle_t handle, int fd, int revents, void *data) { - cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); - if (error == CS_ERR_LIBRARY) { - syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__); + cs_error_t error; + if (revents & POLLHUP || revents & POLLERR) { + syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", __func__); poll_dispatch_delete (ta_poll_handle_get(), cpg_fd); close (cpg_fd); cpg_fd = -1; + return -1; + } + error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); + if (error == CS_ERR_LIBRARY) { + syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", __func__); + poll_dispatch_delete (ta_poll_handle_get(), cpg_fd); + close (cpg_fd); + cpg_fd = -1; + return -1; } return 0; } @@ -595,6 +684,8 @@ static void do_command (int sock, char* func, char*args[], int num_args) read_messages (sock, args[0]); } else if (strcmp ("msg_blaster_zcb", func) == 0) { msg_blaster_zcb (sock, args[0]); + } else if (strcmp ("pcmk_test", func) == 0) { + pcmk_test = 1; } else if (strcmp ("msg_blaster", func) == 0) { msg_blaster (sock, args[0]); } else if (strcmp ("context_test", func) == 0) { @@ -602,6 +693,33 @@ static void do_command (int sock, char* func, char*args[], int num_args) } else if (strcmp ("are_you_ok_dude", func) == 0) { snprintf (response, 100, "%s", OK_STR); send (sock, response, strlen (response), 0); + + } else if (strcmp ("cfg_shutdown", func) == 0) { + + corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST); + + } else if (strcmp ("cfg_initialize",func) == 0) { + int retry_count = 0; + + syslog (LOG_INFO,"%s %s() called!", __func__, func); + result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); + while (result != CS_OK) { + syslog (LOG_ERR, + "cfg_initialize error %d (attempt %d)\n", + result, retry_count); + if (retry_count >= 3) { + exit (1); + } + sleep(1); + retry_count++; + result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks); + } + + corosync_cfg_fd_get (cfg_handle, &cfg_fd); + + corosync_cfg_state_track (cfg_handle, 0, ¬ification_buffer); + + poll_dispatch_add (ta_poll_handle_get(), cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn); } else { syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func); } diff --git a/cts/corotests.py b/cts/corotests.py index 10bdfc0e..22d8c79f 100644 --- a/cts/corotests.py +++ b/cts/corotests.py @@ -79,6 +79,7 @@ class CoroTest(CTSTest): if self.need_all_up and self.CM.start_cpg: self.CM.cpg_agent[n].clean_start() self.CM.cpg_agent[n].cpg_join(self.name) + self.CM.cpg_agent[n].cfg_initialize() if not self.need_all_up and self.CM.StataCM(n): self.incr("stopped") self.stop(n) @@ -776,8 +777,6 @@ class QuorumState(object): ################################################################### class VoteQuorumBase(CoroTest): - ''' - ''' def setup(self, node): ret = CoroTest.setup(self, node) @@ -796,8 +795,9 @@ class VoteQuorumBase(CoroTest): def config_valid(self, config): if config.has_key('totem/rrp_mode'): return False - else: - return True + if config.has_key('quorum/provider'): + return False + return True ################################################################### @@ -849,7 +849,7 @@ class VoteQuorumGoDown(VoteQuorumBase): self.failure('unexpected number of expected_votes') if state.total_votes != nodes_alive: - self.failure('unexpected number of total votes') + self.failure('unexpected number of total votes:%d, nodes_alive:%d', (state.total_votes, nodes_alive)) min = ((len(self.CM.Env["nodes"]) + 2) / 2) if min != state.quorum: @@ -1019,9 +1019,58 @@ class GenSimulStop(CoroTest): return self.success() +################################################################### +class GenStopAllBeekhof(CoroTest): + '''Stop all the nodes ~ simultaneously''' + + def __init__(self, cm): + CoroTest.__init__(self,cm) + self.name="GenStopAllBeekhof" + self.need_all_up = True + + def __call__(self, node): + '''Perform the 'GenStopAllBeekhof' test. ''' + self.incr("calls") + + stopping = int(time.time()) + for n in self.CM.Env["nodes"]: + self.CM.cpg_agent[n].pcmk_test() + self.CM.cpg_agent[n].msg_blaster(10000) + self.CM.cpg_agent[n].cfg_shutdown() + self.CM.ShouldBeStatus[n] = "down" + + waited = 0 + max_wait = 60 + + still_up = list(self.CM.Env["nodes"]) + while len(still_up) > 0: + waited = int(time.time()) - stopping + self.CM.log("%s still up %s; waited %d secs" % (self.name, str(still_up), waited)) + if waited > max_wait: + break + time.sleep(3) + for v in self.CM.Env["nodes"]: + if v in still_up: + self.CM.ShouldBeStatus[n] = "down" + if not self.CM.StataCM(v): + still_up.remove(v) + + waited = int(time.time()) - stopping + if waited > max_wait: + for v in still_up: + self.CM.log("%s killing corosync on %s" % (self.name, v)) + self.CM.rsh(v, 'killall -SIGSEGV corosync cpg_test_agent') + return self.failure("Waited %d secs for nodes: %s to stop" % (waited, str(still_up))) + + self.CM.log("%s ALL good (waited %d secs)" % (self.name, waited)) + return self.success() + + + GenTestClasses = [] GenTestClasses.append(GenSimulStart) GenTestClasses.append(GenSimulStop) +GenTestClasses.append(GenStopAllBeekhof) GenTestClasses.append(CpgMsgOrderBasic) GenTestClasses.append(CpgMsgOrderZcb) GenTestClasses.append(CpgCfgChgOnExecCrash) @@ -1083,19 +1132,38 @@ def CoroTestList(cm, audits): a = ConfigContainer('none_5min') a['compatibility'] = 'none' a['totem/token'] = (5 * 60 * 1000) + a['totem/consensus'] = int(5 * 60 * 1000 * 1.2) + 1 configs.append(a) - b = ConfigContainer('whitetank_5min') + b = ConfigContainer('pcmk_basic') b['compatibility'] = 'whitetank' - b['totem/token'] = (5 * 60 * 1000) + b['totem/token'] = 5000 + b['totem/token_retransmits_before_loss_const'] = 10 + b['totem/join'] = 1000 + b['totem/consensus'] = 7500 configs.append(b) - c = ConfigContainer('sec_nss') + c = ConfigContainer('pcmk_sec_nss') c['totem/secauth'] = 'on' c['totem/crypto_accept'] = 'new' c['totem/crypto_type'] = 'nss' + c['totem/token'] = 5000 + c['totem/token_retransmits_before_loss_const'] = 10 + c['totem/join'] = 1000 + c['totem/consensus'] = 7500 configs.append(c) + s = ConfigContainer('pcmk_vq') + s['quorum/provider'] = 'corosync_votequorum' + s['quorum/expected_votes'] = len(cm.Env["nodes"]) + s['totem/token'] = 5000 + s['totem/token_retransmits_before_loss_const'] = 10 + s['totem/join'] = 1000 + s['totem/vsftype'] = 'none' + s['totem/consensus'] = 7500 + s['totem/max_messages'] = 20 + configs.append(s) + d = ConfigContainer('sec_sober') d['totem/secauth'] = 'on' d['totem/crypto_type'] = 'sober' @@ -1105,11 +1173,6 @@ def CoroTestList(cm, audits): e['totem/threads'] = 4 configs.append(e) - #quorum/provider= - #f = {} - #f['quorum/provider'] = 'corosync_quorum_ykd' - #configs.append(f) - if not cm.Env["RrpBindAddr"] is None: g = ConfigContainer('rrp_passive') g['totem/rrp_mode'] = 'passive'