SAM integration of quorum

Patch adds integration of SAM and quorum, so it's now possible to use
SAM_RECOVERY_POLICY_QUORUM_QUIT or SAM_RECOVERY_POLICY_QUORUM_RESTART
recovery policy. With these policies, sam_start will block until
corosync is quorate. If quorum is lost during health checking, recovery
action is taken.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2822 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Jan Friesse 2010-05-13 11:20:49 +00:00
parent 088a2a0f17
commit d5884cd714
9 changed files with 443 additions and 75 deletions

View File

@ -43,6 +43,9 @@ extern "C" {
typedef enum {
SAM_RECOVERY_POLICY_QUIT = 1,
SAM_RECOVERY_POLICY_RESTART = 2,
SAM_RECOVERY_POLICY_QUORUM = 0x08,
SAM_RECOVERY_POLICY_QUORUM_QUIT = SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_QUIT,
SAM_RECOVERY_POLICY_QUORUM_RESTART = SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_RESTART,
} sam_recovery_policy_t;
/*

View File

@ -62,6 +62,7 @@ libvotequorum_a_SOURCES = votequorum.c
libconfdb_a_SOURCES = confdb.c sa-confdb.c
libconfdb_a_LIBADD = ../lcr/lcr_ifact.o
CONFDB_LINKER_ADD = $(OS_DYFLAGS) $(OS_LDL)
SAM_LINKER_ADD = -L. -lquorum
libcoroipcc_a_SOURCES = coroipcc.c
libsam_a_SOURCES = sam.c

View File

@ -1 +1 @@
4.2.1
4.3.0

312
lib/sam.c
View File

@ -51,6 +51,7 @@
#include <corosync/coroipcc.h>
#include <corosync/corodefs.h>
#include <corosync/hdb.h>
#include <corosync/quorum.h>
#include <corosync/sam.h>
@ -107,20 +108,62 @@ static struct {
void *user_data;
size_t user_data_size;
size_t user_data_allocated;
quorum_handle_t quorum_handle;
uint32_t quorate;
int quorum_fd;
} sam_internal_data;
static void quorum_notification_fn (
quorum_handle_t handle,
uint32_t quorate,
uint64_t ring_id,
uint32_t view_list_entries,
uint32_t *view_list)
{
sam_internal_data.quorate = quorate;
}
cs_error_t sam_initialize (
int time_interval,
sam_recovery_policy_t recovery_policy)
{
quorum_callbacks_t quorum_callbacks;
cs_error_t err;
if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_NOT_INITIALIZED) {
return (CS_ERR_BAD_HANDLE);
}
if (recovery_policy != SAM_RECOVERY_POLICY_QUIT && recovery_policy != SAM_RECOVERY_POLICY_RESTART) {
if (recovery_policy != SAM_RECOVERY_POLICY_QUIT && recovery_policy != SAM_RECOVERY_POLICY_RESTART &&
recovery_policy != SAM_RECOVERY_POLICY_QUORUM_QUIT && recovery_policy != SAM_RECOVERY_POLICY_QUORUM_RESTART) {
return (CS_ERR_INVALID_PARAM);
}
if (recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
/*
* Initialize quorum
*/
quorum_callbacks.quorum_notify_fn = quorum_notification_fn;
if ((err = quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks)) != CS_OK) {
goto exit_error;
}
if ((err = quorum_trackstart (sam_internal_data.quorum_handle, CS_TRACK_CHANGES)) != CS_OK) {
goto exit_error_quorum;
}
if ((err = quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) != CS_OK) {
goto exit_error_quorum;
}
/*
* Dispatch initial quorate state
*/
if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
goto exit_error_quorum;
}
}
sam_internal_data.recovery_policy = recovery_policy;
sam_internal_data.time_interval = time_interval;
@ -136,6 +179,11 @@ cs_error_t sam_initialize (
sam_internal_data.user_data_allocated = 0;
return (CS_OK);
exit_error_quorum:
quorum_finalize (sam_internal_data.quorum_handle);
exit_error:
return (err);
}
/*
@ -350,6 +398,7 @@ cs_error_t sam_data_store (
cs_error_t sam_start (void)
{
char command;
cs_error_t err;
if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) {
return (CS_ERR_BAD_HANDLE);
@ -360,6 +409,15 @@ cs_error_t sam_start (void)
if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command))
return (CS_ERR_LIBRARY);
if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
/*
* Wait for parent reply
*/
if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) {
return (err);
}
}
if (sam_internal_data.hc_callback)
if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command))
return (CS_ERR_LIBRARY);
@ -515,6 +573,109 @@ error_reply:
return (err);
}
static cs_error_t sam_parent_wait_for_quorum (
int parent_fd_in,
int parent_fd_out)
{
char reply;
cs_error_t err;
struct pollfd pfds[2];
int poll_err;
/*
* Update current quorum
*/
if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL)) != CS_OK) {
goto error_reply;
}
/*
* Wait for quorum
*/
while (!sam_internal_data.quorate) {
pfds[0].fd = parent_fd_in;
pfds[0].events = 0;
pfds[0].revents = 0;
pfds[1].fd = sam_internal_data.quorum_fd;
pfds[1].events = POLLIN;
pfds[1].revents = 0;
poll_err = poll (pfds, 2, -1);
if (poll_err == -1) {
/*
* Error in poll
* If it is EINTR, continue, otherwise QUIT
*/
if (errno != EINTR) {
err = CS_ERR_LIBRARY;
goto error_reply;
}
}
if (pfds[0].revents != 0) {
if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) {
/*
* Child has exited
*/
return (CS_OK);
}
}
if (pfds[1].revents != 0) {
if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) {
goto error_reply;
}
}
}
reply = SAM_REPLY_OK;
if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
err = CS_ERR_LIBRARY;
goto error_reply;
}
return (CS_OK);
error_reply:
reply = SAM_REPLY_ERROR;
if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) {
return (CS_ERR_LIBRARY);
}
if (sam_safe_write (parent_fd_out, &err, sizeof (err)) != sizeof (err)) {
return (CS_ERR_LIBRARY);
}
return (err);
}
static cs_error_t sam_parent_kill_child (
int *action,
pid_t child_pid)
{
/*
* Kill child process
*/
if (!sam_internal_data.term_send) {
/*
* We didn't send warn_signal yet.
*/
kill (child_pid, sam_internal_data.warn_signal);
sam_internal_data.term_send = 1;
} else {
/*
* We sent child warning. Now, we will not be so nice
*/
kill (child_pid, SIGKILL);
*action = SAM_PARENT_ACTION_RECOVERY;
}
return (CS_OK);
}
static cs_error_t sam_parent_data_store (
int parent_fd_in,
int parent_fd_out)
@ -585,16 +746,19 @@ static enum sam_parent_action_t sam_parent_handler (
ssize_t bytes_read;
char command;
int time_interval;
struct pollfd pfds;
struct pollfd pfds[2];
nfds_t nfds;
cs_error_t err;
status = 0;
action = SAM_PARENT_ACTION_CONTINUE;
while (action == SAM_PARENT_ACTION_CONTINUE) {
pfds.fd = parent_fd_in;
pfds.events = POLLIN;
pfds.revents = 0;
pfds[0].fd = parent_fd_in;
pfds[0].events = POLLIN;
pfds[0].revents = 0;
nfds = 1;
if (status == 1 && sam_internal_data.time_interval != 0) {
time_interval = sam_internal_data.time_interval;
@ -602,7 +766,14 @@ static enum sam_parent_action_t sam_parent_handler (
time_interval = -1;
}
poll_error = poll (&pfds, 1, time_interval);
if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
pfds[nfds].fd = sam_internal_data.quorum_fd;
pfds[nfds].events = POLLIN;
pfds[nfds].revents = 0;
nfds++;
}
poll_error = poll (pfds, nfds, time_interval);
if (poll_error == -1) {
/*
@ -621,75 +792,81 @@ static enum sam_parent_action_t sam_parent_handler (
if (status == 0) {
action = SAM_PARENT_ACTION_QUIT;
} else {
/*
* Kill child process
*/
if (!sam_internal_data.term_send) {
/*
* We didn't send warn_signal yet.
*/
kill (child_pid, sam_internal_data.warn_signal);
sam_internal_data.term_send = 1;
} else {
/*
* We sent child warning. Now, we will not be so nice
*/
kill (child_pid, SIGKILL);
action = SAM_PARENT_ACTION_RECOVERY;
}
sam_parent_kill_child (&action, child_pid);
}
}
if (poll_error > 0) {
/*
* We have EOF or command in pipe
*/
bytes_read = sam_safe_read (parent_fd_in, &command, 1);
if (bytes_read == 0) {
if (pfds[0].revents != 0) {
/*
* Handle EOF -> Take recovery action or quit if sam_start wasn't called
* We have EOF or command in pipe
*/
if (status == 0)
action = SAM_PARENT_ACTION_QUIT;
else
action = SAM_PARENT_ACTION_RECOVERY;
bytes_read = sam_safe_read (parent_fd_in, &command, 1);
continue;
}
if (bytes_read == -1) {
action = SAM_PARENT_ACTION_ERROR;
goto action_exit;
}
/*
* We have read command
*/
switch (command) {
case SAM_COMMAND_START:
if (status == 0) {
if (bytes_read == 0) {
/*
* Not started yet
* Handle EOF -> Take recovery action or quit if sam_start wasn't called
*/
status = 1;
if (status == 0)
action = SAM_PARENT_ACTION_QUIT;
else
action = SAM_PARENT_ACTION_RECOVERY;
continue;
}
break;
case SAM_COMMAND_STOP:
if (status == 1) {
/*
* Started
*/
status = 0;
if (bytes_read == -1) {
action = SAM_PARENT_ACTION_ERROR;
goto action_exit;
}
/*
* We have read command
*/
switch (command) {
case SAM_COMMAND_START:
if (status == 0) {
/*
* Not started yet
*/
if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
if (sam_parent_wait_for_quorum (parent_fd_in,
parent_fd_out) != CS_OK) {
continue;
}
}
status = 1;
}
break;
case SAM_COMMAND_STOP:
if (status == 1) {
/*
* Started
*/
status = 0;
}
break;
case SAM_COMMAND_DATA_STORE:
sam_parent_data_store (parent_fd_in, parent_fd_out);
break;
case SAM_COMMAND_WARN_SIGNAL_SET:
sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
break;
}
} /* if (pfds[0].revents != 0) */
if ((sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) &&
pfds[1].revents != 0) {
/*
* Handle quorum change
*/
err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL);
if (status == 1 &&
(!sam_internal_data.quorate || (err != CS_ERR_TRY_AGAIN && err != CS_OK))) {
sam_parent_kill_child (&action, child_pid);
}
break;
case SAM_COMMAND_DATA_STORE:
sam_parent_data_store (parent_fd_in, parent_fd_out);
break;
case SAM_COMMAND_WARN_SIGNAL_SET:
sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
break;
}
} /* select_error > 0 */
} /* action == SAM_PARENT_ACTION_CONTINUE */
@ -785,11 +962,16 @@ cs_error_t sam_register (
;
if (action == SAM_PARENT_ACTION_RECOVERY) {
if (sam_internal_data.recovery_policy == SAM_RECOVERY_POLICY_QUIT)
if (sam_internal_data.recovery_policy == SAM_RECOVERY_POLICY_QUIT ||
sam_internal_data.recovery_policy == SAM_RECOVERY_POLICY_QUORUM_QUIT)
action = SAM_PARENT_ACTION_QUIT;
}
if (action == SAM_PARENT_ACTION_QUIT) {
if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) {
quorum_finalize (sam_internal_data.quorum_handle);
}
exit (WEXITSTATUS (child_status));
}

View File

@ -1,5 +1,5 @@
.\"/*
.\" * Copyright (c) 2009 Red Hat, Inc.
.\" * Copyright (c) 2009-2010 Red Hat, Inc.
.\" *
.\" * All rights reserved.
.\" *
@ -31,7 +31,7 @@
.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
.\" * THE POSSIBILITY OF SUCH DAMAGE.
.\" */
.TH "SAM_INITIALIZE" 3 "12/01/2009" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.TH "SAM_INITIALIZE" 3 "30/04/2010" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.SH NAME
.P
@ -68,6 +68,9 @@ The \fIrecovery_policy\fR is defined as type:
typedef enum {
SAM_RECOVERY_POLICY_QUIT = 1,
SAM_RECOVERY_POLICY_RESTART = 2,
SAM_RECOVERY_POLICY_QUORUM = 0x08,
SAM_RECOVERY_POLICY_QUORUM_QUIT = SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_QUIT,
SAM_RECOVERY_POLICY_QUORUM_RESTART = SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_RESTART,
} sam_recovery_policy_t;
.fi
@ -80,6 +83,17 @@ on failure, the process will terminate.
.TP
SAM_RECOVERY_POLICY_RESTART
on failure, the process will restart.
.TP
SAM_RECOVERY_POLICY_QUORUM
is not policy. Used only as flag meaning quorum integration
.TP
SAM_RECOVERY_POLICY_QUORUM_QUIT
same as \fISAM_RECOVERY_POLICY_QUIT\fR but \fBsam_start (3)\fR will block until corosync becomes
quorate and process will be terminated if quorum is lost.
.TP
SAM_RECOVERY_POLICY_QUORUM_RESTART
same as \fISAM_RECOVERY_POLICY_RESTART\fR but \fBsam_start (3)\fR will block until corosync becomes
quorate and process will be restarted if quorum is lost.
.P
To perform event driven healthchecking, \fBsam_register(3)\fR and

View File

@ -1,5 +1,5 @@
.\"/*
.\" * Copyright (c) 2009 Red Hat, Inc.
.\" * Copyright (c) 2009-2010 Red Hat, Inc.
.\" *
.\" * All rights reserved.
.\" *
@ -32,7 +32,7 @@
.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
.\" * THE POSSIBILITY OF SUCH DAMAGE.
.\" */
.TH "SAM_OVERVIEW" 8 "12/01/2009" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.TH "SAM_OVERVIEW" 8 "30/04/2010" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.SH NAME
.P
@ -115,6 +115,17 @@ or add timers to the active process to signal a healthcheck operation is
successful. To use event driven healthchecking,
the \fBsam_hc_callback_register(3)\fR function should be executed.
.SH Quorum integration
.P
SAM has special policies (\fISAM_RECOVERY_POLICY_QUIT\fR and \fISAM_RECOVERY_POLICY_RESTART\fR)
for integration with quorum service. This policies changes SAM behaviour in two aspects.
.RS
.IP \(bu 3
Call of \fBsam_start(3)\fR blocks until corosync becomes quorate
.IP \(bu 3
User selected recovery action is taken immediately after lost of quorum.
.RE
.SH Storing user data
.P
Sometimes there is need to store some data, which survives between instances.

View File

@ -1,5 +1,5 @@
.\"/*
.\" * Copyright (c) 2009 Red Hat, Inc.
.\" * Copyright (c) 2009-2010 Red Hat, Inc.
.\" *
.\" * All rights reserved.
.\" *
@ -31,7 +31,7 @@
.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
.\" * THE POSSIBILITY OF SUCH DAMAGE.
.\" */
.TH "SAM_START" 3 "12/01/2009" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.TH "SAM_START" 3 "30/04/2010" "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.SH NAME
.P
@ -61,9 +61,15 @@ function can be called.
An application can always stop health checking by calling the \fBsam_stop(3)\fR
function.
.P
If SAM is initialized with quorum policy \fISAM_RECOVERY_POLICY_QUIT\fR or
\fISAM_RECOVERY_POLICY_RESTART\fR \fBsam_start\fR will block until corosync
becomes quorate.
.SH RETURN VALUE
.P
This call return CS_OK value if successful, otherwise and error is returned.
This call return CS_OK value if successful, otherwise and error is returned. If
SAM is initialized with quorum policy, returned error can also be quorum error.
.SH ERRORS
.TP

View File

@ -85,7 +85,7 @@ logsys_t1_LDADD = -llogsys
logsys_t1_LDFLAGS = -L../exec
logsys_t2_LDADD = -llogsys
logsys_t2_LDFLAGS = -L../exec
testsam_LDADD = -lsam
testsam_LDADD = -lsam -lconfdb
testsam_LDFLAGS = -L../lib
LINT_FILES1:=$(filter-out sa_error.c, $(wildcard *.c))

View File

@ -44,6 +44,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <corosync/corotypes.h>
#include <corosync/confdb.h>
#include <corosync/sam.h>
#include <signal.h>
#include <string.h>
@ -735,12 +736,141 @@ static int test6 (void) {
return 1;
}
static void *test7_thread (void *arg)
{
/* Wait 5s */
sleep (5);
exit (0);
}
/*
* Test quorum
*/
static int test7 (void) {
confdb_handle_t cdb_handle;
cs_error_t err;
hdb_handle_t quorum_handle;
size_t value_len;
char key_value[256];
unsigned int instance_id;
pthread_t kill_thread;
err = confdb_initialize (&cdb_handle, NULL);
if (err != CS_OK) {
printf ("Could not initialize Cluster Configuration Database API instance error %d. Test skipped\n", err);
return (1);
}
err = confdb_object_find_start(cdb_handle, OBJECT_PARENT_HANDLE);
if (err != CS_OK) {
printf ("Could not start object_find %d. Test skipped\n", err);
return (1);
}
err = confdb_object_find(cdb_handle, OBJECT_PARENT_HANDLE, "quorum", strlen("quorum"), &quorum_handle);
if (err != CS_OK) {
printf ("Could not object_find \"quorum\": %d. Test skipped\n", err);
return (1);
}
err = confdb_key_get(cdb_handle, quorum_handle, "provider", strlen("provider"), key_value, &value_len);
if (err != CS_OK) {
printf ("Could not get \"provider\" key: %d. Test skipped\n", err);
return (1);
}
if (!(value_len - 1 == strlen ("testquorum") && memcmp (key_value, "testquorum", value_len - 1) == 0)) {
printf ("Provider is not testquorum. Test skipped\n");
return (1);
}
/*
* Set to not quorate
*/
err = confdb_key_create(cdb_handle, quorum_handle, "quorate", strlen("quorate"), "0", strlen("0"));
if (err != CS_OK) {
printf ("Can't create confdb key. Error %d\n", err);
return (2);
}
printf ("%s: initialize\n", __FUNCTION__);
err = sam_initialize (2000, SAM_RECOVERY_POLICY_QUORUM_RESTART);
if (err != CS_OK) {
fprintf (stderr, "Can't initialize SAM API. Error %d\n", err);
return 2;
}
printf ("%s: register\n", __FUNCTION__);
err = sam_register (&instance_id);
if (err != CS_OK) {
fprintf (stderr, "Can't register. Error %d\n", err);
return 2;
}
if (instance_id == 1) {
/*
* Sam start should block forever, but 10s for us should be enough
*/
pthread_create (&kill_thread, NULL, test7_thread, NULL);
printf ("%s iid %d: start - should block forever (waiting 5s)\n", __FUNCTION__, instance_id);
err = sam_start ();
if (err != CS_OK) {
fprintf (stderr, "Can't start hc. Error %d\n", err);
return 2;
}
printf ("%s iid %d: wasn't killed\n", __FUNCTION__, instance_id);
return (2);
}
if (instance_id == 2) {
/*
* Set to quorate
*/
err = confdb_key_create(cdb_handle, quorum_handle, "quorate", strlen("quorate"), "1", strlen("1"));
if (err != CS_OK) {
printf ("Can't create confdb key. Error %d\n", err);
return (2);
}
printf ("%s iid %d: start\n", __FUNCTION__, instance_id);
err = sam_start ();
if (err != CS_OK) {
fprintf (stderr, "Can't start hc. Error %d\n", err);
return 2;
}
/*
* Set corosync unquorate
*/
err = confdb_key_create(cdb_handle, quorum_handle, "quorate", strlen("quorate"), "0", strlen("0"));
if (err != CS_OK) {
printf ("Can't create confdb key. Error %d\n", err);
return (2);
}
printf ("%s iid %d: sleep 3\n", __FUNCTION__, instance_id);
sleep (3);
printf ("%s iid %d: wasn't killed\n", __FUNCTION__, instance_id);
return (2);
}
if (instance_id == 3) {
return (0);
}
return (2);
}
int main(int argc, char *argv[])
{
pid_t pid;
int err;
int stat;
int all_passed = 1;
int no_skipped = 0;
pid = fork ();
@ -856,8 +986,29 @@ int main(int argc, char *argv[])
if (WEXITSTATUS (stat) != 0)
all_passed = 0;
pid = fork ();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
return 1;
}
if (pid == 0) {
err = test7 ();
sam_finalize ();
return (err);
}
waitpid (pid, &stat, 0);
fprintf (stderr, "test7 %s\n", (WEXITSTATUS (stat) == 0 ? "passed" : (WEXITSTATUS (stat) == 1 ? "skipped" : "failed")));
if (WEXITSTATUS (stat) == 1)
no_skipped++;
if (WEXITSTATUS (stat) > 1)
all_passed = 0;
if (all_passed)
fprintf (stderr, "All tests passed\n");
fprintf (stderr, "All tests passed (%d skipped)\n", no_skipped);
return (all_passed ? 0 : 1);
}