The patch contains:

The instantiaton of the component is performed with some new steps:
 
 1. SU invoke Comp to instantiate
 2. Comp multicast a new event 
 MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE
 3. Comp receive the new  event 
 MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE
 4. If the Comp is within the SU hosted on the node. The 

 component invokes

    the clc_cli instantiate script to start the component 

 and start a timer

    to supervise the start and registration of the component.
 5. If the instantiation time elapse before the component has 
 registered himself
    Comp is sending a new multicasted event
    MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO.
 6. Comp receive 

 MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO event.

 7. The Comp presence state is set to 
 SA_AMF_PRESENCE_INSTANTIATION_FAILED
 8. When all Components are in presence state 

 SA_AMF_PRESENCE_INSTANTIATED or

    SA_AMF_PRESENCE_INSTANTIATION_FAILED the start or restart will 
 continue with
    the assignment of load.
 
 This implemntation means that the complete instantiation procedure 
 never will be endlessly waiting for a register. The 

 instantiation will 

 either turn out in a component instantiation failure or a success.
 
 
 Hardening of the cluster start use case:
 
 1. A clearer separation of the responsibilities between 

 amf_cluster and

    amf_application.
                      
 2. A clearer interface and separation between amf_main (amf.c) and 
 amf_cluster.
 
 3. A clearer interface and separation between amf_cluster 

 and amf_node.

 
 4. A clearer separation of the responsibilities between amf_node and
    amf_application.
 



git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1251 fd59a12c-fef9-0310-b244-a6a79926bd2f
This commit is contained in:
Lon Hohberger 2006-10-04 05:24:14 +00:00
parent d149242177
commit 5126acc37a
10 changed files with 1782 additions and 850 deletions

View File

