CPG model_initialize and ringid + members callback

Patch adds new function to initialize cpg, cpg_model_initialize. Model
is set of callbacks. With this function, future addions of models
should  be possible without changing the ABI.

Patch also contains callback in CPG_MODEL_V1 for notification about
Totem membership changes.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2770 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Jan Friesse 2010-04-20 12:40:48 +00:00
parent da6fce352b
commit e8b143595c
10 changed files with 524 additions and 67 deletions

View File

@ -78,6 +78,10 @@ typedef enum {
CPG_ITERATION_ALL = 3,
} cpg_iteration_type_t;
typedef enum {
CPG_MODEL_V1 = 1,
} cpg_model_t;
struct cpg_address {
uint32_t nodeid;
uint32_t pid;
@ -98,6 +102,11 @@ struct cpg_iteration_description_t {
uint32_t pid;
};
struct cpg_ring_id {
uint32_t nodeid;
uint64_t seq;
};
typedef void (*cpg_deliver_fn_t) (
cpg_handle_t handle,
const struct cpg_name *group_name,
@ -117,11 +126,32 @@ typedef void (*cpg_confchg_fn_t) (
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries);
typedef void (*cpg_totem_confchg_fn_t) (
cpg_handle_t handle,
struct cpg_ring_id ring_id,
uint32_t member_list_entries,
const uint32_t *member_list);
typedef struct {
cpg_deliver_fn_t cpg_deliver_fn;
cpg_confchg_fn_t cpg_confchg_fn;
} cpg_callbacks_t;
typedef struct {
cpg_model_t model;
} cpg_model_data_t;
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF 0x01
typedef struct {
cpg_model_t model;
cpg_deliver_fn_t cpg_deliver_fn;
cpg_confchg_fn_t cpg_confchg_fn;
cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
unsigned int flags;
} cpg_model_v1_data_t;
/** @} */
/*
@ -131,6 +161,15 @@ cs_error_t cpg_initialize (
cpg_handle_t *handle,
cpg_callbacks_t *callbacks);
/*
* Create a new cpg connection, initialize with model
*/
cs_error_t cpg_model_initialize (
cpg_handle_t *handle,
cpg_model_t model,
cpg_model_data_t *model_data,
void *context);
/*
* Close the cpg handle
*/

View File

@ -65,6 +65,7 @@ enum res_cpg_types {
MESSAGE_RES_CPG_ITERATIONNEXT = 10,
MESSAGE_RES_CPG_ITERATIONFINALIZE = 11,
MESSAGE_RES_CPG_FINALIZE = 12,
MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13,
};
enum lib_cpg_confchg_reason {
@ -149,10 +150,24 @@ static inline void marshall_from_mar_cpg_iteration_description_t(
marshall_from_mar_cpg_name_t (&dest->group, &src->group);
};
typedef struct {
mar_uint32_t nodeid __attribute__((aligned(8)));
mar_uint64_t seq __attribute__((aligned(8)));
} mar_cpg_ring_id_t;
static inline void marshall_from_mar_cpg_ring_id_t (
struct cpg_ring_id *dest,
const mar_cpg_ring_id_t *src)
{
dest->nodeid = src->nodeid;
dest->seq = src->seq;
}
struct req_lib_cpg_join {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
mar_uint32_t pid __attribute__((aligned(8)));
mar_uint32_t flags __attribute__((aligned(8)));
};
struct res_lib_cpg_join {
@ -238,6 +253,13 @@ struct res_lib_cpg_confchg_callback {
// struct cpg_address joined_list[];
};
struct res_lib_cpg_totem_confchg_callback {
coroipc_response_header_t header __attribute__((aligned(8)));
mar_cpg_ring_id_t ring_id __attribute__((aligned(8)));
mar_uint32_t member_list_entries __attribute__((aligned(8)));
mar_uint32_t member_list[];
};
struct req_lib_cpg_leave {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));

201
lib/cpg.c
View File

@ -62,8 +62,11 @@
struct cpg_inst {
hdb_handle_t handle;
int finalize;
cpg_callbacks_t callbacks;
void *context;
union {
cpg_model_data_t model_data;
cpg_model_v1_data_t model_v1_data;
};
struct list_head iteration_list_head;
};
@ -117,10 +120,33 @@ static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
cs_error_t cpg_initialize (
cpg_handle_t *handle,
cpg_callbacks_t *callbacks)
{
cpg_model_v1_data_t model_v1_data;
memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
if (callbacks) {
model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
}
return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
}
cs_error_t cpg_model_initialize (
cpg_handle_t *handle,
cpg_model_t model,
cpg_model_data_t *model_data,
void *context)
{
cs_error_t error;
struct cpg_inst *cpg_inst;
if (model != CPG_MODEL_V1) {
error = CPG_ERR_INVALID_PARAM;
goto error_no_destroy;
}
error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
if (error != CS_OK) {
goto error_no_destroy;
@ -142,10 +168,26 @@ cs_error_t cpg_initialize (
goto error_put_destroy;
}
if (callbacks) {
memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
if (model_data != NULL) {
switch (model) {
case CPG_MODEL_V1:
memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
error = CS_ERR_INVALID_PARAM;
goto error_destroy;
}
break;
default:
error = CS_ERR_LIBRARY;
goto error_destroy;
break;
}
}
cpg_inst->model_data.model = model;
cpg_inst->context = context;
list_init(&cpg_inst->iteration_list_head);
hdb_handle_put (&cpg_handle_t_db, *handle);
@ -283,7 +325,8 @@ cs_error_t cpg_dispatch (
struct cpg_inst *cpg_inst;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
cpg_callbacks_t callbacks;
struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
struct cpg_inst cpg_inst_copy;
coroipc_response_header_t *dispatch_data;
struct cpg_address member_list[CPG_MEMBERS_MAX];
struct cpg_address left_list[CPG_MEMBERS_MAX];
@ -292,6 +335,8 @@ cs_error_t cpg_dispatch (
mar_cpg_address_t *left_list_start;
mar_cpg_address_t *joined_list_start;
unsigned int i;
struct cpg_ring_id ring_id;
uint32_t totem_member_list[CPG_MEMBERS_MAX];
error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
if (error != CS_OK) {
@ -332,74 +377,96 @@ cs_error_t cpg_dispatch (
* A risk of this dispatch method is that the callback routines may
* operate at the same time that cpgFinalize has been called.
*/
memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
/*
* Dispatch incoming message
*/
switch (dispatch_data->id) {
case MESSAGE_RES_CPG_DELIVER_CALLBACK:
if (callbacks.cpg_deliver_fn == NULL) {
memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
switch (cpg_inst_copy.model_data.model) {
case CPG_MODEL_V1:
/*
* Dispatch incoming message
*/
switch (dispatch_data->id) {
case MESSAGE_RES_CPG_DELIVER_CALLBACK:
if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
break;
}
res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_deliver_callback->group_name);
cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
&group_name,
res_cpg_deliver_callback->nodeid,
res_cpg_deliver_callback->pid,
&res_cpg_deliver_callback->message,
res_cpg_deliver_callback->msglen);
break;
}
res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
break;
}
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_deliver_callback->group_name);
res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
callbacks.cpg_deliver_fn (handle,
&group_name,
res_cpg_deliver_callback->nodeid,
res_cpg_deliver_callback->pid,
&res_cpg_deliver_callback->message,
res_cpg_deliver_callback->msglen);
break;
for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
marshall_from_mar_cpg_address_t (&member_list[i],
&res_cpg_confchg_callback->member_list[i]);
}
left_list_start = res_cpg_confchg_callback->member_list +
res_cpg_confchg_callback->member_list_entries;
for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
marshall_from_mar_cpg_address_t (&left_list[i],
&left_list_start[i]);
}
joined_list_start = res_cpg_confchg_callback->member_list +
res_cpg_confchg_callback->member_list_entries +
res_cpg_confchg_callback->left_list_entries;
for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
marshall_from_mar_cpg_address_t (&joined_list[i],
&joined_list_start[i]);
}
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_confchg_callback->group_name);
cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
&group_name,
member_list,
res_cpg_confchg_callback->member_list_entries,
left_list,
res_cpg_confchg_callback->left_list_entries,
joined_list,
res_cpg_confchg_callback->joined_list_entries);
case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
if (callbacks.cpg_confchg_fn == NULL) {
break;
}
case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
break;
}
res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
marshall_from_mar_cpg_address_t (&member_list[i],
&res_cpg_confchg_callback->member_list[i]);
}
left_list_start = res_cpg_confchg_callback->member_list +
res_cpg_confchg_callback->member_list_entries;
for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
marshall_from_mar_cpg_address_t (&left_list[i],
&left_list_start[i]);
}
joined_list_start = res_cpg_confchg_callback->member_list +
res_cpg_confchg_callback->member_list_entries +
res_cpg_confchg_callback->left_list_entries;
for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
marshall_from_mar_cpg_address_t (&joined_list[i],
&joined_list_start[i]);
}
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_confchg_callback->group_name);
marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
}
callbacks.cpg_confchg_fn (handle,
&group_name,
member_list,
res_cpg_confchg_callback->member_list_entries,
left_list,
res_cpg_confchg_callback->left_list_entries,
joined_list,
res_cpg_confchg_callback->joined_list_entries);
break;
default:
coroipcc_dispatch_put (cpg_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
ring_id,
res_cpg_totem_confchg_callback->member_list_entries,
totem_member_list);
break;
default:
coroipcc_dispatch_put (cpg_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
} /* - switch (dispatch_data->id) */
break; /* case CPG_MODEL_V1 */
} /* - switch (cpg_inst_copy.model_data.model) */
coroipcc_dispatch_put (cpg_inst->handle);
/*
@ -434,6 +501,14 @@ cs_error_t cpg_join (
req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
req_lib_cpg_join.pid = getpid();
req_lib_cpg_join.flags = 0;
switch (cpg_inst->model_data.model) {
case CPG_MODEL_V1:
req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
break;
}
marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
group);

View File

@ -1 +1 @@
4.0.1
4.1.0

View File

@ -71,6 +71,7 @@ dist_man_MANS = \
cpg_leave.3 \
cpg_local_get.3 \
cpg_mcast_joined.3 \
cpg_model_initialize.3 \
cpg_zcb_mcast_joined.3 \
cpg_zcb_alloc.3 \
cpg_zcb_free.3 \

View File

@ -41,7 +41,10 @@ cpg_initialize \- Create a new connection to the CPG service
.SH DESCRIPTION
The
.B cpg_initialize
function is used to initialize a connection to the closed process groups API.
function is used to initialize a connection to the closed process groups API. This function is deprecated
and
.B cpg_model_initialize
should be used in newly written code.
.PP
Each application may have several connections to the CPG API. Each application
uses the
@ -167,5 +170,6 @@ The errors are undocumented.
.BR cpg_context_get (3)
.BR cpg_context_set (3)
.BR cpg_local_get (3)
.BR cpg_model_initialize (3)
.PP

231
man/cpg_model_initialize.3 Normal file
View File

@ -0,0 +1,231 @@
.\"/*
.\" * Copyright (c) 2010 Red Hat, Inc.
.\" *
.\" * All rights reserved.
.\" *
.\" * Author: Jan Friesse <jfriesse@redhat.com>
.\" * Author: Christine Caulfield <ccaulfie@redhat.com>
.\" *
.\" * This software licensed under BSD license, the text of which follows:
.\" *
.\" * Redistribution and use in source and binary forms, with or without
.\" * modification, are permitted provided that the following conditions are met:
.\" *
.\" * - Redistributions of source code must retain the above copyright notice,
.\" * this list of conditions and the following disclaimer.
.\" * - Redistributions in binary form must reproduce the above copyright notice,
.\" * this list of conditions and the following disclaimer in the documentation
.\" * and/or other materials provided with the distribution.
.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
.\" * contributors may be used to endorse or promote products derived from this
.\" * software without specific prior written permission.
.\" *
.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
.\" * THE POSSIBILITY OF SUCH DAMAGE.
.\" */
.TH CPG_MODEL_INITIALIZE 3 2010-04-07 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
.SH NAME
cpg_model_initialize \- Create a new connection to the CPG service
.SH SYNOPSIS
.B #include <corosync/cpg.h>
.sp
.BI "cs_error_t cpg_model_initialize(cpg_handle_t *" handle ", cpg_model_t " model ", cpg_model_data_t *" model_data ", void *" context ");
.SH DESCRIPTION
The
.B cpg_model_initialize
function is used to initialize a connection to the closed process groups API.
.PP
Each application may have several connections to the CPG API. Each application
uses the
.I handle
argument to uniquely identify the connection. The
.I handle
argument is then used in other function calls to identify the connection to be used
for communication with the CPG service.
.PP
Argument
.I model
is used to explicitly choose set of callbacks and internal parameters. Currently only model
.I CPG_MODEL_V1
is defined.
.PP
Callbacks and internal parameters are passed by
.I model_data
argument. This is casted pointer (idea is similar as in sockaddr function) to one of structures
corresponding to chosen model. Currently only
.I cpg_model_v1_data_t
is needed.
.SH MODEL_V1
The
.I MODEL_V1
is backwards compatible with original callbacks initialized by
.I cpg_initialize
but new callback
.I cpg_totem_confchg_fn
is defined.
.PP
Every time an CPG event occurs within the joined group, one of the callbacks specified by the argument
.I callbacks
is called. The callback functions are described by the following type definitions:
.PP
.PP
.IP
.RS
.ne 18
.nf
.ta 4n 20n 32n
typedef void (*cpg_deliver_fn_t) (
cpg_handle_t handle,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
const void *msg,
size_t msg_len);
typedef void (*cpg_confchg_fn_t) (
cpg_handle_t handle,
const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries);
typedef void (*cpg_totem_confchg_fn_t) (
cpg_handle_t handle,
struct cpg_ring_id ring_id,
uint32_t member_list_entries,
const uint32_t *member_list);
.ta
.fi
.RE
.IP
.PP
.PP
The
.I cpg_model_v1_data_t
structure is defined as:
.IP
.RS
.ne 18
.nf
.PP
typedef struct {
cpg_model_t model;
cpg_deliver_fn_t cpg_deliver_fn;
cpg_confchg_fn_t cpg_confchg_fn;
cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
unsigned int flags;
} cpg_model_v1_data_t;
.ta
.fi
.RE
.IP
.PP
When a configuration change occurs or a message is to be delivered one of the callbacks
is called from the
.B cpg_dispatch()
function. If a configuration change occurs,
.I cpg_confchg_fn
is called. If a delivery of a message occurs,
.I cpg_deliver_fn
is called.
When totem membership change occurs,
.I cpg_totem_confchg_fn
is called. You can OR
.I CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
constant to flags to get callback after first confchg event.
The
.I cpg_address
structure is defined
.IP
.RS
.ne 18
.nf
.PP
struct cpg_address {
unsigned int nodeid;
unsigned int pid;
unsigned int reason;
};
.ta
.fi
.RE
.IP
.PP
where nodeid is a 32 bit unique node identifier, pid is the process ID of the process that has joined/left the group
or sent the message, and reason is an integer code indicating why the node joined/left the group.
.PP
.IP
.RS
.ne 18
.nf
.PP
CPG_REASON_JOIN - the process joined a group using cpg_join().
CPG_REASON_LEAVE - the process left a group using cpg_leave()
CPG_REASON_NODEDOWN - the process left a group because the node left the cluster.
CPG_REASON_NODEUP - the process joined a group because it was already a member of a group on a node that has just joined the cluster
CPG_REASON_PROCDOWN - the process left a group without calling cpg_leave()
.ta
.fi
.RE
.IP
.PP
The
.I cpg_ring_id
structure is defined
.IP
.RS
.ne 18
.nf
.PP
struct cpg_ring_id {
uint32_t nodeid;
uint64_t seq;
};
.ta
.fi
.RE
.IP
.PP
where
.I nodeid
is if of node of current Totem leader and seq is increasing number.
.PP
.SH RETURN VALUE
This call returns the CPG_OK value if successful, otherwise an error is returned.
.PP
.SH ERRORS
The errors are undocumented.
.SH "SEE ALSO"
.BR cpg_overview (8),
.BR cpg_initialize (3),
.BR cpg_finalize (3),
.BR cpg_fd_get (3),
.BR cpg_dispatch (3),
.BR cpg_join (3),
.BR cpg_leave (3),
.BR cpg_mcast_joined (3),
.BR cpg_membership_get (3)
.BR cpg_zcb_alloc (3)
.BR cpg_zcb_free (3)
.BR cpg_zcb_mcast_joined (3)
.BR cpg_context_get (3)
.BR cpg_context_set (3)
.BR cpg_local_get (3)
.BR cpg_model_initialize (3)
.PP

