diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c index 9db370d2..de47ca80 100644 --- a/cts/agents/cpg_test_agent.c +++ b/cts/agents/cpg_test_agent.c @@ -96,36 +96,35 @@ static int32_t my_msgs_to_send; static int32_t total_stored_msgs = 0; static hdb_handle_t poll_handle; + +static void send_some_more_messages (void * unused); + static char* err_status_string (char * buf, size_t buf_len, msg_status_t status) { switch (status) { case MSG_OK: strncpy (buf, "OK", buf_len); - return buf; break; case MSG_NODEID_ERR: strncpy (buf, "NODEID_ERR", buf_len); - return buf; break; case MSG_PID_ERR: strncpy (buf, "PID_ERR", buf_len); - return buf; break; case MSG_SEQ_ERR: strncpy (buf, "SEQ_ERR", buf_len); - return buf; break; case MSG_SIZE_ERR: strncpy (buf, "SIZE_ERR", buf_len); - return buf; break; case MSG_SHA1_ERR: - default: strncpy (buf, "SHA1_ERR", buf_len); - return buf; + break; + default: + strncpy (buf, "UNKNOWN_ERR", buf_len); break; } - + return buf; } static void delivery_callback ( @@ -160,7 +159,7 @@ static void delivery_callback ( status = MSG_SIZE_ERR; } sha1_init (&sha1_hash); - sha1_process (&sha1_hash, msg_pt->payload, msg_pt->size); + sha1_process (&sha1_hash, msg_pt->payload, (msg_pt->size - sizeof (msg_t))); sha1_done (&sha1_hash, sha1_compare); if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) { syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash", @@ -285,8 +284,19 @@ static void read_messages (int sock, char* atmost_str) send (sock, big_and_buf, strlen (big_and_buf), 0); } +static void send_some_more_messages_later (void) +{ + poll_timer_handle timer_handle; + cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); + poll_timer_add ( + poll_handle, + 100, NULL, + send_some_more_messages, + &timer_handle); +} + static unsigned char buffer[200000]; -static void send_some_more_messages (void) +static void send_some_more_messages (void * unused) { msg_t my_msg; struct iovec iov[2]; @@ -302,7 +312,7 @@ static void send_some_more_messages (void) send_now = my_msgs_to_send; - syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now); + //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now); my_msg.pid = my_pid; my_msg.nodeid = my_nodeid; payload_size = (rand() % 100000); @@ -325,6 +335,7 @@ static void send_some_more_messages (void) res = cpg_flow_control_state_get (cpg_handle, &fc_state); if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { /* lets do this later */ + send_some_more_messages_later (); syslog (LOG_INFO, "%s() flow control enabled.", __func__); return; } @@ -332,6 +343,7 @@ static void send_some_more_messages (void) res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); if (res == CS_ERR_TRY_AGAIN) { /* lets do this later */ + send_some_more_messages_later (); syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.", __func__); return; @@ -357,10 +369,10 @@ static void msg_blaster (int sock, char* num_to_send_str) /* control the limits */ if (my_msgs_to_send <= 0) my_msgs_to_send = 1; - if (my_msgs_to_send > 1000) - my_msgs_to_send = 1000; + if (my_msgs_to_send > 10000) + my_msgs_to_send = 10000; - send_some_more_messages (); + send_some_more_messages (NULL); } @@ -541,7 +553,7 @@ static int server_process_data_fn (hdb_handle_t handle, exit (0); } else { if (my_msgs_to_send > 0) - send_some_more_messages (); + send_some_more_messages (NULL); big_and_buf_rx[nbytes] = '\0'; diff --git a/cts/corotests.py b/cts/corotests.py index d07eb5f8..92ea570f 100644 --- a/cts/corotests.py +++ b/cts/corotests.py @@ -44,19 +44,12 @@ class CoroTest(CTSTest): CTSTest.__init__(self,cm) self.start = StartTest(cm) self.stop = StopTest(cm) + self.config = {} def setup(self, node): ret = CTSTest.setup(self, node) - self.CM.apply_new_config() - for n in self.CM.Env["nodes"]: - if not self.CM.StataCM(n): - self.incr("started") - self.start(n) - return ret - - - def setup_sec_key(self, node): + # setup the authkey localauthkey = '/tmp/authkey' if not os.path.exists(localauthkey): self.CM.rsh(node, 'corosync-keygen') @@ -67,6 +60,20 @@ class CoroTest(CTSTest): #copy key onto other nodes self.CM.rsh.cp(localauthkey, "%s:%s" % (n, "/etc/corosync/authkey")) + # copy over any new config + for c in self.config: + self.CM.new_config[c] = self.config[c] + + # apply the config + self.CM.apply_new_config() + + # start any killed corosync's + for n in self.CM.Env["nodes"]: + if not self.CM.StataCM(n): + self.incr("started") + self.start(n) + return ret + def teardown(self, node): self.CM.apply_default_config() @@ -190,42 +197,12 @@ class CpgCfgChgOnExecCrash(CpgConfigChangeBase): ################################################################### -class CpgCfgChgOnNodeLeave_v2(CpgConfigChangeBase): +class CpgCfgChgOnNodeIsolate(CpgConfigChangeBase): def __init__(self, cm): CpgConfigChangeBase.__init__(self,cm) - self.name="CpgCfgChgOnNodeLeave_v2" + self.name="CpgCfgChgOnNodeIsolate" - def setup(self, node): - self.CM.new_config['compatibility'] = 'none' - self.CM.new_config['totem/token'] = 10000 - return CpgConfigChangeBase.setup(self, node) - - def failure_action(self): - self.CM.log("isolating node " + self.wobbly) - self.CM.isolate_node(self.wobbly) - - def __call__(self, node): - self.incr("calls") - self.failure_action() - return self.wait_for_config_change() - - def teardown(self, node): - self.CM.unisolate_node (self.wobbly) - return CpgConfigChangeBase.teardown(self, node) - -################################################################### -class CpgCfgChgOnNodeLeave_v1(CpgConfigChangeBase): - - def __init__(self, cm): - CpgConfigChangeBase.__init__(self,cm) - self.name="CpgCfgChgOnNodeLeave_v1" - - def setup(self, node): - self.CM.new_config['compatibility'] = 'whitetank' - self.CM.new_config['totem/token'] = 10000 - return CpgConfigChangeBase.setup(self, node) - def failure_action(self): self.CM.log("isolating node " + self.wobbly) self.CM.isolate_node(self.wobbly) @@ -268,15 +245,13 @@ class CpgMsgOrderBase(CoroTest): for n in self.CM.Env["nodes"]: msgs[n] = [] - got = False stopped = False - self.CM.debug( " getting messages from " + n ) + waited = 0 - while len(msgs[n]) < self.total_num_msgs and not stopped: + while len(msgs[n]) < self.total_num_msgs and waited < 60: msg = self.CM.agent[n].read_messages(25) if not msg == None: - got = True msgl = msg.split(";") # remove empty entries @@ -288,106 +263,52 @@ class CpgMsgOrderBase(CoroTest): not_done = False msgs[n].extend(msgl) - elif msg == None and got: - self.CM.debug(" done getting messages from " + n) - stopped = True - - if not got: + elif msg == None: time.sleep(1) + waited = waited + 1 + + if len(msgs[n]) < self.total_num_msgs: + return self.failure("expected %d messages from %s got %d" % (self.total_num_msgs, n, len(msgs[n]))) fail = False + error_message = '' for i in range(0, self.total_num_msgs): first = None for n in self.CM.Env["nodes"]: + # first test for errors + params = msgs[n][i].split(":") + if not 'OK' in params[3]: + fail = True + error_message = 'error: ' + params[3] + ' in received message' + self.CM.log(str(params)) + + # then look for out of order messages if first == None: first = n else: if not msgs[first][i] == msgs[n][i]: # message order not the same! fail = True + error_message = 'message out of order' self.CM.log(msgs[first][i] + " != " + msgs[n][i]) if fail: - return self.failure() + return self.failure(error_message) else: return self.success() ################################################################### class CpgMsgOrderBasic(CpgMsgOrderBase): ''' - each sends & logs 100 messages + each sends & logs 1000 messages ''' def __init__(self, cm): CpgMsgOrderBase.__init__(self,cm) self.name="CpgMsgOrderBasic" + self.num_msgs_per_node = 9000 def __call__(self, node): self.incr("calls") - - self.num_msgs_per_node = 100 - self.cpg_msg_blaster() - return self.wait_and_validate_order() - -class CpgMsgOrderThreads(CpgMsgOrderBase): - ''' - each sends & logs 100 messages - ''' - def __init__(self, cm): - CpgMsgOrderBase.__init__(self,cm) - self.name="CpgMsgOrderThreads" - - def setup(self, node): - self.CM.new_config['totem/threads'] = 4 - return CpgMsgOrderBase.setup(self, node) - - def __call__(self, node): - self.incr("calls") - - self.num_msgs_per_node = 100 - self.cpg_msg_blaster() - return self.wait_and_validate_order() - - -class CpgMsgOrderSecNss(CpgMsgOrderBase): - ''' - each sends & logs 100 messages - ''' - def __init__(self, cm): - CpgMsgOrderBase.__init__(self,cm) - self.name="CpgMsgOrderSecNss" - - def setup(self, node): - self.setup_sec_key(node) - self.CM.new_config['totem/secauth'] = 'on' - self.CM.new_config['totem/crypto_accept'] = 'new' - self.CM.new_config['totem/crypto_type'] = 'nss' - return CpgMsgOrderBase.setup(self, node) - - def __call__(self, node): - self.incr("calls") - - self.num_msgs_per_node = 100 - self.cpg_msg_blaster() - return self.wait_and_validate_order() - -class CpgMsgOrderSecSober(CpgMsgOrderBase): - ''' - each sends & logs 100 messages - ''' - def __init__(self, cm): - CpgMsgOrderBase.__init__(self,cm) - self.name="CpgMsgOrderSecSober" - - def setup(self, node): - self.setup_sec_key(node) - self.CM.new_config['totem/secauth'] = 'on' - self.CM.new_config['totem/crypto_type'] = 'sober' - return CpgMsgOrderBase.setup(self, node) - - def __call__(self, node): - self.incr("calls") - - self.num_msgs_per_node = 100 self.cpg_msg_blaster() return self.wait_and_validate_order() @@ -506,22 +427,17 @@ class ServiceLoadTest(CoroTest): return self.success() - +GenTestClasses = [] +GenTestClasses.append(CpgMsgOrderBasic) +GenTestClasses.append(CpgCfgChgOnExecCrash) +GenTestClasses.append(CpgCfgChgOnGroupLeave) +GenTestClasses.append(CpgCfgChgOnNodeLeave) +GenTestClasses.append(CpgCfgChgOnNodeIsolate) AllTestClasses = [] AllTestClasses.append(ServiceLoadTest) -AllTestClasses.append(CpgMsgOrderBasic) -AllTestClasses.append(CpgMsgOrderThreads) -AllTestClasses.append(CpgMsgOrderSecNss) -AllTestClasses.append(CpgMsgOrderSecSober) AllTestClasses.append(MemLeakObject) AllTestClasses.append(MemLeakSession) -AllTestClasses.append(CpgCfgChgOnExecCrash) -AllTestClasses.append(CpgCfgChgOnGroupLeave) -AllTestClasses.append(CpgCfgChgOnNodeLeave) -AllTestClasses.append(CpgCfgChgOnNodeLeave_v1) -AllTestClasses.append(CpgCfgChgOnNodeLeave_v2) - AllTestClasses.append(FlipTest) AllTestClasses.append(RestartTest) AllTestClasses.append(StartOnebyOne) @@ -534,10 +450,58 @@ AllTestClasses.append(RestartOnebyOne) def CoroTestList(cm, audits): result = [] + configs = [] + empty = {} + configs.append(empty) + + a = {} + a['compatibility'] = 'none' + a['totem/token'] = 10000 + configs.append(a) + + b = {} + b['compatibility'] = 'whitetank' + b['totem/token'] = 10000 + configs.append(b) + + c = {} + c['totem/secauth'] = 'on' + c['totem/crypto_accept'] = 'new' + c['totem/crypto_type'] = 'nss' + configs.append(c) + + d = {} + d['totem/secauth'] = 'on' + d['totem/crypto_type'] = 'sober' + configs.append(d) + + e = {} + e['totem/threads'] = 4 + configs.append(e) + + #quorum/provider= + f = {} + f['quorum/provider'] = 'corosync_quorum_ykd' + configs.append(f) + + num=1 + for cfg in configs: + for testclass in GenTestClasses: + bound_test = testclass(cm) + if bound_test.is_applicable(): + bound_test.Audits = audits + bound_test.config = cfg + bound_test.name = bound_test.name + '_' + str(num) + result.append(bound_test) + num = num + 1 + + + for testclass in AllTestClasses: bound_test = testclass(cm) if bound_test.is_applicable(): bound_test.Audits = audits result.append(bound_test) + return result