From f047d8f44f508219adcf66502cf9e29f03a5b8ef Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Fri, 17 Jun 2005 18:36:36 +0000 Subject: [PATCH] defect 577 Implement token holding mode (Logical change 1.207) git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@670 fd59a12c-fef9-0310-b244-a6a79926bd2f --- QUICKSTART | 4 +- exec/Makefile | 10 +- exec/parse.c | 21 +++-- exec/parse.h | 2 + exec/totempg.c | 2 + exec/totemsrp.c | 236 ++++++++++++++++++++++++++++++++++++++---------- exec/totemsrp.h | 2 + test/evsbench.c | 20 ++-- test/test.cpp | 44 ++++++--- 9 files changed, 260 insertions(+), 81 deletions(-) diff --git a/QUICKSTART b/QUICKSTART index a4fe9884..66ae40d8 100644 --- a/QUICKSTART +++ b/QUICKSTART @@ -46,7 +46,9 @@ logging { timeout { token: 200 - retransmit: 45 + token_retransmit: 50 + hold: 30 + retransmits_before_loss: 4 join: 100 consensus: 200 merge: 200 diff --git a/exec/Makefile b/exec/Makefile index ad2544ec..4a670a56 100644 --- a/exec/Makefile +++ b/exec/Makefile @@ -29,16 +29,16 @@ # THE POSSIBILITY OF SUCH DAMAGE. # Production mode flags -#CFLAGS = -O3 -Wall -fomit-frame-pointer -#LDFLAGS = +CFLAGS = -O3 -Wall -fomit-frame-pointer +LDFLAGS = # Debug mode flags -CFLAGS = -g -Wall +#CFLAGS = -g -Wall ##-DDEBUG -LDFLAGS = -g +#LDFLAGS = -g # Profile mode flags -#CFLAGS = -O2 -pg +#CFLAGS = -O3 -pg #LDFLAGS = -pg # Code Coverage with lcov flgs diff --git a/exec/parse.c b/exec/parse.c index 32dfccab..95b283af 100644 --- a/exec/parse.c +++ b/exec/parse.c @@ -564,19 +564,24 @@ extern int openais_main_config_read (char **error_string, break; case MAIN_TIMEOUT: if ((loc = strstr_rs (line, "token:"))) { - openais_config->timeouts[TOTEM_TOKEN]= atoi(loc); - } else if ((loc = strstr_rs (line, "retransmit:"))) { - openais_config->timeouts[TOTEM_RETRANSMIT_TOKEN] = atoi(loc); + openais_config->timeouts[TOTEM_TOKEN]= atoi(loc); + } else if ((loc = strstr_rs (line, "token_retransmit:"))) { + openais_config->timeouts[TOTEM_RETRANSMIT_TOKEN] = atoi(loc); + } else if ((loc = strstr_rs (line, "hold:"))) { + openais_config->timeouts[TOTEM_HOLD_TOKEN] = atoi(loc); + } else if ((loc = strstr_rs (line, "retransmits_before_loss:"))) { + openais_config->timeouts[TOTEM_RETRANSMITS_BEFORE_LOSS] = atoi(loc); + } else if ((loc = strstr_rs (line, "join:"))) { - openais_config->timeouts[TOTEM_JOIN] = atoi(loc); + openais_config->timeouts[TOTEM_JOIN] = atoi(loc); } else if ((loc = strstr_rs (line, "consensus:"))) { - openais_config->timeouts[TOTEM_CONSENSUS] = atoi(loc); + openais_config->timeouts[TOTEM_CONSENSUS] = atoi(loc); } else if ((loc = strstr_rs (line, "merge:"))) { - openais_config->timeouts[TOTEM_MERGE] = atoi(loc); + openais_config->timeouts[TOTEM_MERGE] = atoi(loc); } else if ((loc = strstr_rs (line, "downcheck:"))) { - openais_config->timeouts[TOTEM_DOWNCHECK] = atoi(loc); + openais_config->timeouts[TOTEM_DOWNCHECK] = atoi(loc); } else if ((loc = strstr_rs (line, "fail_recv_const:"))) { - openais_config->timeouts[TOTEM_FAIL_RECV_CONST] = atoi(loc); + openais_config->timeouts[TOTEM_FAIL_RECV_CONST] = atoi(loc); } else if ((loc = strstr_rs (line, "}"))) { parse = MAIN_HEAD; } else { diff --git a/exec/parse.h b/exec/parse.h index 34397860..ba4d790e 100644 --- a/exec/parse.h +++ b/exec/parse.h @@ -62,8 +62,10 @@ enum amfOperationalAdministrativeState { * needs to remain the last item in the list. */ enum { + TOTEM_RETRANSMITS_BEFORE_LOSS, TOTEM_TOKEN, TOTEM_RETRANSMIT_TOKEN, + TOTEM_HOLD_TOKEN, TOTEM_JOIN, TOTEM_CONSENSUS, TOTEM_MERGE, diff --git a/exec/totempg.c b/exec/totempg.c index 8c8eaff9..35e4b5b1 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -524,6 +524,8 @@ int totempg_mcast ( int copy_len = 0; int copy_base = 0; + totemsrp_new_msg_signal (); + max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 52252b33..665c4b9f 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -64,6 +64,7 @@ int totemsrp_brake; #include #include #include +#include #include #include #include @@ -101,8 +102,10 @@ int totemsrp_brake; #define MISSING_MCAST_WINDOW 128 #define TIMEOUT_STATE_GATHER_JOIN 100 #define TIMEOUT_STATE_GATHER_CONSENSUS 200 +#define TOKEN_RETRANSMITS_BEFORE_LOSS 4 #define TIMEOUT_TOKEN 200 -#define TIMEOUT_TOKEN_RETRANSMIT 45 +#define TIMEOUT_TOKEN_RETRANSMIT (int)(TIMEOUT_TOKEN / (TOKEN_RETRANSMITS_BEFORE_LOSS + 0.2)) +#define TIMEOUT_TOKEN_HOLD (int)(TIMEOUT_TOKEN_RETRANSMIT * 0.8 - (1000/HZ)) #define TIMEOUT_MERGE_DETECT 200 #define PACKET_SIZE_MAX 2000 #define FAIL_TO_RECV_CONST 250 @@ -150,6 +153,7 @@ enum message_type { MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */ MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */ MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */ + MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */ }; /* @@ -218,6 +222,10 @@ static unsigned int timeout_token = TIMEOUT_TOKEN; static unsigned int timeout_token_retransmit = TIMEOUT_TOKEN_RETRANSMIT; +static unsigned int timeout_token_hold = 0; + +static unsigned int token_retransmits_before_loss = TOKEN_RETRANSMITS_BEFORE_LOSS; + static unsigned int timeout_state_gather_join = TIMEOUT_STATE_GATHER_JOIN; static unsigned int timeout_state_gather_consensus = TIMEOUT_STATE_GATHER_CONSENSUS; @@ -283,6 +291,8 @@ poll_timer_handle timer_orf_token_timeout = 0; poll_timer_handle timer_orf_token_retransmit_timeout = 0; +poll_timer_handle timer_orf_token_hold_retransmit_timeout = 0; + poll_timer_handle timer_merge_detect_timeout = 0; poll_timer_handle memb_timer_state_gather_join_timeout = 0; @@ -373,6 +383,11 @@ struct memb_merge_detect { struct memb_ring_id ring_id; } __attribute__((packed)); +struct token_hold_cancel { + struct message_header header; + struct memb_ring_id ring_id; +} __attribute__((packed)); + struct memb_commit_token_memb_entry { struct memb_ring_id ring_id; int aru; @@ -433,7 +448,7 @@ static struct iovec iov_encrypted = { struct message_handlers { int count; - int (*handler_functions[5]) (struct sockaddr_in *, struct iovec *, int, int, int); + int (*handler_functions[6]) (struct sockaddr_in *, struct iovec *, int, int, int); }; poll_handle *totemsrp_poll_handle; @@ -467,12 +482,17 @@ static int message_handler_memb_join (struct sockaddr_in *, struct iovec *, int, static int message_handler_memb_commit_token (struct sockaddr_in *, struct iovec *, int, int, int); +static int message_handler_token_hold_cancel (struct sockaddr_in *, struct iovec *, int, int, int); + static void memb_ring_id_create_or_load (struct memb_ring_id *); static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio); static int netif_determine (struct sockaddr_in *bindnet, struct sockaddr_in *bound_to,int *interface_up); static int loopback_determine (struct sockaddr_in *bound_to); static void netif_down_check (void); +static void token_callbacks_execute (enum totem_callback_token_type type); + + #define NETIF_STATE_REPORT_UP 1 #define NETIF_STATE_REPORT_DOWN 2 @@ -508,19 +528,21 @@ static void memb_ring_id_store (struct memb_commit_token *commit_token); static void memb_state_commit_token_update (struct memb_commit_token *memb_commit_token); static int memb_state_commit_token_send (struct memb_commit_token *memb_commit_token); static void memb_state_commit_token_create (struct memb_commit_token *commit_token); +static int token_hold_cancel_send (void); static void orf_token_endian_convert (struct orf_token *in, struct orf_token *out); static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out); static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out); static void mcast_endian_convert (struct mcast *in, struct mcast *out); struct message_handlers totemsrp_message_handlers = { - 4, + 6, { message_handler_orf_token, message_handler_mcast, message_handler_memb_merge_detect, message_handler_memb_join, - message_handler_memb_commit_token + message_handler_memb_commit_token, + message_handler_token_hold_cancel } }; @@ -592,6 +614,8 @@ int totemsrp_initialize ( int i; + timeout_token_hold = (int)(timeout_token_retransmit * 0.8 - (1000/HZ)); + /* * Initialize random number generator for later use to generate salt */ @@ -613,7 +637,7 @@ int totemsrp_initialize ( * Update our timeout values if they were specified in the openais.conf * file. */ - for (i = TOTEM_TOKEN; i < MAX_TOTEM_TIMEOUTS; i++) { + for (i = 0; i < MAX_TOTEM_TIMEOUTS; i++) { if (!timeouts[i]) { continue; } @@ -621,12 +645,24 @@ int totemsrp_initialize ( case TOTEM_TOKEN: timeout_token = timeouts[i]; totemsrp_log_printf (totemsrp_log_level_notice, - "Token Timeout set to %u ms\n", timeouts[i]); + "Overriding token timeout to (%u ms)\n", timeouts[i]); + timeout_token_retransmit = (int)(timeout_token / (token_retransmits_before_loss + 0.2)); + timeout_token_hold = (int)(timeout_token_retransmit * 0.8 - (1000/HZ)); break; case TOTEM_RETRANSMIT_TOKEN: timeout_token_retransmit = timeouts[i]; totemsrp_log_printf (totemsrp_log_level_notice, - "Token Retransmit Timeout set to %u ms\n", timeouts[i]); + "Overriding token retransmit timeout to (%u ms)\n", timeouts[i]); + break; + case TOTEM_RETRANSMITS_BEFORE_LOSS: + token_retransmits_before_loss = timeouts[i]; + totemsrp_log_printf (totemsrp_log_level_notice, + "Overriding retransmits before loss (%u retrans)\n", timeouts[i]); + break; + case TOTEM_HOLD_TOKEN: + timeout_token_hold = timeouts[i]; + totemsrp_log_printf (totemsrp_log_level_notice, + "Overriding token hold timeout to (%u ms)\n", timeouts[i]); break; case TOTEM_JOIN: timeout_state_gather_join = timeouts[i]; @@ -660,6 +696,13 @@ int totemsrp_initialize ( } } + totemsrp_log_printf (totemsrp_log_level_notice, + "Token Timeout (%d ms) retransmit timeout (%d ms)\n", + timeout_token, timeout_token_retransmit); + totemsrp_log_printf (totemsrp_log_level_notice, + "token hold (%d ms) retransmits before loss (%d retrans)\n", + timeout_token_hold, token_retransmits_before_loss); + queue_init (&new_message_queue, NEW_MESSAGE_QUEUE_SIZE_MAX, sizeof (struct message_item)); @@ -922,15 +965,16 @@ void memb_set_print (char *string, static void timer_function_orf_token_timeout (void *data); static void timer_function_token_retransmit_timeout (void *data); +static void timer_function_token_hold_retransmit_timeout (void *data); static void timer_function_merge_detect_timeout (void *data); void reset_token_retransmit_timeout (void) { - poll_timer_delete (*totemsrp_poll_handle, - timer_orf_token_retransmit_timeout); - poll_timer_add (*totemsrp_poll_handle, timeout_token_retransmit, 0, - timer_function_token_retransmit_timeout, - &timer_orf_token_retransmit_timeout); + poll_timer_delete (*totemsrp_poll_handle, + timer_orf_token_retransmit_timeout); + poll_timer_add (*totemsrp_poll_handle, timeout_token_retransmit, 0, + timer_function_token_retransmit_timeout, + &timer_orf_token_retransmit_timeout); } @@ -1011,11 +1055,23 @@ void reset_token_timeout (void) { } void cancel_token_timeout (void) { - poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_timeout); + poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_timeout); } void cancel_token_retransmit_timeout (void) { - poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_retransmit_timeout); + poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_retransmit_timeout); +} + +void start_token_hold_retransmit_timeout (void) +{ + poll_timer_add (*totemsrp_poll_handle, timeout_token_hold, (void *)9999, + timer_function_token_hold_retransmit_timeout, + &timer_orf_token_hold_retransmit_timeout); +} +void cancel_token_hold_retransmit_timeout (void) +{ + poll_timer_delete (*totemsrp_poll_handle, + timer_orf_token_hold_retransmit_timeout); } static void memb_state_consensus_timeout_expired (void) @@ -1712,6 +1768,11 @@ printf ("received message size %d\n", iov->iov_len); return (0); } +void totemsrp_new_msg_signal (void) +{ + token_hold_cancel_send (); +} + int totemsrp_mcast ( struct iovec *iovec, int iov_len, @@ -1769,6 +1830,7 @@ int totemsrp_mcast ( queue_item_add (&new_message_queue, &message_item); return (0); + error_iovec: for (j = 0; j < i; j++) { free (message_item.iovec[j].iov_base); @@ -2567,9 +2629,6 @@ void token_retransmit (void) { */ void timer_function_token_retransmit_timeout (void *data) { -struct timeval timeval; - - gettimeofday (&timeval, 0); switch (memb_state) { case MEMB_STATE_GATHER: break; @@ -2583,6 +2642,19 @@ struct timeval timeval; } } +void timer_function_token_hold_retransmit_timeout (void *data) +{ + switch (memb_state) { + case MEMB_STATE_GATHER: + break; + case MEMB_STATE_COMMIT: + break; + case MEMB_STATE_OPERATIONAL: + case MEMB_STATE_RECOVERY: + token_retransmit (); + break; + } +} void timer_function_merge_detect_timeout(void *data) { my_merge_detect_timeout_outstanding = 0; @@ -2660,6 +2732,51 @@ static int token_send ( return (res); } +static int token_hold_cancel_send (void) +{ + struct token_hold_cancel token_hold_cancel; + struct iovec iov; + struct msghdr msghdr; + + /* + * Only cancel if the token is currently held + */ + if (my_token_held == 0) { + return (0); + } + my_token_held = 0; + + /* + * Build message + */ + token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL; + token_hold_cancel.header.endian_detector = ENDIAN_LOCAL; + memcpy (&token_hold_cancel.ring_id, &my_ring_id, + sizeof (struct memb_ring_id)); + + iov.iov_base = &token_hold_cancel; + iov.iov_len = sizeof (struct token_hold_cancel); + + encrypt_and_sign (&iov, 1); + + /* + * Build multicast message + */ + msghdr.msg_name = (caddr_t)&sockaddr_in_mcast; + msghdr.msg_namelen = sizeof (struct sockaddr_in); + msghdr.msg_iov = &iov_encrypted; + msghdr.msg_iovlen = 1; + msghdr.msg_control = 0; + msghdr.msg_controllen = 0; + msghdr.msg_flags = 0; + + /* + * Multicast message + */ + sendmsg (totemsrp_sockets[0].mcast, &msghdr, MSG_NOSIGNAL | MSG_DONTWAIT); + + return (0); +} int orf_token_send_initial (void) { struct orf_token orf_token; @@ -2997,7 +3114,7 @@ void totem_callback_token_type (void *handle) free (token_callback_instance); } -void token_callbacks_execute (enum totem_callback_token_type type) +static void token_callbacks_execute (enum totem_callback_token_type type) { struct list_head *list; struct list_head *list_next; @@ -3085,7 +3202,6 @@ printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); } #endif - my_token_held = 1; my_do_delivery = 0; #ifdef RANDOM_DROP @@ -3093,24 +3209,17 @@ if (random () % 100 < 10) { return (0); } #endif - /* - * Hold onto token when there is no activity on ring and - * this processor is the ring rep - */ - forward_token = 1; - if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) { - if (my_seq_unchanged > SEQNO_UNCHANGED_CONST) { - forward_token = 0; - } - } /* * Handle merge detection timeout */ if (token_ref->seq == my_last_seq) { start_merge_detect_timeout (); + my_seq_unchanged += 1; } else { cancel_merge_detect_timeout (); + cancel_token_hold_retransmit_timeout (); + my_seq_unchanged = 0; } my_last_seq = token_ref->seq; @@ -3145,6 +3254,29 @@ if (random () % 100 < 10) { } } while (nfds == 1); + /* + * Determine if we should hold (in reality drop) the token + */ + my_token_held = 0; + if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr && + my_seq_unchanged > SEQNO_UNCHANGED_CONST) { + my_token_held = 1; + } else + if (my_ring_id.rep.s_addr != my_id.sin_addr.s_addr && + my_seq_unchanged >= SEQNO_UNCHANGED_CONST) { + my_token_held = 1; + } + + /* + * Hold onto token when there is no activity on ring and + * this processor is the ring rep + */ + forward_token = 1; + if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) { + if (my_token_held) { + forward_token = 0; + } + } token_callbacks_execute (TOTEM_CALLBACK_TOKEN_RECEIVED); @@ -3170,7 +3302,6 @@ if (random () % 100 < 10) { if (memcmp (&token->ring_id, &my_ring_id, sizeof (struct memb_ring_id)) != 0) { - my_token_held = 0; return (0); /* discard token */ } @@ -3178,26 +3309,17 @@ if (random () % 100 < 10) { * Discard retransmitted tokens */ if (my_token_seq >= token->token_seq) { - my_token_held = 0; reset_token_retransmit_timeout (); reset_token_timeout (); return (0); /* discard token */ } transmits_allowed = 30; mcasted = orf_token_rtr (token, &transmits_allowed); - if (mcasted) { - forward_token = 1; - my_seq_unchanged = 0; - } - if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) { - transmits_allowed = 0; - } - mcasted = orf_token_mcast (token, transmits_allowed, system_from); - if (mcasted) { - forward_token = 1; - my_seq_unchanged = 0; + if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) { + transmits_allowed = 0; } + mcasted = orf_token_mcast (token, transmits_allowed, system_from); if (my_aru < token->aru || my_id.sin_addr.s_addr == token->aru_addr.s_addr || token->aru_addr.s_addr == 0) { @@ -3292,7 +3414,7 @@ printf ("FAILED TO RECEIVE\n"); } } - token_send (token, 1 /* forward_token */); + token_send (token, forward_token); #ifdef GIVEINFO gettimeofday (&tv_current, NULL); @@ -3313,15 +3435,17 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0); * to improve performance */ reset_token_timeout (); // REVIEWED - if (forward_token == 0) { - reset_token_retransmit_timeout (); // REVIEWED + reset_token_retransmit_timeout (); // REVIEWED + if (my_id.sin_addr.s_addr == my_ring_id.rep.s_addr && + my_token_held == 1) { + + start_token_hold_retransmit_timeout (); } token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT); } break; } - my_token_held = 0; return (0); } @@ -3871,6 +3995,26 @@ if (random()%100 < 10) { return (0); } +static int message_handler_token_hold_cancel ( + struct sockaddr_in *system_from, + struct iovec *iovec, + int iov_len, + int bytes_received, + int endian_conversion_needed) +{ + struct token_hold_cancel *token_hold_cancel = (struct token_hold_cancel *)iovec->iov_base; + + if (memcmp (&token_hold_cancel->ring_id, &my_ring_id, + sizeof (struct memb_ring_id)) == 0) { + + my_seq_unchanged = 0; + if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) { + timer_function_token_retransmit_timeout (0); + } + } + return (0); +} + static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio) { diff --git a/exec/totemsrp.h b/exec/totemsrp.h index 67b3137a..b4c8c935 100644 --- a/exec/totemsrp.h +++ b/exec/totemsrp.h @@ -101,6 +101,8 @@ int totemsrp_callback_token_create ( void totemsrp_callback_token_destroy ( void **handle_out); +void totemsrp_new_msg_signal (void); + extern struct sockaddr_in config_mcast_addr; #endif /* TOTEMSRP_H_DEFINED */ diff --git a/test/evsbench.c b/test/evsbench.c index c1e14f96..9f16cbdf 100644 --- a/test/evsbench.c +++ b/test/evsbench.c @@ -54,10 +54,14 @@ static int alarm_notice = 0; +int outstanding = 0; + + void evs_deliver_fn (struct in_addr source_addr, void *msg, int msg_len) { -// printf ("Delivering message %s\n", buf); + outstanding--; +// printf ("Delivering message %s\n", msg); } void evs_confchg_fn ( @@ -118,11 +122,15 @@ void evs_benchmark (evs_handle_t handle, do { sprintf (buffer, "This is message %d\n", write_count); try_again: - result = evs_mcast_joined (handle, EVS_TYPE_AGREED, &iov, 1); - if (result == EVS_ERR_TRY_AGAIN) { - goto try_again; - } else { - write_count += 1; + if (outstanding < 10) { + result = evs_mcast_joined (handle, EVS_TYPE_AGREED, &iov, 1); + if (result == EVS_ERR_TRY_AGAIN) { +printf ("try again\n"); + goto try_again; + } else { + write_count += 1; + outstanding++; + } } result = evs_dispatch (handle, EVS_DISPATCH_ALL); } while (alarm_notice == 0); diff --git a/test/test.cpp b/test/test.cpp index a2bf396e..192579cc 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -15,10 +15,15 @@ #include "ais_types.h" -#include "ais_ckpt.h" +#include "saCkpt.h" -//SaVersionT version = { 'A', 1, 1 }; +SaVersionT version = { 'B', 1, 1 }; + +SaCkptCallbacksT callbacks = { + 0, + 0 +}; SaCkptCheckpointCreationAttributesT checkpointCreationAttributes = { SA_CKPT_WR_ALL_REPLICAS, @@ -30,8 +35,8 @@ SaCkptCheckpointCreationAttributesT checkpointCreationAttributes = { }; SaCkptSectionIdT sectionId = { - (SaUint8T*)"section ID #1", - 14 + 14, + (SaUint8T*)"section ID #1" }; SaCkptSectionCreationAttributesT sectionCreationAttributes = { @@ -60,7 +65,7 @@ char* getPayload(int psize) { return retVal; } -SaCkptCheckpointHandleT* WriteCheckpointHandle; +SaCkptCheckpointHandleT WriteCheckpointHandle; static long sendCount = 0; void process_message() @@ -68,10 +73,11 @@ void process_message() struct timeval tv; long t1; long t2; + long told; SaCkptIOVectorElementT writeElement; // KJS SaUint32T erroroneousVectorIndex = 0; - SaErrorT error; + SaAisErrorT error; writeElement.sectionId = sectionId; writeElement.dataBuffer = getPayload(200); @@ -81,6 +87,7 @@ void process_message() gettimeofday(&tv, NULL); t1 = tv.tv_usec; + told = tv.tv_sec; do { error = saCkptCheckpointWrite (WriteCheckpointHandle, @@ -88,33 +95,40 @@ void process_message() 1, &erroroneousVectorIndex); - if (error != SA_OK) { + if (error != SA_AIS_OK) { fprintf(stderr,"saCkptCheckpointWrite result %d (should be 1)\n", error); } sendCount++; fprintf(stderr,"sendCount = %d",(int)sendCount); - } while (error == SA_ERR_TRY_AGAIN); + } while (error == SA_AIS_ERR_TRY_AGAIN); gettimeofday(&tv, NULL); t2 = tv.tv_usec; - fprintf(stderr," ,RTT::%d\n",(long)t2-t1); + fprintf(stderr," ,RTT::%d.%d\n",(long)tv.tv_sec - told, t2-t1); } int main () { - SaErrorT error; + SaAisErrorT error; SaNameT* WriteCheckpointName = (SaNameT*) malloc(sizeof(SaNameT)); - WriteCheckpointHandle = (SaCkptCheckpointHandleT*) malloc(sizeof(SaCkptCheckpointHandleT)); char name[10]; + SaCkptHandleT ckptHandle; + sprintf(name,"ckpt%d",1); int namelen = strlen(name) + 1; memcpy(WriteCheckpointName->value, name, namelen); WriteCheckpointName->length = namelen; - error = saCkptCheckpointOpen (WriteCheckpointName, + + error = saCkptInitialize (&ckptHandle, &callbacks, &version); + + error = saCkptCheckpointOpen ( + ckptHandle, + WriteCheckpointName, &checkpointCreationAttributes, SA_CKPT_CHECKPOINT_WRITE, 1000000000, /* 1 Second */ - WriteCheckpointHandle); - if (error != SA_OK) { + &WriteCheckpointHandle); + + if (error != SA_AIS_OK) { fprintf(stderr,"saCkptCheckpointOpen result %d (should be 1)\n", error); return error; } @@ -123,7 +137,7 @@ int main () { §ionCreationAttributes, "Initial Data #0", strlen ("Initial Data #0") + 1); - if (error != SA_OK) { + if (error != SA_AIS_OK) { fprintf(stderr,"saCkptSectionCreate result = %d\n", error); return error; }