View File

@ -61,6 +61,7 @@ access the corosync services.
.BR cpg_join (3),
.BR cpg_leave (3),
.BR cpg_mcast_joined (3),
.BR cpg_model_initialize (3),
.BR cpg_membership_get (3)
.BR cpg_zcb_alloc (3)
.BR cpg_zcb_free (3)

View File

@ -133,6 +133,8 @@ struct cpg_pd {
mar_cpg_name_t group_name;
uint32_t pid;
enum cpd_state cpd_state;
unsigned int flags;
int initial_totem_conf_sent;
struct list_head list;
struct list_head iteration_instance_list_head;
};
@ -160,6 +162,8 @@ static struct corosync_api_v1 *api = NULL;
static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
static mar_cpg_ring_id_t last_sync_ring_id;
struct process_info {
unsigned int nodeid;
uint32_t pid;
@ -255,6 +259,11 @@ static void cpg_sync_activate (void);
static void cpg_sync_abort (void);
static int notify_lib_totem_membership (
void *conn,
int member_list_entries,
const unsigned int *member_list);
/*
* Library Handler Definition
*/
@ -432,6 +441,9 @@ static void cpg_sync_init_v2 (
sizeof (unsigned int));
my_member_list_entries = member_list_entries;
last_sync_ring_id.nodeid = ring_id->rep.nodeid;
last_sync_ring_id.seq = ring_id->seq;
for (i = 0; i < my_member_list_entries; i++) {
if (my_member_list[i] < lowest_nodeid) {
lowest_nodeid = my_member_list[i];
@ -482,13 +494,50 @@ static void cpg_sync_activate (void)
memcpy (my_old_member_list, my_member_list,
my_member_list_entries * sizeof (unsigned int));
my_old_member_list_entries = my_member_list_entries;
notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
}
static void cpg_sync_abort (void)
{
}
static int notify_lib_totem_membership (
void *conn,
int member_list_entries,
const unsigned int *member_list)
{
struct list_head *iter;
char *buf;
int size;
struct res_lib_cpg_totem_confchg_callback *res;
size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
sizeof(mar_uint32_t) * (member_list_entries);
buf = alloca(size);
if (!buf)
return CPG_ERR_LIBRARY;
res = (struct res_lib_cpg_totem_confchg_callback *)buf;
res->member_list_entries = member_list_entries;
res->header.size = size;
res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
res->header.error = CS_OK;
memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
if (conn == NULL) {
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
api->ipc_dispatch_send (cpg_pd->conn, buf, size);
}
} else {
api->ipc_dispatch_send (conn, buf, size);
}
return CPG_OK;
}
static int notify_lib_joinlist(
const mar_cpg_name_t *group_name,
@ -604,6 +653,20 @@ static int notify_lib_joinlist(
}
}
/*
* Traverse thru cpds and send totem membership for cpd, where it is not send yet
*/
for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
cpd->initial_totem_conf_sent = 1;
notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
}
}
return CPG_OK;
}
@ -1093,6 +1156,7 @@ static void message_handler_req_lib_cpg_join (void *conn, const void *message)
error = CPG_OK;
cpd->cpd_state = CPD_STATE_JOIN_STARTED;
cpd->pid = req_lib_cpg_join->pid;
cpd->flags = req_lib_cpg_join->flags;
memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
sizeof (cpd->group_name));

View File

@ -132,9 +132,29 @@ static void ConfchgCallback (
}
}
static cpg_callbacks_t callbacks = {
static void TotemConfchgCallback (
cpg_handle_t handle,
struct cpg_ring_id ring_id,
uint32_t member_list_entries,
const uint32_t *member_list)
{
int i;
printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq);
printf("active processors %lu: ",
(unsigned long int) member_list_entries);
for (i=0; i<member_list_entries; i++) {
printf("%d ", member_list[i]);
}
printf ("\n");
}
static cpg_model_v1_data_t model_data = {
.cpg_deliver_fn = DeliverCallback,
.cpg_confchg_fn = ConfchgCallback,
.cpg_totem_confchg_fn = TotemConfchgCallback,
.flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
};
static void sigintr_handler (int signum) __attribute__((__noreturn__));
@ -170,7 +190,7 @@ int main (int argc, char *argv[]) {
group_name.length = 6;
}
result = cpg_initialize (&handle, &callbacks);
result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_data, NULL);
if (result != CS_OK) {
printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
exit (1);