@ -68,7 +68,9 @@
*
* State: Event: Action: New state:
* ===========================================================================
* IDLE node_joined PROBING-1
* - init[AMF disabled] UNCONFIGURED
* - init IDLE
* IDLE node_joined A0 PROBING-1
* PROBING-1 timer1 timeout A1 PROBING-2
* PROBING-1 SYNC_START A2 UPDATING_CLUSTER_MODEL
* PROBING-1 node_joined A7 PROBING-1
@ -76,14 +78,15 @@
* PROBING-2 SYNC_START[From other] UPDATING_CLUSTER_MODEL
* PROBING-2 node_joined A7 PROBING-2
* CREATING_CLUSTER_MODEL Model created A8 SYNCHRONIZING
* SYNCHRONIZING SYNC_READY A10 NORMAL_OPERATION
* SYNCHRONIZING sync_activate A10 NORMAL_OPERATION
* SYNCHRONIZING node_left[sync_master] A5 SYNCHRONIZING
* SYNCHRONIZING node_joined SYNCHRONIZING
* SYNCHRONIZING node_joined[sync_master
* == me] A1 SYNCHRONIZING
* UPDATING_CLUSTER_MODEL SYNC_DATA A3 UPDATING_CLUSTER_MODEL
* UPDATING_CLUSTER_MODEL SYNC_READY A4 NORMAL_OPERATION
* UPDATING_CLUSTER_MODEL SYNC_START A5 NORMAL_OPERATION
* UPDATING_CLUSTER_MODEL sync_activate A4 NORMAL_OPERATION
* UPDATING_CLUSTER_MODEL SYNC_START A5 UPDATING_CLUSTER_MODEL
* UPDATING_CLUSTER_MODEL node_left[sync_master] PROBING-1
* NORMAL_OPERATION node_joined SYNCHRONIZING
* NORMAL_OPERATION sync_init SYNCHRONIZING
* NORMAL_OPERATION node_left[sync_master] A6 NORMAL_OPERATION
* NORMAL_OPERATION SYNC_REQUEST A8 NORMAL_OPERATION
* Any SYNC_REQUEST A9 No change
@ -103,16 +106,17 @@
*
* 1.3 Action Description
* ======================
* A0 - Start timer1
* A1 - Multicast SYNC_START message
* A2 - Stop timer1
* A3 - Decode AMF object and save
* A4 - Create cluster model
* A4 - Create cluster model; cluster sync ready
* A5 - Free received SYNC_DATA
* A6 - Calculate new sync master
* A7 - Multicast SYNC_REQUEST message
* A8 - Update AMF node object(s) with CLM nodeid
* A9 - Save CLM nodeid & hostname
* A10- Delete CLM nodes
* A10- Delete CLM nodes; cluster sync ready
*/
#include <sys/types.h>
@ -195,6 +199,8 @@ static void message_handler_req_exec_amf_comp_register (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_comp_error_report (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_comp_instantiate (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_clc_cleanup_completed (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_healthcheck_tmo (
@ -211,6 +217,8 @@ static void message_handler_req_exec_amf_cluster_start_tmo (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_sync_request (
void *message, unsigned int nodeid);
static void message_handler_req_exec_amf_comp_instantiate_tmo(
void *message, unsigned int nodeid);
static void amf_dump_fn (void);
static void amf_sync_init (void);
static int amf_sync_process (void);
@ -323,6 +331,9 @@ static struct openais_exec_handler amf_exec_service[] = {
{
.exec_handler_fn = message_handler_req_exec_amf_comp_error_report,
},
{
.exec_handler_fn = message_handler_req_exec_amf_comp_instantiate,
},
{
.exec_handler_fn = message_handler_req_exec_amf_clc_cleanup_completed,
},
@ -347,6 +358,9 @@ static struct openais_exec_handler amf_exec_service[] = {
{
.exec_handler_fn = message_handler_req_exec_amf_sync_request,
},
{
.exec_handler_fn = message_handler_req_exec_amf_comp_instantiate_tmo,
},
};
/*
@ -425,6 +439,7 @@ struct req_exec_amf_comp_error_report {
SaNtfIdentifierT ntfIdentifier;
};
struct req_exec_amf_response {
mar_req_header_t header;
SaUint32T interface;
@ -434,11 +449,13 @@ struct req_exec_amf_response {
struct req_exec_amf_sync_data {
mar_req_header_t header;
amf_object_type_t object_type;
SaUint32T protocol_version;
SaUint32T object_type;
};
struct req_exec_amf_sync_request {
mar_req_header_t header;
SaUint32T protocol_version;
char hostname[HOST_NAME_MAX + 1];
};
@ -585,12 +602,10 @@ static void nodeids_init (void)
*
* @return struct amf_node*
*/
static struct amf_node *get_this_node_obj (struct amf_cluster *cluster)
static struct amf_node *get_this_node_obj (void)
{
char hostname[HOST_NAME_MAX + 1];
assert (cluster != NULL);
if (gethostname (hostname, sizeof(hostname)) == -1) {
log_printf (LOG_LEVEL_ERROR, "gethostname failed: %d", errno);
openais_exit_error (AIS_DONE_FATAL_ERR);
@ -600,7 +615,7 @@ static struct amf_node *get_this_node_obj (struct amf_cluster *cluster)
}
/**
* Prints old and new sync state, sets new tate
* Prints old and new sync state, sets new state
* @param state
*/
static void sync_state_set (enum scsm_states state)
@ -630,6 +645,7 @@ static int mcast_sync_data (
SYNCTRACE ("%d bytes, type %u", req_exec.header.size , object_type);
req_exec.header.id =
SERVICE_ID_MAKE (AMF_SERVICE, MESSAGE_REQ_EXEC_AMF_SYNC_DATA);
req_exec.protocol_version = AMF_PROTOCOL_VERSION;
req_exec.object_type = object_type;
iov[0].iov_base = &req_exec;
@ -672,7 +688,7 @@ static void sync_request (void)
SYNCTRACE ("");
assert (amf_cluster->state == CLUSTER_UNINSTANTIATED);
assert (amf_cluster->acsm_state == CLUSTER_AC_UNINSTANTIATED);
amf_sync_init ();
@ -703,7 +719,7 @@ static int create_cluster_model (void)
openais_exit_error (AIS_DONE_AMFCONFIGREAD);
}
this_amf_node = get_this_node_obj (amf_cluster);
this_amf_node = get_this_node_obj ();
if (this_amf_node == NULL) {
log_printf (LOG_LEVEL_INFO,
@ -786,7 +802,6 @@ static int healthcheck_sync (struct amf_healthcheck *healthcheck)
SYNCTRACE ("%s", healthcheck->safHealthcheckKey.key);
buf = amf_healthcheck_serialize (healthcheck, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_HEALTHCHECK);
free (buf);
if (res != 0) {
@ -805,7 +820,6 @@ static int comp_sync (struct amf_comp *comp)
if (!scsm.comp_sync_completed) {
buf = amf_comp_serialize (comp, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_COMP);
free (buf);
if (res != 0) {
@ -837,7 +851,6 @@ static int su_sync (struct amf_su *su)
if (!scsm.su_sync_completed) {
buf = amf_su_serialize (su, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_SU);
free (buf);
if (res != 0) {
@ -868,7 +881,6 @@ static int sg_sync (struct amf_sg *sg)
if (!scsm.sg_sync_completed) {
buf = amf_sg_serialize (sg, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_SG);
free (buf);
if (res != 0) {
@ -898,7 +910,6 @@ static int csi_assignment_sync (struct amf_csi_assignment *csi_assignment)
SYNCTRACE ("%s", csi_assignment->name.value);
buf = amf_csi_assignment_serialize (csi_assignment, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_CSI_ASSIGNMENT);
free (buf);
if (res != 0) {
@ -916,7 +927,6 @@ static int csi_attribute_sync (struct amf_csi_attribute *csi_attribute)
SYNCTRACE ("%s", csi_attribute->name);
buf = amf_csi_attribute_serialize (csi_attribute, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_CSI_ATTRIBUTE);
free (buf);
if (res != 0) {
@ -935,7 +945,6 @@ static int csi_sync (struct amf_csi *csi)
if (!scsm.csi_sync_completed) {
buf = amf_csi_serialize (csi, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_CSI);
free (buf);
if (res != 0) {
@ -947,7 +956,8 @@ static int csi_sync (struct amf_csi *csi)
if (scsm.csi_assignment == NULL) {
scsm.csi_assignment = scsm.csi->assigned_csis;
}
for (; scsm.csi_assignment != NULL; scsm.csi_assignment = scsm.csi_assignment->next) {
for (; scsm.csi_assignment != NULL;
scsm.csi_assignment = scsm.csi_assignment->next) {
if (csi_assignment_sync (scsm.csi_assignment) != 0) {
return 1; /* try again later */
}
@ -974,7 +984,6 @@ static int si_assignment_sync (struct amf_si_assignment *si_assignment)
SYNCTRACE ("%s", si_assignment->name.value);
buf = amf_si_assignment_serialize (si_assignment, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_SI_ASSIGNMENT);
free (buf);
if (res != 0) {
@ -993,7 +1002,6 @@ static int si_sync (struct amf_si *si)
if (!scsm.si_sync_completed) {
buf = amf_si_serialize (si, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_SI);
free (buf);
if (res != 0) {
@ -1033,7 +1041,6 @@ static int application_sync (struct amf_application *app)
if (!scsm.app_sync_completed) {
buf = amf_application_serialize (app, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_APPLICATION);
free (buf);
if (res != 0) {
@ -1072,7 +1079,6 @@ static int node_sync (struct amf_node *node)
SYNCTRACE ("%s", node->name.value);
buf = amf_node_serialize (node, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_NODE);
free (buf);
if (res != 0) {
@ -1089,7 +1095,6 @@ static int cluster_sync (struct amf_cluster *cluster)
SYNCTRACE ("%s", cluster->name.value);
buf = amf_cluster_serialize (cluster, &len);
assert (buf != NULL);
res = mcast_sync_data (buf, len, AMF_CLUSTER);
free (buf);
if (res != 0) {
@ -1121,10 +1126,13 @@ static int is_member (
return 0;
}
/**
* Start the AMF nodes that has joined
*/
static void joined_nodes_start (void)
static void cluster_joined_nodes_start (void)
{
int i;
struct amf_node *node;
@ -1133,7 +1141,7 @@ static void joined_nodes_start (void)
node = amf_node_find_by_nodeid (scsm.joined_list[i]);
if (node != NULL) {
amf_node_sync_ready (node);
amf_cluster_sync_ready (amf_cluster, node);
} else {
log_printf (LOG_LEVEL_INFO,
"Info: Node %u is not configured as an AMF node", scsm.joined_list[i]);
@ -1163,6 +1171,7 @@ static void amf_sync_init (void)
default:
dprintf ("unknown state: %u", scsm.state);;
assert (0);
break;
}
if (scsm.state == SYNCHRONIZING && scsm.sync_master == this_ip->nodeid) {
@ -1251,6 +1260,7 @@ static void amf_sync_abort (void)
{
SYNCTRACE ("state %s", scsm_state_names[scsm.state]);
memset (&scsm, 0, sizeof (scsm));
assert (0); /* not ready... */
}
/**
@ -1274,51 +1284,21 @@ static void amf_sync_activate (void)
}
clm_nodes = NULL;
sync_state_set (NORMAL_OPERATION);
/* TODO: Remove dependencies to amf_cluster->state */
switch (amf_cluster->state) {
case CLUSTER_STARTED:
case CLUSTER_STARTING_WORKLOAD:
joined_nodes_start ();
break;
case CLUSTER_STARTING_COMPONENTS: {
amf_cluster_sync_ready (amf_cluster);
joined_nodes_start ();
break;
}
case CLUSTER_UNINSTANTIATED:
default: {
amf_cluster_sync_ready (amf_cluster);
}
}
cluster_joined_nodes_start ();
break;
case UPDATING_CLUSTER_MODEL:
amf_cluster = scsm.cluster;
assert (amf_cluster != NULL);
scsm.cluster = NULL;
this_amf_node = get_this_node_obj (amf_cluster);
this_amf_node = get_this_node_obj ();
sync_state_set (NORMAL_OPERATION);
if (this_amf_node != NULL) {
this_amf_node->nodeid = this_ip->nodeid;
#ifdef AMF_DEBUG
amf_runtime_attributes_print (amf_cluster);
#endif
/* TODO: Remove dependencies to amf_cluster->state */
switch (amf_cluster->state) {
case CLUSTER_STARTED: {
case CLUSTER_STARTING_WORKLOAD:
amf_node_sync_ready (this_amf_node);
break;
}
case CLUSTER_STARTING_COMPONENTS: {
amf_cluster_sync_ready (amf_cluster);
amf_node_sync_ready (this_amf_node);
break;
}
case CLUSTER_UNINSTANTIATED:
default: {
amf_cluster_sync_ready (amf_cluster);
}
}
amf_cluster_sync_ready (amf_cluster, this_amf_node);
} else {
log_printf (LOG_LEVEL_INFO,
"Info: This node is not configured as an AMF node, disabling.");
@ -1332,6 +1312,7 @@ static void amf_sync_activate (void)
default:
dprintf ("unknown state: %u", scsm.state);;
assert (0);
break;
}
SYNCTRACE ("");
@ -1419,8 +1400,11 @@ static void amf_confchg_fn (
/* fall-through */
case PROBING_2:
if (joined_list_entries > 0) {
struct req_exec_amf_sync_request msg;
memcpy (msg.hostname, hostname, strlen (hostname) + 1);
msg.protocol_version = AMF_PROTOCOL_VERSION;
amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST,
hostname, strlen (hostname) + 1);
&msg.protocol_version, sizeof (msg) - sizeof (mar_req_header_t));
}
break;
case UNCONFIGURED:
@ -1493,6 +1477,7 @@ static void amf_confchg_fn (
default:
log_printf (LOG_LEVEL_ERROR, "unknown state: %u\n", scsm.state);
assert (0);
break;
}
}
@ -1504,15 +1489,13 @@ static int amf_lib_exit_fn (void *conn)
assert (amf_pd != NULL);
comp = amf_pd->comp;
assert (comp != NULL);
/* Make sure this is not a new connection */
if (comp->conn == conn) {
if (comp != NULL && comp->conn == conn ) {
comp->conn = NULL;
dprintf ("Lib exit from comp %s\n", getSaNameT (&comp->name));
}
dprintf ("Lib exit from comp %s\n", getSaNameT (&comp->name));
return (0);
}
@ -1533,6 +1516,8 @@ static void amf_dump_fn (void)
amf_runtime_attributes_print (amf_cluster);
}
/******************************************************************************
* Executive Message Implementation
*****************************************************************************/
@ -1553,7 +1538,7 @@ static void message_handler_req_exec_amf_comp_register (
assert (comp != NULL);
TRACE1 ("ComponentRegister: '%s'", comp->name.value);
error = amf_comp_register (comp);
if (amf_su_is_local (comp->su)) {
res_lib.header.id = MESSAGE_RES_AMF_COMPONENTREGISTER;
res_lib.header.size = sizeof (struct res_lib_amf_componentregister);
@ -1578,6 +1563,38 @@ static void message_handler_req_exec_amf_comp_error_report (
amf_comp_error_report (comp, req_exec->recommendedRecovery);
}
static void message_handler_req_exec_amf_comp_instantiate(
void *message, unsigned int nodeid)
{
struct req_exec_amf_comp_instantiate *req_exec = message;
struct amf_comp *component;
component = amf_comp_find (amf_cluster, &req_exec->compName);
if (component == NULL) {
log_printf (LOG_ERR, "Error: '%s' not found", req_exec->compName.value);
return;
}
amf_comp_instantiate_event (component);
}
static void message_handler_req_exec_amf_comp_instantiate_tmo(
void *message, unsigned int nodeid)
{
struct req_exec_amf_comp_instantiate_tmo *req_exec = message;
struct amf_comp *component;
component = amf_comp_find (amf_cluster, &req_exec->compName);
if (component == NULL) {
log_printf (LOG_ERR, "Error: '%s' not found", req_exec->compName.value);
return;
}
amf_comp_instantiate_tmo_event (component);
}
static void message_handler_req_exec_amf_clc_cleanup_completed (
void *message, unsigned int nodeid)
{
@ -1621,6 +1638,7 @@ static void message_handler_req_exec_amf_healthcheck_tmo (
amf_comp_healthcheck_tmo (comp, healthcheck);
}
static void message_handler_req_exec_amf_response (
void *message, unsigned int nodeid)
{
@ -1690,6 +1708,7 @@ static void message_handler_req_exec_amf_sync_start (
default:
dprintf ("unknown state %d", scsm.state);
assert (0);
break;
}
}
@ -1697,10 +1716,17 @@ static void message_handler_req_exec_amf_sync_data (
void *message, unsigned int nodeid)
{
struct req_exec_amf_sync_data *req_exec = message;
int sz = sizeof (struct req_exec_amf_sync_data);
char *tmp = ((char*)message) + sizeof (struct req_exec_amf_sync_data);
SYNCTRACE ("rec %d bytes, type %d, ptr %p",
req_exec->header.size, req_exec->object_type, message);
SYNCTRACE ("rec %d bytes, ptr %p, type %d", req_exec->header.size, message,
req_exec->object_type);
#if 0
if (req_exec->protocol_version != AMF_PROTOCOL_VERSION) {
log_printf (LOG_ERR, "Error: Protocol version not supported");
return;
}
#endif
if (scsm.state != UPDATING_CLUSTER_MODEL) {
return;
@ -1709,65 +1735,58 @@ static void message_handler_req_exec_amf_sync_data (
switch (req_exec->object_type)
{
case AMF_CLUSTER:
if ((scsm.cluster = amf_cluster_deserialize (
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.cluster = amf_cluster_deserialize (tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("Cluster '%s' deserialised", scsm.cluster->name.value);
break;
case AMF_NODE:
if ((scsm.node = amf_node_deserialize (scsm.cluster,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.node = amf_node_deserialize (scsm.cluster, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("Node '%s' deserialised", scsm.node->name.value);
break;
case AMF_APPLICATION:
if ((scsm.app = amf_application_deserialize (scsm.cluster,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.app = amf_application_deserialize (scsm.cluster, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("App '%s' deserialised", scsm.app->name.value);
break;
case AMF_SG:
if ((scsm.sg = amf_sg_deserialize (scsm.app,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.sg = amf_sg_deserialize (scsm.app, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("SG '%s' deserialised", scsm.sg->name.value);
break;
case AMF_SU:
if ((scsm.su = amf_su_deserialize (scsm.sg,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.su = amf_su_deserialize (scsm.sg, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("SU '%s' deserialised", scsm.su->name.value);
break;
case AMF_COMP:
if ((scsm.comp = amf_comp_deserialize (scsm.su,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.comp = amf_comp_deserialize (scsm.su, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("Component '%s' deserialised", scsm.comp->name.value);
break;
case AMF_HEALTHCHECK:
if ((scsm.healthcheck = amf_healthcheck_deserialize (scsm.comp,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("Healthcheck '%s' deserialised",
scsm.healthcheck->safHealthcheckKey.key);
break;
case AMF_SI:
if ((scsm.si = amf_si_deserialize (scsm.app,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.si = amf_si_deserialize (scsm.app, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("SI '%s' deserialised", scsm.si->name.value);
break;
case AMF_SI_ASSIGNMENT:
if ((scsm.si_assignment = amf_si_assignment_deserialize (scsm.si,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("SI Ass '%s' deserialised",
@ -1775,14 +1794,14 @@ static void message_handler_req_exec_amf_sync_data (
break;
case AMF_CSI:
if ((scsm.csi = amf_csi_deserialize (scsm.si,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("CSI '%s' deserialised", scsm.csi->name.value);
break;
case AMF_CSI_ASSIGNMENT:
if ((scsm.csi_assignment = amf_csi_assignment_deserialize (scsm.csi,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
if ((scsm.csi_assignment = amf_csi_assignment_deserialize (
scsm.csi, tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("CSI Ass '%s' deserialised",
@ -1790,7 +1809,7 @@ static void message_handler_req_exec_amf_sync_data (
break;
case AMF_CSI_ATTRIBUTE:
if ((scsm.csi_attribute = amf_csi_attribute_deserialize (scsm.csi,
((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) {
tmp)) == NULL) {
openais_exit_error (AIS_DONE_FATAL_ERR);
}
SYNCTRACE ("CSI Attr '%s' deserialised",
@ -1818,19 +1837,15 @@ static void message_handler_req_exec_amf_sync_ready (
amf_sync_activate ();
}
static void message_handler_req_exec_amf_cluster_start_tmo (
void *message, unsigned int nodeid)
{
if (scsm.state != NORMAL_OPERATION) {
return;
}
if (nodeid == scsm.sync_master) {
dprintf ("Cluster startup timeout, assigning workload");
amf_cluster_assign_workload (amf_cluster);
} else {
dprintf ("received CLUSTER_START_TMO from non sync-master, ignoring");
}
amf_cluster_start_tmo_event (nodeid == scsm.sync_master, amf_cluster);
}
static void message_handler_req_exec_amf_sync_request (
@ -1838,7 +1853,7 @@ static void message_handler_req_exec_amf_sync_request (
{
struct req_exec_amf_sync_request *req_exec = message;
clm_node_t *clm_node;
SYNCTRACE ("from: %s, name: %s, state %s", totempg_ifaces_print (nodeid),
req_exec->hostname, scsm_state_names[scsm.state]);
@ -1862,6 +1877,7 @@ static void message_handler_req_exec_amf_sync_request (
}
}
/*****************************************************************************
* Library Interface Implementation
****************************************************************************/

View File

@ -50,6 +50,8 @@
#include "timer.h"
#include "aispoll.h"
#define AMF_PROTOCOL_VERSION 1
enum clc_component_types {
clc_component_sa_aware = 0, /* sa aware */
clc_component_proxied_pre = 1, /* proxied, pre-instantiable */
@ -67,22 +69,28 @@ typedef enum {
} amf_node_eesm_state_t;
typedef enum {
NODE_ACSM_REPAIR_NEEDED,
APP_AC_UNINSTANTIATED = 1,
APP_AC_STARTING_SGS,
APP_AC_STARTED,
APP_AC_ASSIGNING_WORKLOAD,
APP_AC_WORKLOAD_ASSIGNED
} app_avail_control_state_t;
typedef enum {
NODE_ACSM_REPAIR_NEEDED = 1,
NODE_ACSM_ESCALLATION_LEVEL_0,
NODE_ACSM_ESCALLATION_LEVEL_2,
NODE_ACSM_ESCALLATION_LEVEL_3,
NODE_ACSM_FAILING_FAST_REBOOTING_NODE,
NODE_ACSM_FAILING_FAST_ACTIVATING_STANDBY_NODE,
NODE_ACSM_FAILING_GRACEFULLY_SWITCHING_OVER,
NODE_ACSM_FAILING_GRACEFULLY_SWITCHING_OVER,
NODE_ACSM_FAILING_GRACEFULLY_FAILING_OVER,
NODE_ACSM_FAILING_GRACEFULLY_REBOOTING_NODE,
NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER,
NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN,
NODE_ACSM_JOINING_STARTING_SERVICE_UNITS,
NODE_ACSM_JOINING_ASSIGNING_ACTIVE_WORKLOAD,
NODE_ACSM_JOINING_ASSIGNING_STANDBY_WORKLOAD
} amf_node_acsm_state_t;
NODE_ACSM_JOINING_STARTING_APPLICATIONS,
NODE_ACSM_JOINING_ASSIGNING_WORKLOAD
} amf_node_acsm_state_t;
typedef enum {
SG_AC_Idle = 0,
@ -101,6 +109,7 @@ typedef enum {
SG_AC_WaitingAfterOperationFailed
} sg_avail_control_state_t;
typedef enum {
SG_RT_FailoverSU = 1,
SG_RT_FailoverNode
@ -147,14 +156,36 @@ struct amf_si_assignment;
struct amf_csi_assignment;
struct amf_healthcheck;
enum cluster_states {
CLUSTER_UNINSTANTIATED,
CLUSTER_STARTING_COMPONENTS,
CLUSTER_STARTING_WORKLOAD,
CLUSTER_STARTED
};
typedef enum {
CLUSTER_AC_UNINSTANTIATED = 1,
CLUSTER_AC_STARTING_APPLICATIONS,
CLUSTER_AC_WAITING_OVER_TIME,
CLUSTER_AC_ASSIGNING_WORKLOAD,
CLUSTER_AC_STARTED,
CLUSTER_AC_TERMINATING_APPLICATIONS,
CLUSTER_AC_WORKLOAD_REMOVESD,
CLUSTER_AC_REMOVING_WORKLOAD,
CLUSTER_AC_DEACTIVATING_WORKLOAD,
CLUSTER_AC_QUISING
} cluster_avail_control_state_t;
struct amf_cluster {
typedef enum amf_cluster_event {
CLUSTER_SYNC_READY_EV = 1
} amf_cluster_event_t;
typedef struct amf_deferred {
struct amf_deferred *next;
} amf_deferred_t;
typedef struct cluster_deferred {
amf_deferred_t defered_list;
struct amf_node *node;
amf_cluster_event_t event;
} cluster_deferredt_t;
typedef struct amf_cluster {
/* Configuration Attributes */
SaNameT name;
SaUint32T saAmfClusterStartupTimeout;
@ -169,8 +200,9 @@ struct amf_cluster {
/* Implementation */
poll_timer_handle timeout_handle;
enum cluster_states state;
};
cluster_avail_control_state_t acsm_state;
cluster_deferredt_t *deferred_events_head;
} amf_cluster_t;
typedef struct amf_node {
/* Configuration Attributes */
@ -195,7 +227,7 @@ typedef struct amf_node {
amf_node_acsm_state_t acsm_state;
} amf_node_t;
struct amf_application {
typedef struct amf_application {
/* Configuration Attributes */
SaNameT name;
@ -213,16 +245,18 @@ struct amf_application {
SaStringT clccli_path;
struct amf_application *next;
struct amf_node *node_to_start;
};
app_avail_control_state_t acsm_state;
} amf_application_t;
struct sg_recovery_scope {
sg_recovery_type_t recovery_type;
struct amf_si **sis;
struct amf_su **sus;
struct amf_comp *comp;
struct amf_node *node;
};
struct amf_sg {
typedef struct amf_sg {
/* Configuration Attributes */
SaNameT name;
saAmfRedundancyModelT saAmfSGRedundancyModel;
@ -249,18 +283,18 @@ struct amf_sg {
/* Relations */
struct amf_application *application;
/* ordered list of SUs */
/* ordered list of SUs */
struct amf_su *su_head;
/* Implementation */
SaStringT clccli_path;
struct amf_sg *next;
sg_avail_control_state_t avail_state;
struct sg_recovery_scope recovery_scope;
struct sg_recovery_scope recovery_scope;
struct amf_node *node_to_start;
};
} amf_sg_t;
struct amf_su {
typedef struct amf_su {
/* Configuration Attributes */
SaNameT name;
SaUint32T saAmfSURank;
@ -272,12 +306,12 @@ struct amf_su {
SaBoolT saAmfSUPreInstantiable;
SaAmfOperationalStateT saAmfSUOperState;
SaAmfAdminStateT saAmfSUAdminState;
// SaAmfReadinessStateT saAmfSUReadinessState;
/* SaAmfReadinessStateT saAmfSUReadinessState; */
SaAmfPresenceStateT saAmfSUPresenceState;
// SaNameT saAmfSUAssignedSIs;
/* SaNameT saAmfSUAssignedSIs; */
SaNameT saAmfSUHostedByNode;
/* SaUint32T saAmfSUNumCurrActiveSIs; */
/* SaUint32T saAmfSUNumCurrStandbySIs; */
/* SaUint32T saAmfSUNumCurrActiveSIs; */
/* SaUint32T saAmfSUNumCurrStandbySIs; */
SaUint32T saAmfSURestartCount;
/* Relations */
@ -288,11 +322,11 @@ struct amf_su {
su_restart_control_state_t restart_control_state;
su_restart_control_state_t escalation_level_history_state;
SaStringT clccli_path;
SaUint32T su_failover_cnt; /* missing in SAF specs? */
SaUint32T su_failover_cnt; /* missing in SAF specs? */
struct amf_su *next;
};
} amf_su_t;
struct amf_comp {
typedef struct amf_comp {
/* Configuration Attributes */
SaNameT name;
SaNameT **saAmfCompCsTypes;
@ -334,12 +368,12 @@ struct amf_comp {
/* Runtime Attributes */
SaAmfOperationalStateT saAmfCompOperState;
// SaAmfReadinessStateT saAmfCompReadinessState;
/* SaAmfReadinessStateT saAmfCompReadinessState; */
SaAmfPresenceStateT saAmfCompPresenceState;
SaUint32T saAmfCompRestartCount;
/* SaUint32T saAmfCompNumCurrActiveCsi; */
/* SaUint32T saAmfCompNumCurrStandbyCsi; */
/* SaNameT saAmfCompAssignedCsi; */
/* SaUint32T saAmfCompNumCurrActiveCsi; */
/* SaUint32T saAmfCompNumCurrStandbyCsi; */
/*SaNameT saAmfCompAssignedCsi; */
SaNameT saAmfCompCurrProxyName;
SaNameT **saAmfCompCurrProxiedNames;
@ -353,14 +387,15 @@ struct amf_comp {
void *conn;
enum clc_component_types comptype;
struct amf_healthcheck *healthcheck_head;
/**
* Flag that indicates of this component has a suspected error
poll_timer_handle instantiate_timeout_handle;
/*
* Flag that indicates of this component has a suspected error
*/
SaUint32T error_suspected;
};
} amf_comp_t;
struct amf_healthcheck {
typedef struct amf_healthcheck {
/* Configuration Attributes */
SaAmfHealthcheckKeyT safHealthcheckKey;
SaUint32T saAmfHealthcheckMaxDuration;
@ -377,9 +412,9 @@ struct amf_healthcheck {
poll_timer_handle timer_handle_duration;
poll_timer_handle timer_handle_period;
};
} amf_healthcheck_t;
struct amf_si {
typedef struct amf_si {
/* Configuration Attributes */
SaNameT name;
SaNameT saAmfSIProtectedbySG;
@ -390,9 +425,9 @@ struct amf_si {
/* Runtime Attributes */
SaAmfAdminStateT saAmfSIAdminState;
// SaAmfAssignmentStateT saAmfSIAssignmentState;
// SaUint32T saAmfSINumCurrActiveAssignments;
// SaUint32T saAmfSINumCurrStandbyAssignments;
/* SaAmfAssignmentStateT saAmfSIAssignmentState; */
/* SaUint32T saAmfSINumCurrActiveAssignments; */
/* SaUint32T saAmfSINumCurrStandbyAssignments; */
/* Relations */
struct amf_application *application;
@ -403,9 +438,9 @@ struct amf_si {
/* Implementation */
struct amf_si *next;
};
} amf_si_t;
struct amf_si_ranked_su {
typedef struct amf_si_ranked_su {
/* Configuration Attributes */
SaNameT name;
SaUint32T saAmfRank;
@ -417,9 +452,9 @@ struct amf_si_ranked_su {
/* Implementation */
struct amf_si_ranked_su *su_next;
struct amf_si_ranked_su *si_next;
};
} amf_si_ranked_su_t;
struct amf_si_dependency {
typedef struct amf_si_dependency {
/* Configuration Attributes */
SaNameT name;
int saAmfToleranceTime;
@ -428,9 +463,9 @@ struct amf_si_dependency {
/* Implementation */
struct amf_si_dependency *next;
};
} amf_si_dependency_t;
struct amf_si_assignment {
typedef struct amf_si_assignment {
/* Runtime Attributes */
SaNameT name;
SaAmfHAStateT saAmfSISUHAState;
@ -444,9 +479,9 @@ struct amf_si_assignment {
struct amf_si_assignment *next;
void (*assumed_callback_fn) (
struct amf_si_assignment *si_assignment, int result);
};
} amf_si_assignment_t;
struct amf_csi {
typedef struct amf_csi {
/* Configuration Attributes */
SaNameT name;
SaNameT saAmfCSTypeName;
@ -459,18 +494,18 @@ struct amf_csi {
/* Implementation */
struct amf_csi *next;
};
} amf_csi_t;
struct amf_csi_attribute {
typedef struct amf_csi_attribute {
/* Configuration Attributes */
SaStringT name;
SaStringT *value;
/* Implementation */
struct amf_csi_attribute *next;
};
} amf_csi_attribute_t;
struct amf_csi_assignment {
typedef struct amf_csi_assignment {
/* Runtime Attributes */
SaNameT name;
SaAmfHAStateT saAmfCSICompHAState; /* confirmed HA state */
@ -483,7 +518,7 @@ struct amf_csi_assignment {
SaAmfHAStateT requested_ha_state;
struct amf_csi_assignment *next;
struct amf_si_assignment *si_assignment;
};
} amf_csi_assignment_t;
enum amf_response_interfaces {
AMF_RESPONSE_HEALTHCHECKCALLBACK = 1,
@ -495,14 +530,16 @@ enum amf_response_interfaces {
enum amf_message_req_types {
MESSAGE_REQ_EXEC_AMF_COMPONENT_REGISTER = 0,
MESSAGE_REQ_EXEC_AMF_COMPONENT_ERROR_REPORT = 1,
MESSAGE_REQ_EXEC_AMF_CLC_CLEANUP_COMPLETED = 2,
MESSAGE_REQ_EXEC_AMF_HEALTHCHECK_TMO = 3,
MESSAGE_REQ_EXEC_AMF_RESPONSE = 4,
MESSAGE_REQ_EXEC_AMF_SYNC_START = 5,
MESSAGE_REQ_EXEC_AMF_SYNC_DATA = 6,
MESSAGE_REQ_EXEC_AMF_SYNC_READY = 7,
MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO = 8,
MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST = 9
MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE = 2,
MESSAGE_REQ_EXEC_AMF_CLC_CLEANUP_COMPLETED = 3,
MESSAGE_REQ_EXEC_AMF_HEALTHCHECK_TMO = 4,
MESSAGE_REQ_EXEC_AMF_RESPONSE = 5,
MESSAGE_REQ_EXEC_AMF_SYNC_START = 6,
MESSAGE_REQ_EXEC_AMF_SYNC_DATA = 7,
MESSAGE_REQ_EXEC_AMF_SYNC_READY = 8,
MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO = 9,
MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST = 10,
MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO = 11
};
struct req_exec_amf_clc_cleanup_completed {
@ -516,6 +553,16 @@ struct req_exec_amf_healthcheck_tmo {
SaAmfHealthcheckKeyT safHealthcheckKey;
};
struct req_exec_amf_comp_instantiate {
mar_req_header_t header;
SaNameT compName;
};
struct req_exec_amf_comp_instantiate_tmo {
mar_req_header_t header;
SaNameT compName;
};
struct req_exec_amf_cluster_start_tmo {
mar_req_header_t header;
@ -529,6 +576,9 @@ extern void amf_runtime_attributes_print (struct amf_cluster *cluster);
extern int amf_enabled (struct objdb_iface_ver0 *objdb);
extern void *_amf_malloc (size_t size, char *file, unsigned int line);
#define amf_malloc(size) _amf_malloc ((size), __FILE__, __LINE__)
extern void *_amf_calloc (size_t nmemb, size_t size, char *file,
unsigned int line);
#define amf_calloc(nmemb,size) _amf_calloc ((nmemb), (size), __FILE__, __LINE__)
extern const char *amf_admin_state (int state);
extern const char *amf_op_state (int state);
@ -540,17 +590,20 @@ extern char *amf_serialize_SaNameT (
char *buf, int *size, int *offset, SaNameT *name);
extern char *amf_serialize_SaStringT (
char *buf, int *size, int *offset, SaStringT str);
extern char *amf_serialize_SaUint16T (
char *buf, int *size, int *offset, SaUint16T num);
extern char *amf_serialize_SaUint32T (
char *buf, int *size, int *offset, SaUint32T num);
extern char *amf_serialize_SaUint64T (char *buf, SaUint64T num);
extern char *amf_serialize_opaque (
char *buf, int *size, int *offset, char *cp, int cnt);
char *buf, int *size, int *offset, void *cp, int cnt);
extern char *amf_deserialize_SaNameT (char *buf, SaNameT *name);
extern char *amf_deserialize_SaStringT (char *buf, SaStringT *str);
extern char *amf_deserialize_SaUint16T (char *buf, SaUint16T *num);
extern char *amf_deserialize_SaUint32T (char *buf, SaUint32T *num);
extern char *amf_deserialize_SaUint64T (char *buf, SaUint64T *num);
extern char *amf_deserialize_opaque (char *buf, char *dst, int *cnt);
extern void amf_msg_mcast (int id, void *buf, size_t len);
extern char *amf_deserialize_opaque (char *buf, void *dst, int *cnt);
extern int amf_msg_mcast (int msg_id, void *buf, size_t len);
extern void amf_util_init (void);
/*===========================================================================*/
@ -561,10 +614,12 @@ extern struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name);
extern void amf_node_init (void);
extern void *amf_node_serialize (struct amf_node *node, int *len);
extern struct amf_node *amf_node_deserialize (
struct amf_cluster *cluster, char *buf, int size);
struct amf_cluster *cluster, char *buf);
extern struct amf_node *amf_node_find (SaNameT *name);
extern struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid);
extern struct amf_node *amf_node_find_by_hostname (const char *hostname);
extern void amf_node_sg_failed_over (struct amf_node *node,
struct amf_sg *sg_in);
/* Event methods */
extern void amf_node_sync_ready (struct amf_node *node);
@ -601,10 +656,25 @@ extern void timer_function_node_probation_period_expired (void *node);
extern void amf_cluster_init (void);
extern struct amf_cluster *amf_cluster_new (void);
extern void *amf_cluster_serialize (struct amf_cluster *cluster, int *len);
extern struct amf_cluster *amf_cluster_deserialize (char *buf, int size);
extern struct amf_cluster *amf_cluster_deserialize (char *buf);
extern int amf_cluster_applications_assigned (struct amf_cluster *cluster);
extern int amf_cluster_applications_started_with_no_starting_sgs (
struct amf_cluster *cluster);
/* Event methods */
extern void amf_cluster_sync_ready (struct amf_cluster *cluster);
extern void amf_cluster_start_tmo_event (int is_sync_master,
struct amf_cluster *cluster);
extern void amf_cluster_sync_ready (struct amf_cluster *cluster,
struct amf_node *node);
/**
*
* @param cluster
* @param app
*/
extern void amf_cluster_start_applications(struct amf_cluster *cluster);
extern void amf_cluster_assign_workload (struct amf_cluster *cluster);
/* Response event methods */
@ -628,7 +698,8 @@ extern int amf_application_calc_and_set_si_dependency_level (
extern void *amf_application_serialize (
struct amf_application *application, int *len);
extern struct amf_application *amf_application_deserialize (
struct amf_cluster *cluster, char *buf, int size);
struct amf_cluster *cluster, char *buf);
extern int amf_application_all_sg_started (struct amf_application *app);
/* Event methods */
extern void amf_application_start (
@ -652,7 +723,7 @@ extern struct amf_sg *amf_sg_new (struct amf_application *app, char *name);
extern void amf_sg_delete (struct amf_sg *sg);
extern void *amf_sg_serialize (struct amf_sg *sg, int *len);
extern struct amf_sg *amf_sg_deserialize (
struct amf_application *app, char *buf, int size);
struct amf_application *app, char *buf);
/**
* Request SG to start (instantiate all SUs)
@ -669,7 +740,7 @@ extern void amf_sg_start (struct amf_sg *sg, struct amf_node *node);
* @param sg
* @param dependency_level
*/
extern void amf_sg_assign_si (struct amf_sg *sg, int dependency_level);
extern int amf_sg_assign_si_req (struct amf_sg *sg, int dependency_level);
extern void amf_sg_failover_node_req (
struct amf_sg *sg, struct amf_node *node);
@ -679,7 +750,6 @@ extern void amf_sg_failover_comp_req (
struct amf_sg *sg, struct amf_node *node);
extern void amf_sg_switchover_node_req (
struct amf_sg *sg, struct amf_node *node);
/* Response event methods */
extern void amf_sg_su_state_changed (
struct amf_sg *sg, struct amf_su *su, SaAmfStateT type, int state);
@ -702,7 +772,7 @@ extern void amf_su_delete (struct amf_su *su);
extern char *amf_su_dn_make (struct amf_su *su, SaNameT *name);
extern void *amf_su_serialize (struct amf_su *su, int *len);
extern struct amf_su *amf_su_deserialize (
struct amf_sg *sg, char *buf, int size);
struct amf_sg *sg, char *buf);
extern int amf_su_is_local (struct amf_su *su);
extern struct amf_si_assignment *amf_su_get_next_si_assignment (
struct amf_su *su, const struct amf_si_assignment *si_assignment);
@ -752,7 +822,7 @@ extern struct amf_comp *amf_comp_find (
struct amf_cluster *cluster, SaNameT *name);
extern void *amf_comp_serialize (struct amf_comp *comp, int *len);
extern struct amf_comp *amf_comp_deserialize (
struct amf_su *su, char *buf, int size);
struct amf_su *su, char *buf);
extern void amf_comp_foreach_csi_assignment (
struct amf_comp *component,
void (*foreach_fn)(struct amf_comp *component,
@ -766,6 +836,8 @@ extern SaAmfReadinessStateT amf_comp_get_saAmfCompReadinessState (
extern void amf_comp_instantiate (struct amf_comp *comp);
extern void amf_comp_terminate (struct amf_comp *comp);
extern void amf_comp_node_left (struct amf_comp *comp);
extern void amf_comp_instantiate_event(struct amf_comp *comp);
extern void amf_comp_instantiate_tmo_event (struct amf_comp *comp);
/**
* Request the component to assume a HA state
@ -832,10 +904,11 @@ extern SaAisErrorT amf_comp_healthcheck_confirm (
SaAmfHealthcheckKeyT *healthcheckKey,
SaAisErrorT healthcheckResult);
extern amf_healthcheck_t *amf_healthcheck_new (struct amf_comp *comp);
extern void *amf_healthcheck_serialize (
struct amf_healthcheck *healthcheck, int *len);
extern struct amf_healthcheck *amf_healthcheck_deserialize (
struct amf_comp *comp, char *buf, int size);
struct amf_comp *comp, char *buf);
/*===========================================================================*/
/* amfsi.c */
@ -848,11 +921,11 @@ extern void amf_si_delete (struct amf_si *si);
extern int amf_si_calc_and_set_csi_dependency_level (struct amf_si *si);
extern void *amf_si_serialize (struct amf_si *si, int *len);
extern struct amf_si *amf_si_deserialize (
struct amf_application *app, char *buf, int size);
struct amf_application *app, char *buf);
extern void *amf_si_assignment_serialize (
struct amf_si_assignment *si_assignment, int *len);
extern struct amf_si_assignment *amf_si_assignment_deserialize (
struct amf_si *si, char *buf, int size);
struct amf_si *si, char *buf);
#if 0
char *amf_si_assignment_dn_make (struct amf_su *su, struct amf_si *si,
SaNameT *name);
@ -972,11 +1045,11 @@ extern struct amf_csi *amf_csi_find (struct amf_si *si, char *name);
extern void amf_csi_delete (struct amf_csi *csi);
extern void *amf_csi_serialize (struct amf_csi *csi, int *len);
extern struct amf_csi *amf_csi_deserialize (
struct amf_si *si, char *buf, int size);
struct amf_si *si, char *buf);
extern void *amf_csi_assignment_serialize (
struct amf_csi_assignment *csi_assignment, int *len);
extern struct amf_csi_assignment *amf_csi_assignment_deserialize (
struct amf_csi *csi, char *buf, int size);
struct amf_csi *csi, char *buf);
extern char *amf_csi_dn_make (struct amf_csi *csi, SaNameT *name);
extern char *amf_csi_assignment_dn_make (
struct amf_csi_assignment *csi_assignment, SaNameT *name);
@ -986,9 +1059,9 @@ extern struct amf_csi_attribute *amf_csi_attribute_new (struct amf_csi *csi);
extern void *amf_csi_attribute_serialize (
struct amf_csi_attribute *csi_attribute, int *len);
extern struct amf_csi_attribute *amf_csi_attribute_deserialize (
struct amf_csi *csi, char *buf, int size);
struct amf_csi *csi, char *buf);
/* extern int sa_amf_grep(const char *string, char *pattern, size_t nmatch, */
/* char** sub_match_array); */
/* char** sub_match_array); */
extern int sa_amf_grep(const char *string, char *pattern, size_t nmatch,
SaNameT *sub_match_array);

View File

@ -65,51 +65,182 @@
* (In the future this state will also be assumed after the LOCK_INSTANTIATION
* administrative command.)
*
* State STARTED is assumed when the application has been initially started and
* will in the future be re-assumed after the administrative command RESTART
* have been executed.
* State WORKLOAD_ASSIGNED is assumed when the application has been initially
* started and will in the future be re-assumed after the administrative
* command RESTART have been executed.
*
*/
#include <assert.h>
* 1. AMF Synchronization Control State Machine
* =========================================
*
* 1.1 State Transition Table
*
* State: Event: Action: New state:
* ===========================================================================
* UNINSTANTIATED start A6,A1 STARTING_SGS
* STARTING_SGS start [C4] A7
* STARTING_SGS sg_started [C1] A8,A9 STARTED
* STARTED start A6,A1 STARTING_SGS
* STARTED assign_workload A3 ASSIGNING_WORKLOAD
* ASSIGNING_WORKLOAD assign_workload A7
* ASSIGNING_WORKLOAD start A7
* ASSIGNING_WORKLOAD sg_assigned [C2] A10,A9 WORKLOAD_ASSIGNED
* WORKLOAD_ASSIGNED start A6,A1 STARTING_SGS
* WORKLOAD_ASSIGNED assign_workload A3 ASSIGNING_WORKLOAD
*
* 1.2 State Description
* =====================
* UNINSTANTIATED - No SUs within the SGs contained in the application have been
* instantiated.
* STARTING_SGS - Waiting for the contained SGs to start.
* STARTED - No SUs within the SGs contained in the application are in the
* process of beein instantiated. Either the SUs are instantiated or
* instantiation was not possible or instantiation has failed.
* ASSIGNING_WORKLOAD - Waiting for the contained SGs to indicate they have
* assigned workload to its SUs.
* WORKLOAD_ASSIGNED - at least some workload has been assigned to the SUs that
* are in-service.
*
* 1.3 Actions
* ===========
* A1 - [foreach sg in application] sg_start
* A2 -
* A3 - [foreach sg in application] sg_assign
* A4 -
* A5 -
* A6 - save value of received node parameter
* A7 - defer the event
* A8 - [node == NULL] cluster_application_started else node_application_started
* A9 - recall defered events
* A10 - [node == NULL] cluster_application_assigned else
* node_application_assigned
*
* 1.4 Guards
* ==========
* C1 - No sg has availability control state == INSTANTIATING_SERVICE_UNITS
* C2 - All sgs have availability control state == IDLE
* C3 -
* C4 - saved node value != received node value
*/
#include <assert.h>
#include "amf.h"
#include "print.h"
#include "util.h"
static int all_sg_started (struct amf_application *app)
/******************************************************************************
* Internal (static) utility functions
*****************************************************************************/
int no_su_is_instantiating (struct amf_application *app)
{
struct amf_sg *sg;
struct amf_su *su;
int all_su_instantiated = 1;
/* TODO: spare SUs... */
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
for (su = sg->su_head; su != NULL; su = su->next) {
if (su->saAmfSUPresenceState == SA_AMF_PRESENCE_INSTANTIATING) {
all_su_instantiated = 0;
break;
}
}
}
return all_su_instantiated;
}
#ifdef COMPILE_OUT
static int all_sg_started (struct amf_application *app)
{
struct amf_sg *sg;
int all_su_instantiated = 1;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
if (sg->avail_state == SG_AC_InstantiatingServiceUnits) {
all_su_instantiated = 0;
break;
}
}
return all_su_instantiated;
}
#endif
static int all_sg_assigned (struct amf_application *app)
{
struct amf_sg *sg;
int all_sg_assigned = 1;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
if (sg->avail_state != SG_AC_Idle) {
all_su_instantiated = 0;
all_sg_assigned = 0;
break;
}
}
return all_su_instantiated;
return all_sg_assigned;
}
/******************************************************************************
* Event methods
*****************************************************************************/
void amf_application_start (
struct amf_application *app, struct amf_node *node)
{
struct amf_sg *sg;
ENTER ("'%s'", app->name.value);
assert (app != NULL);
switch (app->acsm_state) {
case APP_AC_UNINSTANTIATED:
app->node_to_start = node;
app->acsm_state = APP_AC_STARTING_SGS;
app->node_to_start = node;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_start (sg, node);
}
break;
case APP_AC_STARTING_SGS:
if (app->node_to_start == node) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_start (sg, node);
}
} else {
/* TODO: Save the start request until state == APP_AC_STARTED */
log_printf (LOG_LEVEL_ERROR, "Request to start application"
" =%s in state = %d",app->name.value, app->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
}
break;
case APP_AC_STARTED:
/* TODO: Recall defered events */
app->node_to_start = node;
app->acsm_state = APP_AC_STARTING_SGS;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_start (sg, node);
}
break;
case APP_AC_ASSIGNING_WORKLOAD:
/* TODO: Save the start request until state == APP_AC_STARTED */
log_printf (LOG_LEVEL_ERROR, "Request to start application"
" =%s in state = %d (should be defered)",
app->name.value, app->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
case APP_AC_WORKLOAD_ASSIGNED:
app->node_to_start = node;
app->acsm_state = APP_AC_STARTING_SGS;
/* TODO: Calculate and set SI dependency levels */
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_start (sg, node);
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_start (sg, node);
}
break;
default:
assert (0);
break;
}
}
void amf_application_assign_workload (
struct amf_application *app, struct amf_node *node)
struct amf_application *app, struct amf_node *node)
{
struct amf_sg *sg;
@ -118,30 +249,75 @@ void amf_application_assign_workload (
* Each dependency level should be looped and amf_sg_assign_si
* called several times.
*/
assert (app != NULL);
app->node_to_start = node;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
amf_sg_assign_si (sg, 0);
switch (app->acsm_state) {
case APP_AC_WORKLOAD_ASSIGNED:
TRACE1 ("APP_AC_WORKLOAD_ASSIGNED");
/* Fall through */
case APP_AC_STARTED: {
int posible_to_assign_si = 0;
app->acsm_state = APP_AC_ASSIGNING_WORKLOAD;
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
if (amf_sg_assign_si_req (sg, 0)) {
posible_to_assign_si = 1;
}
}
if (posible_to_assign_si == 0) {
app->acsm_state = APP_AC_WORKLOAD_ASSIGNED;
}
break;
}
case APP_AC_ASSIGNING_WORKLOAD:
if (app->node_to_start == node) {
/*Calling object has violated the contract !*/
assert (0);
} else {
/*
* TODO: Save the request to assign workload until state ==
* WORKLOAD_ASSIGNED
*/
log_printf (LOG_LEVEL_ERROR, "Request to assign workload to"
" application =%s in state = %d (should be defered)",
app->name.value, app->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
}
break;
default:
/*Calling object has violated the contract !*/
assert (0);
break;
}
}
void amf_application_init (void)
{
log_init ("AMF");
}
/******************************************************************************
* Event response methods
*****************************************************************************/
void amf_application_sg_started (
struct amf_application *app, struct amf_sg *sg, struct amf_node *node)
{
ENTER ("'%s'", app->name.value);
if (all_sg_started (app)) {
if (app->node_to_start == NULL) {
amf_cluster_application_started (app->cluster, app);
} else {
amf_node_application_started (app->node_to_start, app);
}
assert (app != NULL);
switch (app->acsm_state) {
case APP_AC_STARTING_SGS:
if (no_su_is_instantiating (app)) {
app->acsm_state = APP_AC_STARTED;
if (app->node_to_start == NULL) {
amf_cluster_application_started (app->cluster, app);
} else {
amf_node_application_started (app->node_to_start, app);
}
}
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_application_sg_started()"
" called in state = %d", app->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
@ -149,22 +325,42 @@ void amf_application_sg_assigned (
struct amf_application *app, struct amf_sg *sg)
{
ENTER ("'%s'", app->name.value);
if (app->node_to_start == NULL) {
amf_cluster_application_workload_assigned (app->cluster, app);
} else {
amf_node_application_workload_assigned (app->node_to_start, app);
assert (app != NULL);
switch (app->acsm_state) {
case APP_AC_ASSIGNING_WORKLOAD:
if (all_sg_assigned (app)){
app->acsm_state = APP_AC_WORKLOAD_ASSIGNED;
if (app->node_to_start == NULL){
amf_cluster_application_workload_assigned (app->cluster, app);
} else {
amf_node_application_workload_assigned (app->node_to_start, app);
}
}
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_application_sg_assigned()"
" called in state = %d", app->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
struct amf_application *amf_application_new (struct amf_cluster *cluster)
/******************************************************************************
* General methods
*****************************************************************************/
void amf_application_init (void)
{
struct amf_application *app = amf_malloc (sizeof (struct amf_application));
log_init ("AMF");
}
struct amf_application *amf_application_new (struct amf_cluster *cluster) {
struct amf_application *app = amf_calloc (1, sizeof (struct amf_application));
app->cluster = cluster;
app->next = cluster->application_head;
cluster->application_head = app;
app->sg_head = NULL;
app->si_head = NULL;
app->acsm_state = APP_AC_UNINSTANTIATED;
return app;
}
@ -173,6 +369,7 @@ void amf_application_delete (struct amf_application *app)
struct amf_sg *sg;
struct amf_si *si;
assert (app != NULL);
for (sg = app->sg_head; sg != NULL;) {
struct amf_sg *tmp = sg;
sg = sg->next;
@ -184,7 +381,6 @@ void amf_application_delete (struct amf_application *app)
si = si->next;
amf_si_delete (tmp);
}
free (app);
}
@ -194,6 +390,7 @@ void *amf_application_serialize (
char *buf = NULL;
int offset = 0, size = 0;
assert (app != NULL);
TRACE8 ("%s", app->name.value);
buf = amf_serialize_SaNameT (buf, &size, &offset, &app->name);
@ -203,23 +400,23 @@ void *amf_application_serialize (
buf, &size, &offset, app->saAmfApplicationCurrNumSG);
buf = amf_serialize_SaStringT (
buf, &size, &offset, app->clccli_path);
buf = amf_serialize_SaUint32T (
buf, &size, &offset, app->acsm_state);
*len = offset;
return buf;
}
struct amf_application *amf_application_deserialize (
struct amf_cluster *cluster, char *buf, int size)
{
struct amf_cluster *cluster, char *buf) {
char *tmp = buf;
struct amf_application *app;
struct amf_application *app = amf_application_new (cluster);
app = amf_application_new (cluster);
tmp = amf_deserialize_SaNameT (tmp, &app->name);
tmp = amf_deserialize_SaUint32T (tmp, &app->saAmfApplicationAdminState);
tmp = amf_deserialize_SaUint32T (tmp, &app->saAmfApplicationCurrNumSG);
tmp = amf_deserialize_SaStringT (tmp, &app->clccli_path);
tmp = amf_deserialize_SaUint32T (tmp, &app->acsm_state);
return app;
}
@ -229,6 +426,7 @@ struct amf_application *amf_application_find (
{
struct amf_application *app;
assert (cluster != NULL);
for (app = cluster->application_head; app != NULL; app = app->next) {
if (app->name.length == strlen(name) &&

View File

@ -64,9 +64,71 @@
*
* State STARTED is assumed when the cluster has been initially started and
* will in the future be re-assumed after the administrative command RESTART
* have been executed.
* have been executed.
*
* 1. Cluster Availability Control State Machine
* =============================================
*
* 1.1 State Transition Table
*
* State: Event: Action: New state:
* ===========================================================================
* UNINSTANTIATED sync_ready [C1] A2,A1 STARTING_APPS
* STARTING_APPS sync_ready A2,A1 STARTING_APPS
* STARTING_APPS app_started [C3] A7,A3 ASSIGNING_WORKLOAD
* STARTING_APPS local_timer_expired A8 STARTING_APPS
* STARTING_APPS time_out [C2] A7,A3 ASSIGNING_WORKLOAD
* STARTING_APPS time_out A7 WAITING_OVERTIME
* WAITING_OVERTIME sync_ready A4 WAITING_OVERTIME
* WAITING_OVERTIME app_started A3 ASSIGNING_WORKLOAD
* ASSIGNING_WORKLOAD sync_ready A4 ASSIGNING_WORKLOAD
* ASSIGNING_WORKLOAD app_assigned [C4] A6 STARTED
* STARTED sync_ready A8 STARTED
*
* 1.2 State Description
* =====================
* UNINSTANTIATED - No SUs within any SG in any Application is instantiated.
* STARTING_APPLICATIONS - All applications have been requested to start
* their contained SGs, which in its turn has requested
* their contained SUs to instantiate all their
* components. The cluster startup timer is running.
* WAITING_OVERTIME - The cluster startup timer has expired but all
* applications have yet not responded that they have been
* started. Cluster will wait infinitely for the
* applications to respond. It is correct to do so even when
* the startup timer has expired, because the applications
* will report they are started as soon as there is no
* attempt to instantiate any of its components pending,
* because attempts to instantiate a component can not go on
* forever, see saAmfCompInstantiateTimeout,
* saAmfCompNumMaxInstantiateWithoutDelay and
* saAmfCompNumMaxInstantiateWithDelay.
* ASSIGNING_WORKLOAD - All applications have been requested to assign it's
* specified workload to it's service units according to
* the redundancy model specified by it's SGs.
* STARTED - A best effort has been made to instatiate the components of all
* applications and assign the specified workload as close as possible
* to what is described in the configuration.
*
* 1.3 Actions
* ===========
* A1 - [foreach application in cluster]/start application
* A2 - start cluster startup timer
* A3 - [foreach application in cluster]/assign workload to application
* A4 - defer sync_ready event
* A5 - forward sync_ready to appropriate node object
* A6 - recall deferred event
* A7 - stop node local instance of cluster startup timer
* A8 - multicast 'cluster startup timer time-out' event
*
* 1.4 Guards
* ==========
* C1 - No sg has availability control state == INSTANTIATING_SERVICE_UNITS
* C2 - No application has Availability Control state == STARTING_SGS
* C3 - All SGs are fully instantiated
*/
#include <stdlib.h>
#include <errno.h>
@ -76,13 +138,15 @@
#include "main.h"
#include "service.h"
/**
* Determine if all applications are started
* Determine if all applications are started so that all
* SUs is in SA_AMF_PRESENCE_INSTANTIATED prsense state
* @param cluster
*
* @return int
*/
static int all_applications_started (struct amf_cluster *cluster)
static int cluster_applications_started_instantiated (struct amf_cluster *cluster)
{
int all_started = 1;
struct amf_application *app;
@ -100,64 +164,204 @@ static int all_applications_started (struct amf_cluster *cluster)
}
}
done:
done:
return all_started;
}
static void timer_function_cluster_assign_workload_tmo (void *_cluster)
static int cluster_applications_are_starting_sgs(struct amf_cluster *cluster)
{
struct req_exec_amf_cluster_start_tmo req;
struct iovec iovec;
ENTER ("");
req.header.size = sizeof (struct req_exec_amf_cluster_start_tmo);
req.header.id = SERVICE_ID_MAKE (AMF_SERVICE,
MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO);
iovec.iov_base = (char *)&req;
iovec.iov_len = sizeof (req);
assert (totempg_groups_mcast_joined (openais_group_handle,
&iovec, 1, TOTEMPG_AGREED) == 0);
struct amf_application *application = 0;
int is_starting_sgs = 0;
for (application = cluster->application_head; application != NULL;
application = application->next) {
if (application->acsm_state == APP_AC_STARTING_SGS) {
is_starting_sgs = 1;
break;
}
}
return is_starting_sgs;
}
void amf_cluster_sync_ready (struct amf_cluster *cluster)
static void acsm_cluster_enter_assigning_workload (struct amf_cluster *cluster)
{
log_printf(LOG_NOTICE,
"Cluster: all applications started, assigning workload.");
cluster->acsm_state = CLUSTER_AC_ASSIGNING_WORKLOAD;
amf_cluster_assign_workload (cluster);
}
static void timer_function_cluster_assign_workload_tmo (void *cluster)
{
struct req_exec_amf_cluster_start_tmo req;
((struct amf_cluster*)cluster)->timeout_handle = 0;;
ENTER ("");
amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO, &req, sizeof(req));
}
static inline void stop_cluster_startup_timer (struct amf_cluster *cluster)
{
if (cluster->timeout_handle) {
dprintf ("Stop cluster startup timer");
poll_timer_delete (aisexec_poll_handle,
cluster->timeout_handle);
cluster->timeout_handle = 0;
}
}
static void start_cluster_startup_timer (struct amf_cluster *cluster)
{
poll_timer_add (aisexec_poll_handle,
cluster->saAmfClusterStartupTimeout,
cluster,
timer_function_cluster_assign_workload_tmo,
&cluster->timeout_handle);
}
static inline void amf_cluster_enter_starting_applications (
struct amf_cluster *cluster)
{
ENTER ("");
start_cluster_startup_timer (cluster);
amf_cluster->acsm_state = CLUSTER_AC_STARTING_APPLICATIONS;
amf_cluster_start_applications (cluster);
}
static void add_assign_workload_deferred_list (struct amf_cluster *cluster,
struct amf_node *node, amf_cluster_event_t event)
{
cluster_deferredt_t *tmp_deferred_list =
calloc (1, sizeof (cluster_deferredt_t));
tmp_deferred_list->defered_list.next =
(amf_deferred_t*) cluster->deferred_events_head;
cluster->deferred_events_head = tmp_deferred_list;
}
static void defer_assigning_worload_to_node (struct amf_node *node,
amf_cluster_event_t event)
{
add_assign_workload_deferred_list(amf_cluster, node, event);
}
static amf_deferred_t *recall_defered_cluster_events (
struct amf_cluster *cluster)
{
return (amf_deferred_t*) cluster->deferred_events_head;
}
static void acsm_cluster_enter_started (struct amf_cluster *cluster)
{
amf_deferred_t *deferred_events;
amf_cluster->acsm_state = CLUSTER_AC_STARTED;
for (deferred_events = recall_defered_cluster_events (cluster);
deferred_events != NULL;
deferred_events = deferred_events->next){
amf_node_sync_ready (((cluster_deferredt_t*)deferred_events)->node);
}
}
int amf_cluster_applications_started_with_no_starting_sgs (struct amf_cluster *cluster)
{
return !cluster_applications_are_starting_sgs (cluster);
}
void amf_cluster_start_tmo_event (int is_sync_masterm,
struct amf_cluster *cluster)
{
ENTER ("acsm_state = %d", amf_cluster->acsm_state);
stop_cluster_startup_timer (cluster);
switch (cluster->acsm_state) {
case CLUSTER_AC_STARTING_APPLICATIONS:
if (cluster_applications_are_starting_sgs (cluster)) {
dprintf ("Cluster startup timeout, start waiting over time");
amf_cluster->acsm_state = CLUSTER_AC_WAITING_OVER_TIME;
} else {
dprintf ("Cluster startup timeout, assigning workload");
acsm_cluster_enter_assigning_workload (cluster);
}
break;
case CLUSTER_AC_ASSIGNING_WORKLOAD:
/* ignore cluster startup timer expiration */
case CLUSTER_AC_STARTED:
/* ignore cluster startup timer expiration */
case CLUSTER_AC_WAITING_OVER_TIME:
/* ignore cluster startup timer expiration */
break;
default:
log_printf(LOG_LEVEL_ERROR, "Cluster timout expired in wrong cluster"
" state = %d", cluster->acsm_state);
assert(0);
break;
}
}
/**
* Start all applications in the cluster and start
* the cluster startup timeout.
* @param cluster
* @param app
*/
void amf_cluster_start_applications(struct amf_cluster *cluster)
{
struct amf_application *app;
for (app = cluster->application_head; app != NULL; app = app->next) {
amf_application_start (app, NULL);
}
}
void amf_cluster_sync_ready (struct amf_cluster *cluster, struct amf_node *node)
{
log_printf(LOG_NOTICE, "Cluster: starting applications.");
switch (amf_cluster->state) {
case CLUSTER_UNINSTANTIATED: {
amf_cluster->state = CLUSTER_STARTING_COMPONENTS;
for (app = cluster->application_head; app != NULL; app = app->next) {
amf_application_start (app, NULL);
switch (amf_cluster->acsm_state) {
case CLUSTER_AC_UNINSTANTIATED:
if (amf_cluster->saAmfClusterAdminState == SA_AMF_ADMIN_UNLOCKED) {
amf_cluster_enter_starting_applications (cluster);
}
poll_timer_add (aisexec_poll_handle,
cluster->saAmfClusterStartupTimeout,
cluster,
timer_function_cluster_assign_workload_tmo,
&cluster->timeout_handle);
break;
}
case CLUSTER_STARTING_COMPONENTS: {
case CLUSTER_AC_STARTING_APPLICATIONS:
amf_cluster_enter_starting_applications(cluster);
break;
}
case CLUSTER_STARTING_WORKLOAD: {
case CLUSTER_AC_ASSIGNING_WORKLOAD:
defer_assigning_worload_to_node (node, CLUSTER_SYNC_READY_EV);
log_printf (LOG_LEVEL_ERROR, "Sync ready not implemented in "
"cluster state: %u\n", amf_cluster->state);
"cluster state: %u\n", amf_cluster->acsm_state);
assert (0);
break;
}
case CLUSTER_STARTED: {
assert (0);
case CLUSTER_AC_WAITING_OVER_TIME:
/* TODO: Defer the implementation of assigning
* workload to those syncronized nodes to CLUSTER_AC_STARTED
* state.
*/
defer_assigning_worload_to_node (node, CLUSTER_SYNC_READY_EV);
break;
}
case CLUSTER_AC_STARTED:
TRACE1 ("Node sync ready sent from cluster in "
"CLUSTER_AC_STARTED state");
amf_node_sync_ready (node);
break;
default:
assert (0);
break;
}
}
@ -167,95 +371,120 @@ void amf_cluster_init (void)
log_init ("AMF");
}
void amf_cluster_application_started (
struct amf_cluster *cluster, struct amf_application *application)
{
ENTER ("application '%s' started", application->name.value);
if (all_applications_started (cluster)) {
log_printf(LOG_NOTICE,
"Cluster: all applications started, assigning workload.");
if (cluster->timeout_handle) {
poll_timer_delete (aisexec_poll_handle, cluster->timeout_handle);
cluster->timeout_handle = 0;
switch (cluster->acsm_state) {
case CLUSTER_AC_STARTING_APPLICATIONS:
if (cluster_applications_started_instantiated (cluster)) {
stop_cluster_startup_timer (cluster);
acsm_cluster_enter_assigning_workload (cluster);
}
break;
case CLUSTER_AC_WAITING_OVER_TIME:
if (amf_cluster_applications_started_with_no_starting_sgs (cluster)) {
acsm_cluster_enter_assigning_workload (cluster);
}
break;
default: {
log_printf (LOG_ERR,"Error invalid cluster availability state %d",
cluster->acsm_state);
openais_exit_error(cluster->acsm_state);
break;
}
cluster->state = CLUSTER_STARTING_WORKLOAD;
amf_cluster_assign_workload (cluster);
}
}
struct amf_cluster *amf_cluster_new (void)
{
struct amf_cluster *cluster = calloc (1, sizeof (struct amf_cluster));
if (cluster == NULL) {
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
struct amf_cluster *amf_cluster_new (void) {
struct amf_cluster *cluster = amf_calloc (1, sizeof (struct amf_cluster));
cluster->saAmfClusterStartupTimeout = -1;
cluster->saAmfClusterAdminState = SA_AMF_ADMIN_UNLOCKED;
cluster->deferred_events_head = 0;
cluster->acsm_state = CLUSTER_AC_UNINSTANTIATED;
return cluster;
}
int amf_cluster_applications_assigned (struct amf_cluster *cluster)
{
struct amf_application *app = 0;
int is_all_application_assigned = 1;
for (app = cluster->application_head; app != NULL; app = app->next) {
if (app->acsm_state != APP_AC_WORKLOAD_ASSIGNED) {
is_all_application_assigned = 0;
break;
}
}
return is_all_application_assigned;
}
void amf_cluster_application_workload_assigned (
struct amf_cluster *cluster, struct amf_application *app)
{
log_printf (LOG_NOTICE, "Cluster: application %s assigned.",
app->name.value);
amf_cluster->state = CLUSTER_STARTED;
ENTER ("");
switch (cluster->acsm_state) {
case CLUSTER_AC_ASSIGNING_WORKLOAD:
log_printf (LOG_NOTICE, "Cluster: application %s assigned.",
app->name.value);
if (amf_cluster_applications_assigned (cluster)) {
acsm_cluster_enter_started (cluster);
}
break;
default:
assert(0);
break;
}
}
void *amf_cluster_serialize (struct amf_cluster *cluster, int *len)
{
int objsz = sizeof (struct amf_cluster);
struct amf_cluster *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, cluster, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
TRACE8 ("%s", cluster->name.value);
return copy;
buf = amf_serialize_SaNameT (buf, &size, &offset, &cluster->name);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
cluster->saAmfClusterStartupTimeout);
buf = amf_serialize_SaNameT (buf, &size, &offset,
&cluster->saAmfClusterClmCluster);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
cluster->saAmfClusterAdminState);
buf = amf_serialize_SaUint32T (buf, &size, &offset, cluster->acsm_state);
*len = offset;
return buf;
}
struct amf_cluster *amf_cluster_deserialize (char *buf, int size)
{
int objsz = sizeof (struct amf_cluster);
struct amf_cluster *amf_cluster_deserialize (char *buf) {
char *tmp = buf;
struct amf_cluster *cluster = amf_cluster_new ();
if (objsz > size) {
return NULL;
} else {
struct amf_cluster *obj = amf_cluster_new ();
if (obj == NULL) {
return NULL;
}
memcpy (obj, buf, objsz);
TRACE8 ("%s", obj->name.value);
obj->node_head = NULL;
obj->application_head = NULL;
obj->timeout_handle = 0;
tmp = amf_deserialize_SaNameT (tmp, &cluster->name);
tmp = amf_deserialize_SaUint32T (tmp, &cluster->saAmfClusterStartupTimeout);
tmp = amf_deserialize_SaNameT (tmp, &cluster->saAmfClusterClmCluster);
tmp = amf_deserialize_SaUint32T (tmp, &cluster->saAmfClusterAdminState);
tmp = amf_deserialize_SaUint32T (tmp, &cluster->acsm_state);
return obj;
}
return cluster;
}
void amf_cluster_assign_workload (struct amf_cluster *cluster)
{
struct amf_application *app;
ENTER ("");
cluster->state = CLUSTER_STARTING_WORKLOAD;
if (cluster->timeout_handle) {
poll_timer_delete (aisexec_poll_handle, cluster->timeout_handle);
cluster->timeout_handle = 0;
}
for (app = cluster->application_head; app != NULL; app = app->next) {
amf_application_assign_workload (app, NULL);
}
}

View File

@ -482,6 +482,24 @@ static void *clc_command_run (void *context)
return (0);
}
static void amf_comp_instantiate_tmo (void *component)
{
SaNameT compName;
amf_comp_dn_make (component, &compName);
amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO,
&compName, sizeof (SaNameT));
}
static void start_component_instantiate_timer (struct amf_comp *component)
{
poll_timer_add (aisexec_poll_handle,
component->saAmfCompInstantiateTimeout,
component,
amf_comp_instantiate_tmo,
&component->instantiate_timeout_handle);
}
/*
* Instantiate possible operations
*/
@ -506,6 +524,7 @@ static int clc_cli_instantiate (struct amf_comp *comp)
if (res != 0) {
log_printf (LOG_LEVEL_ERROR, "pthread_create failed: %d", res);
}
start_component_instantiate_timer (comp);
// clc_command_run_data->completion_callback (clc_command_run_data);
// TODO error code from pthread_create
@ -683,11 +702,8 @@ struct amf_healthcheck *amf_comp_find_healthcheck (
struct amf_comp *amf_comp_new(struct amf_su *su, char *name)
{
struct amf_comp *tail = su->comp_head;
struct amf_comp *comp = calloc (1, sizeof (struct amf_comp));
struct amf_comp *comp = amf_calloc (1, sizeof (struct amf_comp));
if (comp == NULL) {
openais_exit_error(AIS_DONE_OUT_OF_MEMORY);
}
while (tail != NULL) {
if (tail->next == NULL) {
break;
@ -1184,17 +1200,37 @@ static void lib_csi_set_request (
free(p);
}
static void stop_component_instantiate_timer (struct amf_comp *component)
{
if (component->instantiate_timeout_handle) {
dprintf ("Stop cluster startup timer");
poll_timer_delete (aisexec_poll_handle,
component->instantiate_timeout_handle);
component->instantiate_timeout_handle = 0;
}
}
SaAisErrorT amf_comp_register (struct amf_comp *comp)
{
TRACE2("Exec comp register '%s'", comp->name.value);
if (comp->saAmfCompPresenceState == SA_AMF_PRESENCE_RESTARTING) {
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED);
} else if (comp->saAmfCompPresenceState == SA_AMF_PRESENCE_INSTANTIATING) {
amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_ENABLED);
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED);
} else {
assert (0);
stop_component_instantiate_timer (comp);
switch (comp->saAmfCompPresenceState) {
case SA_AMF_PRESENCE_RESTARTING:
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED);
break;
case SA_AMF_PRESENCE_INSTANTIATING:
amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_ENABLED);
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED);
break;
case SA_AMF_PRESENCE_INSTANTIATION_FAILED:
/* ignore due to instantitate timeout a while ago */
break;
default:
assert (0);
break;
}
return SA_AIS_OK;
@ -1367,23 +1403,98 @@ SaAisErrorT amf_comp_healthcheck_stop (
return error;
}
/**
* Instantiate a component
* @param comp
*/
void amf_comp_instantiate (struct amf_comp *comp)
{
int res = 0;
ENTER ("'%s' SU '%s'", getSaNameT (&comp->name),
getSaNameT (&comp->su->name));
if (comp->saAmfCompPresenceState != SA_AMF_PRESENCE_RESTARTING) {
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATING);
switch (comp->saAmfCompPresenceState) {
case SA_AMF_PRESENCE_RESTARTING:
/* fall through */
case SA_AMF_PRESENCE_UNINSTANTIATED:
if (amf_su_is_local (comp->su)) {
TRACE1("Send instantiate event for comp '%s' from host %s",
comp->name.value, comp->su->saAmfSUHostedByNode.value);
SaNameT compName;
amf_comp_dn_make (comp, &compName);
amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE,
&compName, sizeof (SaNameT));
}
break;
default:
dprintf("Instantiate ignored in Component presence state %d",
comp->saAmfCompPresenceState);
break;
}
}
if (amf_su_is_local (comp->su)) {
res = clc_interfaces[comp->comptype]->instantiate (comp);
void amf_comp_instantiate_tmo_event (struct amf_comp *comp)
{
ENTER ("Comp instantiate timeout after %d seconds '%s' '%s'",
comp->saAmfCompInstantiateTimeout, comp->su->name.value,
comp->name.value);
switch (comp->saAmfCompPresenceState) {
case SA_AMF_PRESENCE_RESTARTING:
amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_DISABLED);
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATION_FAILED);
break;
case SA_AMF_PRESENCE_INSTANTIATING:
amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_DISABLED);
comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATION_FAILED);
break;
default:
assert (0);
break;
}
}
void amf_comp_instantiate_event (struct amf_comp *component)
{
int res;
ENTER ("");
switch (component->saAmfCompPresenceState) {
case SA_AMF_PRESENCE_INSTANTIATING:
case SA_AMF_PRESENCE_INSTANTIATED:
case SA_AMF_PRESENCE_TERMINATING:
case SA_AMF_PRESENCE_INSTANTIATION_FAILED:
case SA_AMF_PRESENCE_TERMINATION_FAILED:
dprintf("Instantiate ignored in Component presence state %d",
component->saAmfCompPresenceState);
break;
case SA_AMF_PRESENCE_UNINSTANTIATED:
comp_presence_state_set (component, SA_AMF_PRESENCE_INSTANTIATING);
amf_su_comp_state_changed(component->su,
component,SA_AMF_PRESENCE_STATE,SA_AMF_PRESENCE_INSTANTIATING);
if (amf_su_is_local (component->su)) {
res = clc_interfaces[component->comptype]->instantiate (
component);
}
break;
case SA_AMF_PRESENCE_RESTARTING:
if (amf_su_is_local (component->su)) {
res = clc_interfaces[component->comptype]->instantiate (
component);
}
break;
default:
dprintf("Component presence state %d",
component->saAmfCompPresenceState);
assert (0);
break;
}
}
@ -1953,14 +2064,12 @@ void *amf_comp_serialize (struct amf_comp *component, int *len)
*
* @return struct amf_comp*
*/
struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf, int size)
struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf)
{
char *tmp = buf;
struct amf_comp *component;
int i;
SaUint32T cnt;
component = amf_comp_new (su, "");
struct amf_comp *component = amf_comp_new (su, "");
tmp = amf_deserialize_SaNameT (tmp, &component->name);
tmp = amf_deserialize_SaUint32T (tmp, &cnt);
@ -2063,33 +2172,51 @@ struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf, int size)
void *amf_healthcheck_serialize (struct amf_healthcheck *healthcheck, int *len)
{
int objsz = sizeof (struct amf_healthcheck);
struct amf_healthcheck *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, healthcheck, objsz);
*len = objsz;
TRACE8 ("%s", healthcheck->safHealthcheckKey.key);
return copy;
buf = amf_serialize_opaque (buf, &size, &offset,
&healthcheck->safHealthcheckKey.key, SA_AMF_HEALTHCHECK_KEY_MAX);
buf = amf_serialize_SaUint16T (buf, &size, &offset,
healthcheck->safHealthcheckKey.keyLen);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
healthcheck->saAmfHealthcheckMaxDuration);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
healthcheck->saAmfHealthcheckPeriod);
*len = offset;
return buf;
}
struct amf_healthcheck *amf_healthcheck_deserialize (
struct amf_comp *comp, char *buf, int size)
struct amf_comp *comp, char *buf)
{
int objsz = sizeof (struct amf_healthcheck);
char *tmp = buf;
int cnt;
amf_healthcheck_t *healthcheck = amf_healthcheck_new (comp);
if (objsz > size) {
return NULL;
} else {
struct amf_healthcheck *obj = amf_malloc (sizeof (struct amf_healthcheck));
memcpy (obj, buf, objsz);
obj->active = 0;
obj->timer_handle_duration = 0;
obj->timer_handle_period = 0;
obj->comp = comp;
obj->next = comp->healthcheck_head;
comp->healthcheck_head = obj;
return obj;
}
tmp = amf_deserialize_opaque (tmp, &healthcheck->safHealthcheckKey.key, &cnt);
tmp = amf_deserialize_SaUint16T (tmp,
&healthcheck->safHealthcheckKey.keyLen);
tmp = amf_deserialize_SaUint32T (tmp,
&healthcheck->saAmfHealthcheckMaxDuration);
tmp = amf_deserialize_SaUint32T (tmp,
&healthcheck->saAmfHealthcheckPeriod);
return healthcheck;
}
amf_healthcheck_t *amf_healthcheck_new (struct amf_comp *comp)
{
amf_healthcheck_t *healthcheck = amf_calloc (1, sizeof (amf_healthcheck_t));
healthcheck->comp = comp;
healthcheck->next = comp->healthcheck_head;
comp->healthcheck_head = healthcheck;
return healthcheck;
}

View File

@ -72,14 +72,13 @@
* - MANAGING_HOSTED_SERVICE_UNITS (
* . FAILING_FAST (REBOOTING_NODE and ACTIVATING_STANDBY_NODE)
* . FAILING_GRACEFULLY (SWITCHING_OVER, FAILING_OVER and REBOOTING_NODE)
* . LEAVING_SPONTANEOUSLY (DEACTIVATE_DEPENDENT and
* . LEAVING_SPONTANEOUSLY (FAILING_OVER and
* WAITING_FOR_NODE_TO_JOIN)
* . JOINING (STARTING_SERVICE_UNITS, ASSIGNING_ACTIVE_WORKLOAD and
* ASSIGNING_STANDBY_WORKLOAD)
* . JOINING (STARTING_SERVICE_UNITS and ASSIGNING_STANDBY_WORKLOAD)
*
* REPAIR_NEEDED indicates the node needs a manual repair and this state will
* maintained until the administrative command REPAIRED is entered
* (implemented in the future)
* REPAIR_NEEDED indicates the node needs a manual repair and this state will be
* maintained until the administrative command REPAIRED is entered (implemented
* in the future)
*
* ESCALLATION_LEVEL is a kind of idle state where no actions are performed
* and used only to remember the escallation level. Substate LEVEL_0 indicates
@ -105,7 +104,66 @@
* JOINING state handles the start of a node in all cases except cluster start,
* which is handled by the CLUSTER class.
*
*/
* 1. Node Availability Control State Machine
* ==========================================
*
* 1.1 State Transition Table
*
* State: Event: Action: New state:
* ============================================================================
* ESCALATION_LEVEL_0 node_sync_ready A6 JOINING_STARTING_APPLS
* ESCALATION_LEVEL_0 node_leave A9,A8 LEAVING_SP_FAILING_OVER
* JOINING_STARTING_APPLS appl_started [C4] A7 JOINING_ASSIGNING_WL
* JOINING_ASSIGNING_WL appl_assigned [C5] ESCALATION_LEVEL_0
* LEAVING_SP_FAILING_OVER sg_failed_over [C1] LEAVING_SP_WAIT_FOR_JOIN
* LEAVING_SP_WAIT_FOR_JOIN node_sync_ready A6 JOINING_STARTING_APPLS
*
* 1.2 State Description
* =====================
* ESCALATION_LEVEL_0 - Node is synchronized and idle.
* JOINING_STARTING_APPLS - JOINING_STARTING_APPLICATIONS
* Node has ordered all applications to start its SUs
* hosted on current node and is now waiting for them
* to acknowledge that they have started.
*
* JOINING_ASSIGNING_WL - JOINING_ASSIGNING_WORKLOAD
* Node has ordered all applications to assign workload
* to all its SUs which currently have no workload and
* is now waiting for the applications to acknowledge.
*
* LEAVING_SP_FAILING_OVER - LEAVING_SPONTANEOUSLY_FAILING_OVER
* Node has received an event telling that this node
* has left the cluster and has ordered all service
* groups to failover those of its SUs that were
* hosted on current node.
*
* LEAVING_SP_WAIT_FOR_JOIN - LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN
* Node is waiting for current node to join again.
*
* 1.3 Actions
* ===========
* A1 -
* A2 -
* A3 -
* A4 -
* A5 -
* A6 - [foreach application in cluster]start application
* A7 - [foreach application in cluster]assign workload to application
* A8 - [foreach application in cluster]
* [foreach SG in application ]failover node
* A9 - [foreach application in cluster]
* [foreach SG in application ]
* [foreach SU in SG where the SU is hosted on current node]
* [foreach comp in such an SU]indicate that the node has left the cluster
*
* 1.4 Guards
* ==========
* C1 - All SG availability control state machines (ACSM) == IDLE
* C2 -
* C3 -
* C4 - No applications are in ACSM state == STARTING_SGS
* C5 - All applications have ACSM state == WORKLOAD_ASSIGNED
*/
#include <stdlib.h>
#include <assert.h>
@ -115,6 +173,9 @@
#include "print.h"
#include "main.h"
/******************************************************************************
* Internal (static) utility functions
*****************************************************************************/
static void amf_node_acsm_enter_leaving_spontaneously(struct amf_node *node)
{
@ -132,7 +193,10 @@ static void amf_node_acsm_enter_failing_over (struct amf_node *node)
ENTER("'%s'", node->name.value);
node->acsm_state = NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER;
/*
* Indicate to each component object in the model that current
* node has left the cluster
*/
for (app = amf_cluster->application_head; app != NULL; app = app->next) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
for (su = sg->su_head; su != NULL; su = su->next) {
@ -146,26 +210,89 @@ static void amf_node_acsm_enter_failing_over (struct amf_node *node)
}
}
for (app = amf_cluster->application_head; app != NULL; app = app->next) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
/*
* Let all service groups with service units hosted on current node failover
* its workload
*/
for (app = amf_cluster->application_head; app != NULL; app =
app->next) {
for (sg = app->sg_head; sg != NULL; sg =
sg->next) {
amf_sg_failover_node_req(sg, node);
}
}
}
#ifdef COMPILE_OUT
static int all_applications_on_node_started (struct amf_node *node,
struct amf_cluster *cluster)
{
int all_started = 1;
struct amf_application *app;
struct amf_sg *sg;
struct amf_su *su;
for (app = cluster->application_head; app != NULL; app = app->next) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
for (su = sg->su_head; su != NULL; su = su->next) {
/*
* TODO: Replace the if-statement below with the if-statementin
* this comment when the real problem is fixed !
*/
if (su->saAmfSUPresenceState !=
SA_AMF_PRESENCE_INSTANTIATED &&
name_match(&su->saAmfSUHostedByNode,&node->name)) {
all_started = 0; goto done;
}
if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED ) {
all_started = 0;
goto done;
}
}
}
}
done:
return all_started;
}
#endif
/******************************************************************************
* Event methods
*****************************************************************************/
/**
* Node leave event is obtained from amf_confchg_fn
* This event indicates that a node has unexpectedly left the cluster. Node
* leave event is obtained from amf_confchg_fn.
*
* @param node
*/
void amf_node_leave (struct amf_node *node)
{
assert (node != NULL);
ENTER("'%s', CLM node '%s'", node->name.value,
node->saAmfNodeClmNode.value);
amf_node_acsm_enter_leaving_spontaneously(node);
amf_node_acsm_enter_failing_over (node);
}
switch (node->acsm_state) {
case NODE_ACSM_ESCALLATION_LEVEL_0:
case NODE_ACSM_ESCALLATION_LEVEL_2:
case NODE_ACSM_ESCALLATION_LEVEL_3:
amf_node_acsm_enter_leaving_spontaneously(node);
amf_node_acsm_enter_failing_over (node);
break;
case NODE_ACSM_REPAIR_NEEDED:
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_node_leave()called in state = %d"
" (should have been defered)", node->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
/**
*
* @param node
@ -216,85 +343,246 @@ void amf_node_comp_failover_req (
}
/**
* Node constructor
* @param loc
* @param cluster
* This event indicates that current node has joined and its cluster model has
* been synchronized with the other nodes cluster models.
*
* @param node
*/
struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name)
{
struct amf_node *node = calloc (1, sizeof (struct amf_node));
if (node == NULL) {
openais_exit_error(AIS_DONE_OUT_OF_MEMORY);
}
node->next = cluster->node_head;
node->saAmfNodeAdminState = SA_AMF_ADMIN_UNLOCKED;
node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED;
node->saAmfNodeAutoRepair = SA_TRUE;
node->cluster = cluster;
node->saAmfNodeSuFailOverProb = -1;
node->saAmfNodeSuFailoverMax = ~0;
setSaNameT (&node->name, name);
return node;
}
void *amf_node_serialize (struct amf_node *node, int *len)
{
int objsz = sizeof (struct amf_node);
struct amf_node *copy;
copy = amf_malloc (objsz);
memcpy (copy, node, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
return copy;
}
struct amf_node *amf_node_deserialize (
struct amf_cluster *cluster, char *buf, int size)
{
int objsz = sizeof (struct amf_node);
if (objsz > size) {
return NULL;
} else {
struct amf_node *obj = amf_node_new (cluster, "");
if (obj == NULL) {
return NULL;
}
memcpy (obj, buf, objsz);
TRACE8 ("%s", obj->name.value);
obj->cluster = cluster;
obj->next = cluster->node_head;
cluster->node_head = obj;
return obj;
}
}
void amf_node_sync_ready (struct amf_node *node)
{
struct amf_application *app;
assert (node != NULL);
log_printf(LOG_NOTICE, "Node %s sync ready, starting hosted SUs.",
log_printf(LOG_NOTICE, "Node=%s: sync ready, starting hosted SUs.",
node->name.value);
node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED;
for (app = amf_cluster->application_head; app != NULL; app = app->next) {
amf_application_start (app, node);
switch (node->acsm_state) {
case NODE_ACSM_ESCALLATION_LEVEL_0:
case NODE_ACSM_ESCALLATION_LEVEL_2:
case NODE_ACSM_ESCALLATION_LEVEL_3:
case NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN:
node->acsm_state = NODE_ACSM_JOINING_STARTING_APPLICATIONS;
for (app = amf_cluster->application_head; app != NULL; app = app->next) {
amf_application_start (app, node);
}
break;
case NODE_ACSM_REPAIR_NEEDED:
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_node_sync_ready()called in state"
" = %d (should have been defered)", node->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
/******************************************************************************
* Event response methods
*****************************************************************************/
/**
* This event indicates that an application has started. Started in this context
* means that none of its contained service units is in an -ING state with other
* words successfully instantiated, instantiation has failed or instantiation
* was not possible (due to the node on which the SU was to be hosted is not
* operational).
*
* @param node
* @param application which has been started
*/
void amf_node_application_started (struct amf_node *node,
struct amf_application *app)
{
assert (node != NULL && app != NULL );
ENTER ("Node=%s: application '%s' started", node->name.value,
app->name.value);
switch (node->acsm_state) {
case NODE_ACSM_JOINING_STARTING_APPLICATIONS:
if (amf_cluster_applications_started_with_no_starting_sgs(
app->cluster)) {
log_printf(LOG_NOTICE,
"Node=%s: all applications started, assigning workload.",
node->name.value);
node->acsm_state = NODE_ACSM_JOINING_ASSIGNING_WORKLOAD;
for (app = app->cluster->application_head; app != NULL;
app = app->next) {
amf_application_assign_workload (app, node);
}
}
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_node_application_started()"
"called in state = %d (unexpected !!)", node->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
/**
* This event indicates that an application has been assigned workload.
*
* @param node
* @param application which has been assigned workload
*/
void amf_node_application_workload_assigned (struct amf_node *node,
struct amf_application *app)
{
assert (node != NULL && app != NULL );
ENTER ("Node=%s: application '%s' started", node->name.value,
app->name.value);
switch (node->acsm_state) {
case NODE_ACSM_JOINING_ASSIGNING_WORKLOAD:
if (amf_cluster_applications_assigned (amf_cluster)) {
log_printf(LOG_NOTICE, "Node=%s: all workload assigned",
node->name.value);
/*
* TODO: new state should be set via history
*/
node->acsm_state = NODE_ACSM_ESCALLATION_LEVEL_0;
}
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_node_application_workload_assigned()"
"called in state = %d (unexpected !!)", node->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
}
/**
* This event indicates that an SG has failed over its workload after a node
* failure.
*
* @param node
* @param sg_in SG which is now ready with its failover
*/
void amf_node_sg_failed_over (struct amf_node *node, struct amf_sg *sg_in)
{
struct amf_sg *sg;
struct amf_application *app;
int all_sg_has_failed_over = 1;
assert (node != NULL && app != NULL);
ENTER ("Node=%s: SG '%s' started", node->name.value,
sg_in->name.value);
switch (node->acsm_state) {
case NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER:
for (app = amf_cluster->application_head; app != NULL;
app = app->next) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
if (sg->avail_state != SG_AC_Idle) {
all_sg_has_failed_over = 0;
goto end;
}
}
}
break;
default:
log_printf (LOG_LEVEL_ERROR, "amf_node_sg_failed_over()"
"called in state = %d (unexpected !!)", node->acsm_state);
openais_exit_error (AIS_DONE_FATAL_ERR);
break;
}
end:
if (all_sg_has_failed_over) {
node->acsm_state = NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN;
}
}
/******************************************************************************
* General methods
*****************************************************************************/
void amf_node_init (void)
{
log_init ("AMF");
}
struct amf_node *amf_node_find (SaNameT *name)
/**
* Node constructor
* @param loc
* @param cluster
* @param node
*/
struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name) {
struct amf_node *node = amf_calloc (1, sizeof (struct amf_node));
setSaNameT (&node->name, name);
node->saAmfNodeAdminState = SA_AMF_ADMIN_UNLOCKED;
node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED;
node->saAmfNodeAutoRepair = SA_TRUE;
node->saAmfNodeSuFailOverProb = -1;
node->saAmfNodeSuFailoverMax = ~0;
node->cluster = cluster;
node->next = cluster->node_head;
cluster->node_head = node;
node->acsm_state = NODE_ACSM_ESCALLATION_LEVEL_0;
return node;
}
void *amf_node_serialize (struct amf_node *node, int *len)
{
char *buf = NULL;
int offset = 0, size = 0;
TRACE8 ("%s", node->name.value);
buf = amf_serialize_SaNameT (buf, &size, &offset, &node->name);
buf = amf_serialize_SaNameT (buf, &size, &offset, &node->saAmfNodeClmNode);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeSuFailOverProb);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeSuFailoverMax);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeAutoRepair);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeRebootOnInstantiationFailure);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeRebootOnTerminationFailure);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeAdminState);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->saAmfNodeOperState);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->nodeid);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
node->acsm_state);
*len = offset;
return buf;
}
struct amf_node *amf_node_deserialize (struct amf_cluster *cluster, char *buf) {
char *tmp = buf;
struct amf_node *node = amf_node_new (cluster, "");
tmp = amf_deserialize_SaNameT (tmp, &node->name);
tmp = amf_deserialize_SaNameT (tmp, &node->saAmfNodeClmNode);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeSuFailOverProb);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeSuFailoverMax);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeAutoRepair);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeRebootOnInstantiationFailure);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeRebootOnTerminationFailure);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeAdminState);
tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeOperState);
tmp = amf_deserialize_SaUint32T (tmp, &node->nodeid);
tmp = amf_deserialize_SaUint32T (tmp, &node->acsm_state);
return node;
}
struct amf_node *amf_node_find (SaNameT *name) {
struct amf_node *node;
assert (name != NULL && amf_cluster != NULL);
@ -310,8 +598,7 @@ struct amf_node *amf_node_find (SaNameT *name)
return NULL;
}
struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid)
{
struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid) {
struct amf_node *node;
assert (amf_cluster != NULL);
@ -327,8 +614,7 @@ struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid)
return NULL;
}
struct amf_node *amf_node_find_by_hostname (const char *hostname)
{
struct amf_node *amf_node_find_by_hostname (const char *hostname) {
struct amf_node *node;
assert (hostname != NULL && amf_cluster != NULL);
@ -344,67 +630,3 @@ struct amf_node *amf_node_find_by_hostname (const char *hostname)
return NULL;
}
static int all_applications_on_node_started (struct amf_node *node,
struct amf_cluster *cluster)
{
int all_started = 1;
struct amf_application *app;
struct amf_sg *sg;
struct amf_su *su;
for (app = cluster->application_head; app != NULL; app = app->next) {
for (sg = app->sg_head; sg != NULL; sg = sg->next) {
for (su = sg->su_head; su != NULL; su = su->next) {
/* TODO: Replace the if-statement below with the if-statement in this comment when
the real problem is fixed !
if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED &&
name_match(&su->saAmfSUHostedByNode,&node->name)) {
all_started = 0;
goto done;
}
*/
if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED ) {
all_started = 0;
goto done;
}
}
}
}
done:
return all_started;
}
void amf_node_application_started (struct amf_node *node,
struct amf_application *_app)
{
struct amf_application *app = _app;
ENTER ("application '%s' started", app->name.value);
if (all_applications_on_node_started (node, app->cluster)) {
log_printf(LOG_NOTICE,
"Node: all applications started, assigning workload.");
for (app = _app->cluster->application_head; app != NULL;
app = app->next) {
amf_application_assign_workload (app, node);
}
}
}
void amf_node_application_workload_assigned (struct amf_node *node,
struct amf_application *app)
{
log_printf(LOG_NOTICE, "Node: all workload assigned on node %s",
node->name.value);
/**
* TODO: Set node acsm state
*/
}

View File

@ -182,6 +182,7 @@ static void return_to_idle (struct amf_sg *sg)
break;
case SG_RT_FailoverNode:
amf_node_sg_failed_over (sg->recovery_scope.node, sg);
log_printf (
LOG_NOTICE, "'%s for %s' recovery action finished",
sg_recovery_type_text[sg->recovery_scope.recovery_type],
@ -191,6 +192,7 @@ static void return_to_idle (struct amf_sg *sg)
log_printf (
LOG_NOTICE, "'%s' recovery action finished",
sg_recovery_type_text[0]);
break;
}
}
@ -280,7 +282,7 @@ static void acsm_enter_terminating_suspected (struct amf_sg *sg)
struct amf_su **sus= sg->recovery_scope.sus;
sg->avail_state = SG_AC_TerminatingSuspected;
/*
/*
* Terminate suspected SU(s)
*/
while (*sus != 0) {
@ -501,18 +503,19 @@ static void acsm_enter_repairing_su (struct amf_sg *sg)
openais_exit_error (AIS_DONE_FATAL_ERR);
}
if (node->saAmfNodeOperState == SA_AMF_OPERATIONAL_ENABLED) {
/* node is synchronized */
is_any_su_instantiated = 1;
amf_su_instantiate ((*sus));
} else {
return_to_idle (sg);
}
}
sus++;
}
if (is_any_su_instantiated == 0) {
return_to_idle (sg);
}
}
/**
@ -605,7 +608,7 @@ static void set_scope_for_failover_su (struct amf_sg *sg, struct amf_su *su)
struct amf_su **sus;
SaNameT dn;
sg->recovery_scope.recovery_type = SG_RT_FailoverSU;
sg->recovery_scope.node = NULL;
sg->recovery_scope.comp = NULL;
sg->recovery_scope.sus = (struct amf_su **)
@ -653,6 +656,7 @@ static void set_scope_for_failover_node (struct amf_sg *sg, struct amf_node *nod
ENTER ("'%s'", node->name.value);
sg->recovery_scope.recovery_type = SG_RT_FailoverNode;
sg->recovery_scope.node = node;
sg->recovery_scope.comp = NULL;
sg->recovery_scope.sus = (struct amf_su **)
calloc (1, sizeof (struct amf_su *));
@ -908,6 +912,7 @@ static void assign_si_assumed_cbfn (
confirmed_assignments);
amf_runtime_attributes_print (amf_cluster);
assert (0);
break;
}
}
@ -922,31 +927,57 @@ static inline int div_round (int a, int b)
return res;
}
#ifdef COMPILE_OUT
static int all_su_has_presence_state (
struct amf_sg *sg, struct amf_node *node_to_start,
SaAmfPresenceStateT state)
{
struct amf_su *su;
int all_set = 1;
SaAmfPresenceStateT state)
{
struct amf_su *su;
int all_set = 1;
for (su = sg->su_head; su != NULL; su = su->next) {
if (su->saAmfSUPresenceState != state) {
if (node_to_start == NULL) {
all_set = 0;
all_set = 0;
break;
} else {
if (name_match(&node_to_start->name,
&su->saAmfSUHostedByNode)) {
all_set = 0;
break;
}
}
}
}
return all_set;
}
#endif
static int no_su_has_presence_state (
struct amf_sg *sg, struct amf_node *node_to_start,
SaAmfPresenceStateT state)
{
struct amf_su *su;
int no_su_has_presence_state = 1;
for (su = sg->su_head; su != NULL; su = su->next) {
if (su->saAmfSUPresenceState == state) {
if (node_to_start == NULL) {
no_su_has_presence_state = 0;
break;
} else {
if (name_match(&node_to_start->name,
&su->saAmfSUHostedByNode)) {
all_set = 0;
no_su_has_presence_state = 0;
break;
}
}
}
}
return all_set;
}
return no_su_has_presence_state;
}
static int all_su_in_scope_has_presence_state (
struct amf_sg *sg, SaAmfPresenceStateT state)
@ -1300,14 +1331,16 @@ static int assign_si (struct amf_sg *sg, int dependency_level)
return assigned;
}
void amf_sg_assign_si (struct amf_sg *sg, int dependency_level)
int amf_sg_assign_si_req (struct amf_sg *sg, int dependency_level)
{
int posible_to_assign_si;
sg->avail_state = SG_AC_AssigningOnRequest;
if (assign_si (sg, dependency_level) == 0) {
if ((posible_to_assign_si = assign_si (sg, dependency_level)) == 0) {
return_to_idle (sg);
amf_application_sg_assigned (sg->application, sg);
}
return posible_to_assign_si;
}
void amf_sg_failover_node_req (
@ -1406,8 +1439,8 @@ void amf_sg_su_state_changed (struct amf_sg *sg,
if (type == SA_AMF_PRESENCE_STATE) {
if (state == SA_AMF_PRESENCE_INSTANTIATED) {
if (sg->avail_state == SG_AC_InstantiatingServiceUnits) {
if (all_su_has_presence_state(sg, sg->node_to_start,
SA_AMF_PRESENCE_INSTANTIATED)) {
if (no_su_has_presence_state(sg, sg->node_to_start,
SA_AMF_PRESENCE_INSTANTIATING)) {
su->sg->avail_state = SG_AC_Idle;
amf_application_sg_started (
sg->application, sg, this_amf_node);
@ -1425,6 +1458,7 @@ void amf_sg_su_state_changed (struct amf_sg *sg,
assert (0);
}
} else {
dprintf ("avail-state: %u", sg->avail_state);
assert (0);
}
} else if (state == SA_AMF_PRESENCE_UNINSTANTIATED) {
@ -1444,6 +1478,17 @@ void amf_sg_su_state_changed (struct amf_sg *sg,
} else {
assert (0);
}
} else if (state == SA_AMF_PRESENCE_INSTANTIATING) {
; /* nop */
} else if (state == SA_AMF_PRESENCE_INSTANTIATION_FAILED) {
if (sg->avail_state == SG_AC_InstantiatingServiceUnits) {
if (no_su_has_presence_state(sg, sg->node_to_start,
SA_AMF_PRESENCE_INSTANTIATING)) {
su->sg->avail_state = SG_AC_Idle;
amf_application_sg_started (
sg->application, sg, this_amf_node);
}
}
} else {
assert (0);
}
@ -1493,14 +1538,9 @@ void amf_sg_failover_su_req (
struct amf_sg *amf_sg_new (struct amf_application *app, char *name)
{
struct amf_sg *sg = calloc (1, sizeof (struct amf_sg));
struct amf_sg *sg = amf_calloc (1, sizeof (struct amf_sg));
if (sg == NULL) {
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
sg->next = app->sg_head;
app->sg_head = sg;
setSaNameT (&sg->name, name);
sg->saAmfSGAdminState = SA_AMF_ADMIN_UNLOCKED;
sg->saAmfSGNumPrefActiveSUs = 1;
sg->saAmfSGNumPrefStandbySUs = 1;
@ -1513,8 +1553,8 @@ struct amf_sg *amf_sg_new (struct amf_application *app, char *name)
sg->saAmfSGAutoAdjustProb = -1;
sg->saAmfSGAutoRepair = SA_TRUE;
sg->application = app;
setSaNameT (&sg->name, name);
sg->node_to_start = NULL;
sg->next = app->sg_head;
app->sg_head = sg;
return sg;
}
@ -1585,13 +1625,10 @@ void *amf_sg_serialize (struct amf_sg *sg, int *len)
return buf;
}
struct amf_sg *amf_sg_deserialize (
struct amf_application *app, char *buf, int size)
struct amf_sg *amf_sg_deserialize (struct amf_application *app, char *buf)
{
char *tmp = buf;
struct amf_sg *sg;
sg = amf_sg_new (app, "");
struct amf_sg *sg = amf_sg_new (app, "");
tmp = amf_deserialize_SaNameT (tmp, &sg->name);
tmp = amf_deserialize_SaUint32T (tmp, &sg->saAmfSGRedundancyModel);

View File

@ -503,11 +503,7 @@ void amf_csi_delete_assignments (struct amf_csi *csi, struct amf_su *su)
struct amf_si *amf_si_new (struct amf_application *app, char *name)
{
struct amf_si *tail = app->si_head;
struct amf_si *si = calloc (1, sizeof (struct amf_si));
if (si == NULL) {
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
struct amf_si *si = amf_calloc (1, sizeof (struct amf_si));
while (tail != NULL) {
if (tail->next == NULL) {
@ -559,38 +555,42 @@ void amf_si_delete (struct amf_si *si)
void *amf_si_serialize (struct amf_si *si, int *len)
{
int objsz = sizeof (struct amf_si);
struct amf_si *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, si, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
TRACE8 ("%s", si->name.value);
return copy;
buf = amf_serialize_SaNameT (buf, &size, &offset, &si->name);
buf = amf_serialize_SaNameT (buf, &size, &offset,
&si->saAmfSIProtectedbySG);
buf = amf_serialize_SaUint32T (buf, &size, &offset, si->saAmfSIRank);
buf = amf_serialize_SaUint32T (buf, &size, &offset, si->saAmfSINumCSIs);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
si->saAmfSIPrefActiveAssignments);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
si->saAmfSIPrefStandbyAssignments);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
si->saAmfSIAdminState);
*len = offset;
return buf;
}
struct amf_si *amf_si_deserialize (
struct amf_application *app, char *buf, int size)
struct amf_si *amf_si_deserialize (struct amf_application *app, char *buf)
{
int objsz = sizeof (struct amf_si);
char *tmp = buf;
struct amf_si *si = amf_si_new (app, "");
if (objsz > size) {
return NULL;
} else {
struct amf_si *tmp = (struct amf_si*) buf;
struct amf_si *si = amf_si_new (app, (char*)tmp->name.value);
TRACE8 ("%s", si->name.value);
tmp = amf_deserialize_SaNameT (tmp, &si->name);
tmp = amf_deserialize_SaNameT (tmp, &si->saAmfSIProtectedbySG);
tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIRank);
tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSINumCSIs);
tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIPrefActiveAssignments);
tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIPrefStandbyAssignments);
tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIAdminState);
memcpy (&si->saAmfSIProtectedbySG, &tmp->saAmfSIProtectedbySG,
sizeof (SaNameT));
si->saAmfSIRank = tmp->saAmfSIRank;
si->saAmfSINumCSIs = tmp->saAmfSINumCSIs;
si->saAmfSIPrefActiveAssignments = tmp->saAmfSIPrefActiveAssignments;
si->saAmfSIPrefStandbyAssignments = tmp->saAmfSIPrefStandbyAssignments;
si->saAmfSIAdminState = tmp->saAmfSIAdminState;
return si;
}
return si;
}
/*****************************************************************************
@ -599,48 +599,48 @@ struct amf_si *amf_si_deserialize (
struct amf_si_assignment *amf_si_assignment_new (struct amf_si *si)
{
struct amf_si_assignment *si_assignment;
struct amf_si_assignment *si_assignment =
amf_calloc (1, sizeof (struct amf_si_assignment));
si_assignment = amf_malloc (sizeof (struct amf_si_assignment));
si_assignment->si = si;
si_assignment->next = si->assigned_sis;
si->assigned_sis = si_assignment;
return si_assignment;
}
void *amf_si_assignment_serialize (
struct amf_si_assignment *si_assignment, int *len)
amf_si_assignment_t *si_assignment, int *len)
{
int objsz = sizeof (struct amf_si_assignment);
struct amf_si_assignment *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, si_assignment, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
TRACE8 ("%s", si_assignment->name.value);
return copy;
buf = amf_serialize_SaNameT (buf, &size, &offset, &si_assignment->name);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
si_assignment->saAmfSISUHAState);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
si_assignment->requested_ha_state);
*len = offset;
return buf;
}
struct amf_si_assignment *amf_si_assignment_deserialize (
struct amf_si *si, char *buf, int size)
struct amf_si *si, char *buf)
{
int objsz = sizeof (struct amf_si_assignment);
char *tmp = buf;
struct amf_si_assignment *si_assignment = amf_si_assignment_new (si);
if (objsz > size) {
return NULL;
} else {
struct amf_si_assignment *obj = amf_si_assignment_new (si);
if (obj == NULL) {
return NULL;
}
memcpy (obj, buf, objsz);
TRACE8 ("%s", obj->name.value);
obj->si = si;
obj->su = amf_su_find (si->application->cluster, &obj->name);
obj->next = si->assigned_sis;
si->assigned_sis = obj;
return obj;
}
tmp = amf_deserialize_SaNameT (tmp, &si_assignment->name);
tmp = amf_deserialize_SaUint32T (tmp, &si_assignment->saAmfSISUHAState);
tmp = amf_deserialize_SaUint32T (tmp, &si_assignment->requested_ha_state);
si_assignment->su = amf_su_find (si->application->cluster,
&si_assignment->name);
return si_assignment;
}
struct amf_si *amf_si_find (struct amf_application *app, char *name)
@ -668,10 +668,11 @@ struct amf_si *amf_si_find (struct amf_application *app, char *name)
struct amf_csi *amf_csi_new (struct amf_si *si)
{
struct amf_csi *csi;
struct amf_csi *csi = amf_calloc (1, sizeof (struct amf_csi));
csi = amf_malloc (sizeof (struct amf_csi));
csi->si = si;
csi->next = si->csi_head;
si->csi_head = csi;
return csi;
}
@ -691,37 +692,28 @@ void amf_csi_delete (struct amf_csi *csi)
void *amf_csi_serialize (struct amf_csi *csi, int *len)
{
int objsz = sizeof (struct amf_csi);
struct amf_csi *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, csi, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
TRACE8 ("%s", csi->name.value);
return copy;
buf = amf_serialize_SaNameT (buf, &size, &offset, &csi->name);
buf = amf_serialize_SaNameT (buf, &size, &offset, &csi->saAmfCSTypeName);
*len = offset;
return buf;
}
struct amf_csi *amf_csi_deserialize (struct amf_si *si, char *buf, int size)
struct amf_csi *amf_csi_deserialize (struct amf_si *si, char *buf)
{
int objsz = sizeof (struct amf_csi);
char *tmp = buf;
struct amf_csi *csi = amf_csi_new (si);
if (objsz > size) {
return NULL;
} else {
struct amf_csi *obj = amf_csi_new (si);
if (obj == NULL) {
return NULL;
}
memcpy (obj, buf, objsz);
TRACE8 ("%s", obj->name.value);
obj->si = si;
obj->assigned_csis = NULL;
obj->attributes_head = NULL;
obj->next = si->csi_head;
si->csi_head = obj;
return obj;
}
tmp = amf_deserialize_SaNameT (tmp, &csi->name);
tmp = amf_deserialize_SaNameT (tmp, &csi->saAmfCSTypeName);
return csi;
}
struct amf_csi *amf_csi_find (struct amf_si *si, char *name)
@ -747,10 +739,12 @@ struct amf_csi *amf_csi_find (struct amf_si *si, char *name)
struct amf_csi_assignment *amf_csi_assignment_new (struct amf_csi *csi)
{
struct amf_csi_assignment *csi_assignment;
struct amf_csi_assignment *csi_assignment =
amf_calloc (1, sizeof (struct amf_csi_assignment));
csi_assignment = amf_malloc (sizeof (struct amf_csi_assignment));
csi_assignment->csi = csi;
csi_assignment->next = csi->assigned_csis;
csi->assigned_csis = csi_assignment;
return csi_assignment;
}
@ -758,15 +752,20 @@ struct amf_csi_assignment *amf_csi_assignment_new (struct amf_csi *csi)
void *amf_csi_assignment_serialize (
struct amf_csi_assignment *csi_assignment, int *len)
{
int objsz = sizeof (struct amf_csi_assignment);
struct amf_csi_assignment *copy;
char *buf = NULL;
int offset = 0, size = 0;
copy = amf_malloc (objsz);
memcpy (copy, csi_assignment, objsz);
*len = objsz;
TRACE8 ("%s", copy->name.value);
TRACE8 ("%s", csi_assignment->name.value);
return copy;
buf = amf_serialize_SaNameT (buf, &size, &offset, &csi_assignment->name);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
csi_assignment->saAmfCSICompHAState);
buf = amf_serialize_SaUint32T (buf, &size, &offset,
csi_assignment->requested_ha_state);
*len = offset;
return buf;
}
struct amf_si_assignment *si_assignment_find (
@ -794,28 +793,22 @@ struct amf_si_assignment *si_assignment_find (
}
struct amf_csi_assignment *amf_csi_assignment_deserialize (
struct amf_csi *csi, char *buf, int size)
struct amf_csi *csi, char *buf)
{
int objsz = sizeof (struct amf_csi_assignment);
char *tmp = buf;
struct amf_csi_assignment *csi_assignment = amf_csi_assignment_new (csi);
if (objsz > size) {
return NULL;
} else {
struct amf_csi_assignment *obj = amf_csi_assignment_new (csi);
if (obj == NULL) {
return NULL;
}
memcpy (obj, buf, objsz);
TRACE8 ("%s", obj->name.value);
obj->csi = csi;
obj->comp = amf_comp_find (csi->si->application->cluster, &obj->name);
assert (obj->comp != NULL);
obj->next = csi->assigned_csis;
csi->assigned_csis = obj;
tmp = amf_deserialize_SaNameT (tmp, &csi_assignment->name);
tmp = amf_deserialize_SaUint32T (tmp,
&csi_assignment->saAmfCSICompHAState);
tmp = amf_deserialize_SaUint32T (tmp, &csi_assignment->requested_ha_state);
obj->si_assignment = si_assignment_find(obj);
return obj;
}
csi_assignment->comp = amf_comp_find (csi->si->application->cluster,
&csi_assignment->name);
assert (csi_assignment->comp != NULL);
csi_assignment->si_assignment = si_assignment_find(csi_assignment);
return csi_assignment;
}
char *amf_csi_assignment_dn_make (
@ -913,9 +906,9 @@ end:
struct amf_csi_attribute *amf_csi_attribute_new (struct amf_csi *csi)
{
struct amf_csi_attribute *csi_attribute;
struct amf_csi_attribute *csi_attribute =
amf_calloc (1, sizeof (struct amf_csi_assignment));
csi_attribute = amf_malloc (sizeof (struct amf_csi_assignment));
csi_attribute->next = csi->attributes_head;
csi->attributes_head = csi_attribute;
@ -949,7 +942,7 @@ void *amf_csi_attribute_serialize (
}
struct amf_csi_attribute *amf_csi_attribute_deserialize (
struct amf_csi *csi, char *buf, int size)
struct amf_csi *csi, char *buf)
{
char *tmp = buf;
struct amf_csi_attribute *csi_attribute;

View File

@ -128,20 +128,6 @@
#include "print.h"
#include "main.h"
int amf_su_presence_state_all_comps_in_su_are_set (struct amf_su *su,
SaAmfPresenceStateT state)
{
int all_set = 1;
struct amf_comp *comp;
for (comp = su->comp_head; comp != NULL; comp = comp->next) {
if (comp->saAmfCompPresenceState != state) {
all_set = 0;
}
}
return all_set;
}
/**
* This function only logs since the readiness state is runtime
@ -264,6 +250,182 @@ static void comp_restart (struct amf_comp *comp)
amf_comp_restart (comp);
}
static void si_ha_state_assumed_cbfn (
struct amf_si_assignment *si_assignment, int result)
{
struct amf_si_assignment *tmp_si_assignment;
struct amf_comp *comp;
struct amf_csi_assignment *csi_assignment;
int all_confirmed = 1;
ENTER ("");
tmp_si_assignment = amf_su_get_next_si_assignment(si_assignment->su, NULL);
while (tmp_si_assignment != NULL) {
for (comp = tmp_si_assignment->su->comp_head; comp != NULL;
comp = comp->next) {
csi_assignment = amf_comp_get_next_csi_assignment(comp, NULL);
while (csi_assignment != NULL) {
if (csi_assignment->requested_ha_state !=
csi_assignment->saAmfCSICompHAState) {
all_confirmed = 0;
}
csi_assignment = amf_comp_get_next_csi_assignment(
comp, csi_assignment);
}
}
tmp_si_assignment = amf_su_get_next_si_assignment(
si_assignment->su, tmp_si_assignment);
}
if (all_confirmed) {
switch (si_assignment->su->restart_control_state) {
case SU_RC_RESTART_COMP_SETTING:
log_printf (LOG_NOTICE, "Component restart recovery finished");
break;
case SU_RC_RESTART_SU_SETTING:
log_printf (LOG_NOTICE, "SU restart recovery finished");
break;
default:
assert (0);
break;
}
si_assignment->su->restart_control_state =
si_assignment->su->escalation_level_history_state;
}
}
static void reassign_sis(struct amf_su *su)
{
struct amf_si_assignment *si_assignment;
ENTER ("");
si_assignment = amf_su_get_next_si_assignment(su, NULL);
while (si_assignment != NULL) {
si_assignment->saAmfSISUHAState = 0; /* unknown */
amf_si_ha_state_assume (si_assignment, si_ha_state_assumed_cbfn);
si_assignment = amf_su_get_next_si_assignment(su, si_assignment);
}
}
static void su_comp_presence_state_changed (
struct amf_su *su, struct amf_comp *comp, int state)
{
ENTER ("'%s', '%s'", su->name.value, comp->name.value);
switch (state) {
case SA_AMF_PRESENCE_INSTANTIATED:
switch (su->restart_control_state) {
case SU_RC_ESCALATION_LEVEL_2:
/*
* TODO: send to node
*/
case SU_RC_ESCALATION_LEVEL_0:
if (amf_su_presence_state_all_comps_in_su_are_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED)) {
su_presence_state_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED);
}
break;
case SU_RC_RESTART_COMP_RESTARTING:
su->restart_control_state = SU_RC_RESTART_COMP_SETTING;
reassign_sis (comp->su);
break;
case SU_RC_RESTART_SU_INSTANTIATING:
if (amf_su_presence_state_all_comps_in_su_are_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED)) {
su->restart_control_state = SU_RC_RESTART_SU_SETTING;
su_presence_state_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED);
reassign_sis (comp->su);
}
break;
default:
dprintf ("state %d", su->restart_control_state);
assert (0);
break;
}
break;
case SA_AMF_PRESENCE_UNINSTANTIATED:
if (amf_su_presence_state_all_comps_in_su_are_set (
su, SA_AMF_PRESENCE_UNINSTANTIATED)) {
su_presence_state_set (comp->su,SA_AMF_PRESENCE_UNINSTANTIATED);
}
break;
case SA_AMF_PRESENCE_INSTANTIATING:
su_presence_state_set (comp->su,SA_AMF_PRESENCE_INSTANTIATING);
break;
case SA_AMF_PRESENCE_RESTARTING:
break;
case SA_AMF_PRESENCE_TERMINATING:
break;
case SA_AMF_PRESENCE_INSTANTIATION_FAILED:
su_presence_state_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATION_FAILED);
break;
default:
assert (0);
break;
}
}
static void su_comp_op_state_changed (
struct amf_su *su, struct amf_comp *comp, int state)
{
ENTER ("'%s', '%s'", su->name.value, comp->name.value);
switch (state) {
case SA_AMF_OPERATIONAL_ENABLED:
{
struct amf_comp *comp_compare;
int all_set = 1;
for (comp_compare = comp->su->comp_head;
comp_compare != NULL; comp_compare = comp_compare->next) {
if (comp_compare->saAmfCompOperState !=
SA_AMF_OPERATIONAL_ENABLED) {
all_set = 0;
break;
}
}
if (all_set) {
su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_ENABLED);
} else {
su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_DISABLED);
}
break;
}
case SA_AMF_OPERATIONAL_DISABLED:
break;
default:
assert (0);
break;
}
}
int amf_su_presence_state_all_comps_in_su_are_set (struct amf_su *su,
SaAmfPresenceStateT state)
{
int all_set = 1;
struct amf_comp *comp;
for (comp = su->comp_head; comp != NULL; comp = comp->next) {
if (comp->saAmfCompPresenceState != state) {
all_set = 0;
}
}
return all_set;
}
void amf_su_instantiate (struct amf_su *su)
{
struct amf_comp *comp;
@ -331,158 +493,6 @@ void amf_su_assign_si (struct amf_su *su, struct amf_si *si,
}
}
static void si_ha_state_assumed_cbfn (
struct amf_si_assignment *si_assignment, int result)
{
struct amf_si_assignment *tmp_si_assignment;
struct amf_comp *comp;
struct amf_csi_assignment *csi_assignment;
int all_confirmed = 1;
ENTER ("");
tmp_si_assignment = amf_su_get_next_si_assignment(si_assignment->su, NULL);
while (tmp_si_assignment != NULL) {
for (comp = tmp_si_assignment->su->comp_head; comp != NULL;
comp = comp->next) {
csi_assignment = amf_comp_get_next_csi_assignment(comp, NULL);
while (csi_assignment != NULL) {
if (csi_assignment->requested_ha_state !=
csi_assignment->saAmfCSICompHAState) {
all_confirmed = 0;
}
csi_assignment = amf_comp_get_next_csi_assignment(
comp, csi_assignment);
}
}
tmp_si_assignment = amf_su_get_next_si_assignment(
si_assignment->su, tmp_si_assignment);
}
if (all_confirmed) {
switch (si_assignment->su->restart_control_state) {
case SU_RC_RESTART_COMP_SETTING:
log_printf (LOG_NOTICE, "Component restart recovery finished");
break;
case SU_RC_RESTART_SU_SETTING:
log_printf (LOG_NOTICE, "SU restart recovery finished");
break;
default:
assert (0);
}
si_assignment->su->restart_control_state =
si_assignment->su->escalation_level_history_state;
}
}
static void reassign_sis(struct amf_su *su)
{
struct amf_si_assignment *si_assignment;
ENTER ("");
si_assignment = amf_su_get_next_si_assignment(su, NULL);
while (si_assignment != NULL) {
si_assignment->saAmfSISUHAState = 0; /* unknown */
amf_si_ha_state_assume (si_assignment, si_ha_state_assumed_cbfn);
si_assignment = amf_su_get_next_si_assignment(su, si_assignment);
}
}
static void su_comp_presence_state_changed (
struct amf_su *su, struct amf_comp *comp, int state)
{
ENTER ("'%s', '%s'", su->name.value, comp->name.value);
switch (state) {
case SA_AMF_PRESENCE_INSTANTIATED:
switch (su->restart_control_state) {
case SU_RC_ESCALATION_LEVEL_2:
/*
* TODO: send to node
*/
case SU_RC_ESCALATION_LEVEL_0:
if (amf_su_presence_state_all_comps_in_su_are_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED)) {
su_presence_state_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED);
}
break;
case SU_RC_RESTART_COMP_RESTARTING:
su->restart_control_state = SU_RC_RESTART_COMP_SETTING;
reassign_sis (comp->su);
break;
case SU_RC_RESTART_SU_INSTANTIATING:
if (amf_su_presence_state_all_comps_in_su_are_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED)) {
su->restart_control_state = SU_RC_RESTART_SU_SETTING;
su_presence_state_set (
comp->su, SA_AMF_PRESENCE_INSTANTIATED);
reassign_sis (comp->su);
}
break;
default:
dprintf ("state %d", su->restart_control_state);
assert (0);
}
break;
case SA_AMF_PRESENCE_UNINSTANTIATED:
if (amf_su_presence_state_all_comps_in_su_are_set (
su, SA_AMF_PRESENCE_UNINSTANTIATED)) {
su_presence_state_set (comp->su,
SA_AMF_PRESENCE_UNINSTANTIATED);
}
break;
case SA_AMF_PRESENCE_INSTANTIATING:
break;
case SA_AMF_PRESENCE_RESTARTING:
break;
case SA_AMF_PRESENCE_TERMINATING:
break;
default:
assert (0);
}
}
static void su_comp_op_state_changed (
struct amf_su *su, struct amf_comp *comp, int state)
{
ENTER ("'%s', '%s'", su->name.value, comp->name.value);
switch (state) {
case SA_AMF_OPERATIONAL_ENABLED:
{
struct amf_comp *comp_compare;
int all_set = 1;
for (comp_compare = comp->su->comp_head;
comp_compare != NULL; comp_compare = comp_compare->next) {
if (comp_compare->saAmfCompOperState !=
SA_AMF_OPERATIONAL_ENABLED) {
all_set = 0;
break;
}
}
if (all_set) {
su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_ENABLED);
} else {
su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_DISABLED);
}
break;
}
case SA_AMF_OPERATIONAL_DISABLED:
break;
default:
assert (0);
}
}
/**
* Used by a component to report a state change event
@ -503,6 +513,7 @@ void amf_su_comp_state_changed (
break;
default:
assert (0);
break;
}
}
@ -749,11 +760,7 @@ SaAmfReadinessStateT amf_su_get_saAmfSUReadinessState (struct amf_su *su)
struct amf_su *amf_su_new (struct amf_sg *sg, char *name)
{
struct amf_su *tail = sg->su_head;
struct amf_su *su = calloc (1, sizeof (struct amf_su));
if (su == NULL) {
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
struct amf_su *su = amf_calloc (1, sizeof (struct amf_su));
while (tail != NULL) {
if (tail->next == NULL) {
@ -836,12 +843,10 @@ void *amf_su_serialize (struct amf_su *su, int *len)
return buf;
}
struct amf_su *amf_su_deserialize (struct amf_sg *sg, char *buf, int size)
struct amf_su *amf_su_deserialize (struct amf_sg *sg, char *buf)
{
char *tmp = buf;
struct amf_su *su;
su = amf_su_new (sg, "");
struct amf_su *su = amf_su_new (sg, "");
tmp = amf_deserialize_SaNameT (tmp, &su->name);
tmp = amf_deserialize_SaUint32T (tmp, &su->saAmfSURank);

View File

@ -327,11 +327,7 @@ struct amf_cluster *amf_config_read (char **error_string)
cluster->node_head = node;
current_parse = AMF_NODE;
} else if ((loc = strstr_rs (line, "safApp=")) != 0) {
app = calloc (1, sizeof (struct amf_application));
app->next = cluster->application_head;
cluster->application_head = app;
app->cluster = cluster;
app->saAmfApplicationAdminState = SA_AMF_ADMIN_UNLOCKED;
app = amf_application_new (cluster);
setSaNameT (&app->name, trim_str (loc));
current_parse = AMF_APPLICATION;
sg_cnt = 0;
@ -409,6 +405,7 @@ struct amf_cluster *amf_config_read (char **error_string)
sg_cnt++;
sg->recovery_scope.comp = NULL;
sg->recovery_scope.recovery_type = 0;
sg->recovery_scope.node = NULL;
sg->recovery_scope.sis = NULL;
sg->recovery_scope.sus = NULL;
current_parse = AMF_SG;
@ -921,7 +918,7 @@ void amf_runtime_attributes_print (struct amf_cluster *cluster)
log_printf (LOG_INFO, "safCluster=%s", getSaNameT(&cluster->name));
log_printf (LOG_INFO, " admin state: %s\n",
admin_state_text[cluster->saAmfClusterAdminState]);
log_printf (LOG_INFO, " state: %u\n", cluster->state);
log_printf (LOG_INFO, " state: %u\n", cluster->acsm_state);
for (node = cluster->node_head; node != NULL; node = node->next) {
log_printf (LOG_INFO, " safNode=%s\n", getSaNameT (&node->name));
log_printf (LOG_INFO, " CLM Node: %s\n", getSaNameT (&node->saAmfNodeClmNode));
@ -1109,6 +1106,24 @@ char *amf_serialize_SaStringT (char *buf, int *size, int *offset, SaStringT str)
return amf_serialize_opaque (buf, size, offset, str, len);
}
char *amf_serialize_SaUint16T (char *buf, int *size, int *offset, SaUint16T num)
{
char *tmp = buf;
if ((*size - *offset ) < sizeof (SaUint16T)) {
*size += sizeof (SaUint16T);
tmp = realloc (buf, *size);
if (tmp == NULL) {
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
}
*((SaUint16T *)&tmp[*offset]) = num;
(*offset) += sizeof (SaUint16T);
return tmp;
}
char *amf_serialize_SaUint32T (char *buf, int *size, int *offset, SaUint32T num)
{
char *tmp = buf;
@ -1127,14 +1142,8 @@ char *amf_serialize_SaUint32T (char *buf, int *size, int *offset, SaUint32T num)
return tmp;
}
char *amf_serialize_SaUint64T (char *buf, SaUint64T num)
{
*((SaUint64T *)buf) = num;
return buf + sizeof (SaUint64T);
}
char *amf_serialize_opaque (
char *buf, int *size, int *offset, char *src, int cnt)
char *buf, int *size, int *offset, void *src, int cnt)
{
unsigned int required_size;
char *tmp = buf;
@ -1185,19 +1194,19 @@ char *amf_deserialize_SaStringT (char *buf, SaStringT *str)
return tmp;
}
char *amf_deserialize_SaUint16T (char *buf, SaUint16T *num)
{
*num = *((SaUint16T *)buf);
return buf + sizeof (SaUint16T);
}
char *amf_deserialize_SaUint32T (char *buf, SaUint32T *num)
{
*num = *((SaUint32T *)buf);
return buf + sizeof (SaUint32T);
}
char *amf_deserialize_SaUint64T (char *buf, SaUint64T *num)
{
*num = *((SaUint64T *)buf);
return buf + sizeof (SaUint64T);
}
char *amf_deserialize_opaque (char *buf, char *dst, int *cnt)
char *amf_deserialize_opaque (char *buf, void *dst, int *cnt)
{
*cnt = *((SaUint32T *)buf);
memcpy (dst, buf + sizeof (SaUint32T), *cnt);
@ -1216,6 +1225,18 @@ void *_amf_malloc (size_t size, char *file, unsigned int line)
return tmp;
}
void *_amf_calloc (size_t nmemb, size_t size, char *file, unsigned int line)
{
void *tmp = calloc (nmemb, size);
if (tmp == NULL) {
log_printf (LOG_LEVEL_ERROR, "AMF out-of-memory at %s:%u", file, line);
openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
}
return tmp;
}
int sa_amf_grep_one_sub_match(const char *string, char *pattern,
SaNameT *matches_arr)
{
@ -1297,17 +1318,26 @@ out:
}
void amf_msg_mcast (int id, void *buf, size_t len)
/**
* Multicast a message to the cluster. Errors are treated as
* fatal and will exit the program.
* @param msg_id
* @param buf
* @param len
*
* @return int
*/
int amf_msg_mcast (int msg_id, void *buf, size_t len)
{
struct req_exec_amf_msg msg;
struct iovec iov[2];
int iov_cnt;
int res;
// ENTER ("%u, %p, %u", id, buf, len);
// ENTER ("%u, %p, %u", msg_id, buf, len);
msg.header.size = sizeof (msg);
msg.header.id = SERVICE_ID_MAKE (AMF_SERVICE, id);
msg.header.id = SERVICE_ID_MAKE (AMF_SERVICE, msg_id);
iov[0].iov_base = &msg;
iov[0].iov_len = sizeof (msg);
@ -1328,6 +1358,8 @@ void amf_msg_mcast (int id, void *buf, size_t len)
dprintf("Unable to send %d bytes\n", msg.header.size);
openais_exit_error (AIS_DONE_FATAL_ERR);
}
return res;
}
void amf_util_init (void)