diff --git a/exec/amf.c b/exec/amf.c index 89e156d7..636bd98e 100644 --- a/exec/amf.c +++ b/exec/amf.c @@ -68,7 +68,9 @@ * * State: Event: Action: New state: * =========================================================================== - * IDLE node_joined PROBING-1 + * - init[AMF disabled] UNCONFIGURED + * - init IDLE + * IDLE node_joined A0 PROBING-1 * PROBING-1 timer1 timeout A1 PROBING-2 * PROBING-1 SYNC_START A2 UPDATING_CLUSTER_MODEL * PROBING-1 node_joined A7 PROBING-1 @@ -76,14 +78,15 @@ * PROBING-2 SYNC_START[From other] UPDATING_CLUSTER_MODEL * PROBING-2 node_joined A7 PROBING-2 * CREATING_CLUSTER_MODEL Model created A8 SYNCHRONIZING - * SYNCHRONIZING SYNC_READY A10 NORMAL_OPERATION + * SYNCHRONIZING sync_activate A10 NORMAL_OPERATION * SYNCHRONIZING node_left[sync_master] A5 SYNCHRONIZING - * SYNCHRONIZING node_joined SYNCHRONIZING + * SYNCHRONIZING node_joined[sync_master + * == me] A1 SYNCHRONIZING * UPDATING_CLUSTER_MODEL SYNC_DATA A3 UPDATING_CLUSTER_MODEL - * UPDATING_CLUSTER_MODEL SYNC_READY A4 NORMAL_OPERATION - * UPDATING_CLUSTER_MODEL SYNC_START A5 NORMAL_OPERATION + * UPDATING_CLUSTER_MODEL sync_activate A4 NORMAL_OPERATION + * UPDATING_CLUSTER_MODEL SYNC_START A5 UPDATING_CLUSTER_MODEL * UPDATING_CLUSTER_MODEL node_left[sync_master] PROBING-1 - * NORMAL_OPERATION node_joined SYNCHRONIZING + * NORMAL_OPERATION sync_init SYNCHRONIZING * NORMAL_OPERATION node_left[sync_master] A6 NORMAL_OPERATION * NORMAL_OPERATION SYNC_REQUEST A8 NORMAL_OPERATION * Any SYNC_REQUEST A9 No change @@ -103,16 +106,17 @@ * * 1.3 Action Description * ====================== + * A0 - Start timer1 * A1 - Multicast SYNC_START message * A2 - Stop timer1 * A3 - Decode AMF object and save - * A4 - Create cluster model + * A4 - Create cluster model; cluster sync ready * A5 - Free received SYNC_DATA * A6 - Calculate new sync master * A7 - Multicast SYNC_REQUEST message * A8 - Update AMF node object(s) with CLM nodeid * A9 - Save CLM nodeid & hostname - * A10- Delete CLM nodes + * A10- Delete CLM nodes; cluster sync ready */ #include @@ -195,6 +199,8 @@ static void message_handler_req_exec_amf_comp_register ( void *message, unsigned int nodeid); static void message_handler_req_exec_amf_comp_error_report ( void *message, unsigned int nodeid); +static void message_handler_req_exec_amf_comp_instantiate ( + void *message, unsigned int nodeid); static void message_handler_req_exec_amf_clc_cleanup_completed ( void *message, unsigned int nodeid); static void message_handler_req_exec_amf_healthcheck_tmo ( @@ -211,6 +217,8 @@ static void message_handler_req_exec_amf_cluster_start_tmo ( void *message, unsigned int nodeid); static void message_handler_req_exec_amf_sync_request ( void *message, unsigned int nodeid); +static void message_handler_req_exec_amf_comp_instantiate_tmo( + void *message, unsigned int nodeid); static void amf_dump_fn (void); static void amf_sync_init (void); static int amf_sync_process (void); @@ -323,6 +331,9 @@ static struct openais_exec_handler amf_exec_service[] = { { .exec_handler_fn = message_handler_req_exec_amf_comp_error_report, }, + { + .exec_handler_fn = message_handler_req_exec_amf_comp_instantiate, + }, { .exec_handler_fn = message_handler_req_exec_amf_clc_cleanup_completed, }, @@ -347,6 +358,9 @@ static struct openais_exec_handler amf_exec_service[] = { { .exec_handler_fn = message_handler_req_exec_amf_sync_request, }, + { + .exec_handler_fn = message_handler_req_exec_amf_comp_instantiate_tmo, + }, }; /* @@ -425,6 +439,7 @@ struct req_exec_amf_comp_error_report { SaNtfIdentifierT ntfIdentifier; }; + struct req_exec_amf_response { mar_req_header_t header; SaUint32T interface; @@ -434,11 +449,13 @@ struct req_exec_amf_response { struct req_exec_amf_sync_data { mar_req_header_t header; - amf_object_type_t object_type; + SaUint32T protocol_version; + SaUint32T object_type; }; struct req_exec_amf_sync_request { mar_req_header_t header; + SaUint32T protocol_version; char hostname[HOST_NAME_MAX + 1]; }; @@ -585,12 +602,10 @@ static void nodeids_init (void) * * @return struct amf_node* */ -static struct amf_node *get_this_node_obj (struct amf_cluster *cluster) +static struct amf_node *get_this_node_obj (void) { char hostname[HOST_NAME_MAX + 1]; - assert (cluster != NULL); - if (gethostname (hostname, sizeof(hostname)) == -1) { log_printf (LOG_LEVEL_ERROR, "gethostname failed: %d", errno); openais_exit_error (AIS_DONE_FATAL_ERR); @@ -600,7 +615,7 @@ static struct amf_node *get_this_node_obj (struct amf_cluster *cluster) } /** - * Prints old and new sync state, sets new tate + * Prints old and new sync state, sets new state * @param state */ static void sync_state_set (enum scsm_states state) @@ -630,6 +645,7 @@ static int mcast_sync_data ( SYNCTRACE ("%d bytes, type %u", req_exec.header.size , object_type); req_exec.header.id = SERVICE_ID_MAKE (AMF_SERVICE, MESSAGE_REQ_EXEC_AMF_SYNC_DATA); + req_exec.protocol_version = AMF_PROTOCOL_VERSION; req_exec.object_type = object_type; iov[0].iov_base = &req_exec; @@ -672,7 +688,7 @@ static void sync_request (void) SYNCTRACE (""); - assert (amf_cluster->state == CLUSTER_UNINSTANTIATED); + assert (amf_cluster->acsm_state == CLUSTER_AC_UNINSTANTIATED); amf_sync_init (); @@ -703,7 +719,7 @@ static int create_cluster_model (void) openais_exit_error (AIS_DONE_AMFCONFIGREAD); } - this_amf_node = get_this_node_obj (amf_cluster); + this_amf_node = get_this_node_obj (); if (this_amf_node == NULL) { log_printf (LOG_LEVEL_INFO, @@ -786,7 +802,6 @@ static int healthcheck_sync (struct amf_healthcheck *healthcheck) SYNCTRACE ("%s", healthcheck->safHealthcheckKey.key); buf = amf_healthcheck_serialize (healthcheck, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_HEALTHCHECK); free (buf); if (res != 0) { @@ -805,7 +820,6 @@ static int comp_sync (struct amf_comp *comp) if (!scsm.comp_sync_completed) { buf = amf_comp_serialize (comp, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_COMP); free (buf); if (res != 0) { @@ -837,7 +851,6 @@ static int su_sync (struct amf_su *su) if (!scsm.su_sync_completed) { buf = amf_su_serialize (su, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_SU); free (buf); if (res != 0) { @@ -868,7 +881,6 @@ static int sg_sync (struct amf_sg *sg) if (!scsm.sg_sync_completed) { buf = amf_sg_serialize (sg, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_SG); free (buf); if (res != 0) { @@ -898,7 +910,6 @@ static int csi_assignment_sync (struct amf_csi_assignment *csi_assignment) SYNCTRACE ("%s", csi_assignment->name.value); buf = amf_csi_assignment_serialize (csi_assignment, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_CSI_ASSIGNMENT); free (buf); if (res != 0) { @@ -916,7 +927,6 @@ static int csi_attribute_sync (struct amf_csi_attribute *csi_attribute) SYNCTRACE ("%s", csi_attribute->name); buf = amf_csi_attribute_serialize (csi_attribute, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_CSI_ATTRIBUTE); free (buf); if (res != 0) { @@ -935,7 +945,6 @@ static int csi_sync (struct amf_csi *csi) if (!scsm.csi_sync_completed) { buf = amf_csi_serialize (csi, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_CSI); free (buf); if (res != 0) { @@ -947,7 +956,8 @@ static int csi_sync (struct amf_csi *csi) if (scsm.csi_assignment == NULL) { scsm.csi_assignment = scsm.csi->assigned_csis; } - for (; scsm.csi_assignment != NULL; scsm.csi_assignment = scsm.csi_assignment->next) { + for (; scsm.csi_assignment != NULL; + scsm.csi_assignment = scsm.csi_assignment->next) { if (csi_assignment_sync (scsm.csi_assignment) != 0) { return 1; /* try again later */ } @@ -974,7 +984,6 @@ static int si_assignment_sync (struct amf_si_assignment *si_assignment) SYNCTRACE ("%s", si_assignment->name.value); buf = amf_si_assignment_serialize (si_assignment, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_SI_ASSIGNMENT); free (buf); if (res != 0) { @@ -993,7 +1002,6 @@ static int si_sync (struct amf_si *si) if (!scsm.si_sync_completed) { buf = amf_si_serialize (si, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_SI); free (buf); if (res != 0) { @@ -1033,7 +1041,6 @@ static int application_sync (struct amf_application *app) if (!scsm.app_sync_completed) { buf = amf_application_serialize (app, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_APPLICATION); free (buf); if (res != 0) { @@ -1072,7 +1079,6 @@ static int node_sync (struct amf_node *node) SYNCTRACE ("%s", node->name.value); buf = amf_node_serialize (node, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_NODE); free (buf); if (res != 0) { @@ -1089,7 +1095,6 @@ static int cluster_sync (struct amf_cluster *cluster) SYNCTRACE ("%s", cluster->name.value); buf = amf_cluster_serialize (cluster, &len); - assert (buf != NULL); res = mcast_sync_data (buf, len, AMF_CLUSTER); free (buf); if (res != 0) { @@ -1121,10 +1126,13 @@ static int is_member ( return 0; } + + /** * Start the AMF nodes that has joined */ -static void joined_nodes_start (void) + +static void cluster_joined_nodes_start (void) { int i; struct amf_node *node; @@ -1133,7 +1141,7 @@ static void joined_nodes_start (void) node = amf_node_find_by_nodeid (scsm.joined_list[i]); if (node != NULL) { - amf_node_sync_ready (node); + amf_cluster_sync_ready (amf_cluster, node); } else { log_printf (LOG_LEVEL_INFO, "Info: Node %u is not configured as an AMF node", scsm.joined_list[i]); @@ -1163,6 +1171,7 @@ static void amf_sync_init (void) default: dprintf ("unknown state: %u", scsm.state);; assert (0); + break; } if (scsm.state == SYNCHRONIZING && scsm.sync_master == this_ip->nodeid) { @@ -1251,6 +1260,7 @@ static void amf_sync_abort (void) { SYNCTRACE ("state %s", scsm_state_names[scsm.state]); memset (&scsm, 0, sizeof (scsm)); + assert (0); /* not ready... */ } /** @@ -1274,51 +1284,21 @@ static void amf_sync_activate (void) } clm_nodes = NULL; sync_state_set (NORMAL_OPERATION); - /* TODO: Remove dependencies to amf_cluster->state */ - switch (amf_cluster->state) { - case CLUSTER_STARTED: - case CLUSTER_STARTING_WORKLOAD: - joined_nodes_start (); - break; - case CLUSTER_STARTING_COMPONENTS: { - amf_cluster_sync_ready (amf_cluster); - joined_nodes_start (); - break; - } - case CLUSTER_UNINSTANTIATED: - default: { - amf_cluster_sync_ready (amf_cluster); - } - } + + cluster_joined_nodes_start (); break; case UPDATING_CLUSTER_MODEL: amf_cluster = scsm.cluster; assert (amf_cluster != NULL); scsm.cluster = NULL; - this_amf_node = get_this_node_obj (amf_cluster); + this_amf_node = get_this_node_obj (); sync_state_set (NORMAL_OPERATION); if (this_amf_node != NULL) { this_amf_node->nodeid = this_ip->nodeid; #ifdef AMF_DEBUG amf_runtime_attributes_print (amf_cluster); #endif - /* TODO: Remove dependencies to amf_cluster->state */ - switch (amf_cluster->state) { - case CLUSTER_STARTED: { - case CLUSTER_STARTING_WORKLOAD: - amf_node_sync_ready (this_amf_node); - break; - } - case CLUSTER_STARTING_COMPONENTS: { - amf_cluster_sync_ready (amf_cluster); - amf_node_sync_ready (this_amf_node); - break; - } - case CLUSTER_UNINSTANTIATED: - default: { - amf_cluster_sync_ready (amf_cluster); - } - } + amf_cluster_sync_ready (amf_cluster, this_amf_node); } else { log_printf (LOG_LEVEL_INFO, "Info: This node is not configured as an AMF node, disabling."); @@ -1332,6 +1312,7 @@ static void amf_sync_activate (void) default: dprintf ("unknown state: %u", scsm.state);; assert (0); + break; } SYNCTRACE (""); @@ -1419,8 +1400,11 @@ static void amf_confchg_fn ( /* fall-through */ case PROBING_2: if (joined_list_entries > 0) { + struct req_exec_amf_sync_request msg; + memcpy (msg.hostname, hostname, strlen (hostname) + 1); + msg.protocol_version = AMF_PROTOCOL_VERSION; amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST, - hostname, strlen (hostname) + 1); + &msg.protocol_version, sizeof (msg) - sizeof (mar_req_header_t)); } break; case UNCONFIGURED: @@ -1493,6 +1477,7 @@ static void amf_confchg_fn ( default: log_printf (LOG_LEVEL_ERROR, "unknown state: %u\n", scsm.state); assert (0); + break; } } @@ -1504,15 +1489,13 @@ static int amf_lib_exit_fn (void *conn) assert (amf_pd != NULL); comp = amf_pd->comp; - assert (comp != NULL); /* Make sure this is not a new connection */ - if (comp->conn == conn) { + if (comp != NULL && comp->conn == conn ) { comp->conn = NULL; + dprintf ("Lib exit from comp %s\n", getSaNameT (&comp->name)); } - dprintf ("Lib exit from comp %s\n", getSaNameT (&comp->name)); - return (0); } @@ -1533,6 +1516,8 @@ static void amf_dump_fn (void) amf_runtime_attributes_print (amf_cluster); } + + /****************************************************************************** * Executive Message Implementation *****************************************************************************/ @@ -1553,7 +1538,7 @@ static void message_handler_req_exec_amf_comp_register ( assert (comp != NULL); TRACE1 ("ComponentRegister: '%s'", comp->name.value); error = amf_comp_register (comp); - + if (amf_su_is_local (comp->su)) { res_lib.header.id = MESSAGE_RES_AMF_COMPONENTREGISTER; res_lib.header.size = sizeof (struct res_lib_amf_componentregister); @@ -1578,6 +1563,38 @@ static void message_handler_req_exec_amf_comp_error_report ( amf_comp_error_report (comp, req_exec->recommendedRecovery); } + +static void message_handler_req_exec_amf_comp_instantiate( + void *message, unsigned int nodeid) +{ + struct req_exec_amf_comp_instantiate *req_exec = message; + struct amf_comp *component; + + component = amf_comp_find (amf_cluster, &req_exec->compName); + if (component == NULL) { + log_printf (LOG_ERR, "Error: '%s' not found", req_exec->compName.value); + return; + + } + + amf_comp_instantiate_event (component); +} + +static void message_handler_req_exec_amf_comp_instantiate_tmo( + void *message, unsigned int nodeid) +{ + struct req_exec_amf_comp_instantiate_tmo *req_exec = message; + struct amf_comp *component; + + component = amf_comp_find (amf_cluster, &req_exec->compName); + if (component == NULL) { + log_printf (LOG_ERR, "Error: '%s' not found", req_exec->compName.value); + return; + + } + amf_comp_instantiate_tmo_event (component); +} + static void message_handler_req_exec_amf_clc_cleanup_completed ( void *message, unsigned int nodeid) { @@ -1621,6 +1638,7 @@ static void message_handler_req_exec_amf_healthcheck_tmo ( amf_comp_healthcheck_tmo (comp, healthcheck); } + static void message_handler_req_exec_amf_response ( void *message, unsigned int nodeid) { @@ -1690,6 +1708,7 @@ static void message_handler_req_exec_amf_sync_start ( default: dprintf ("unknown state %d", scsm.state); assert (0); + break; } } @@ -1697,10 +1716,17 @@ static void message_handler_req_exec_amf_sync_data ( void *message, unsigned int nodeid) { struct req_exec_amf_sync_data *req_exec = message; - int sz = sizeof (struct req_exec_amf_sync_data); + char *tmp = ((char*)message) + sizeof (struct req_exec_amf_sync_data); - SYNCTRACE ("rec %d bytes, type %d, ptr %p", - req_exec->header.size, req_exec->object_type, message); + SYNCTRACE ("rec %d bytes, ptr %p, type %d", req_exec->header.size, message, + req_exec->object_type); + +#if 0 + if (req_exec->protocol_version != AMF_PROTOCOL_VERSION) { + log_printf (LOG_ERR, "Error: Protocol version not supported"); + return; + } +#endif if (scsm.state != UPDATING_CLUSTER_MODEL) { return; @@ -1709,65 +1735,58 @@ static void message_handler_req_exec_amf_sync_data ( switch (req_exec->object_type) { case AMF_CLUSTER: - if ((scsm.cluster = amf_cluster_deserialize ( - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.cluster = amf_cluster_deserialize (tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("Cluster '%s' deserialised", scsm.cluster->name.value); break; case AMF_NODE: - if ((scsm.node = amf_node_deserialize (scsm.cluster, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.node = amf_node_deserialize (scsm.cluster, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("Node '%s' deserialised", scsm.node->name.value); break; case AMF_APPLICATION: - if ((scsm.app = amf_application_deserialize (scsm.cluster, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.app = amf_application_deserialize (scsm.cluster, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("App '%s' deserialised", scsm.app->name.value); break; case AMF_SG: - if ((scsm.sg = amf_sg_deserialize (scsm.app, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.sg = amf_sg_deserialize (scsm.app, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("SG '%s' deserialised", scsm.sg->name.value); break; case AMF_SU: - if ((scsm.su = amf_su_deserialize (scsm.sg, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.su = amf_su_deserialize (scsm.sg, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("SU '%s' deserialised", scsm.su->name.value); break; case AMF_COMP: - if ((scsm.comp = amf_comp_deserialize (scsm.su, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.comp = amf_comp_deserialize (scsm.su, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("Component '%s' deserialised", scsm.comp->name.value); break; case AMF_HEALTHCHECK: if ((scsm.healthcheck = amf_healthcheck_deserialize (scsm.comp, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("Healthcheck '%s' deserialised", scsm.healthcheck->safHealthcheckKey.key); break; case AMF_SI: - if ((scsm.si = amf_si_deserialize (scsm.app, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.si = amf_si_deserialize (scsm.app, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("SI '%s' deserialised", scsm.si->name.value); break; case AMF_SI_ASSIGNMENT: if ((scsm.si_assignment = amf_si_assignment_deserialize (scsm.si, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("SI Ass '%s' deserialised", @@ -1775,14 +1794,14 @@ static void message_handler_req_exec_amf_sync_data ( break; case AMF_CSI: if ((scsm.csi = amf_csi_deserialize (scsm.si, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("CSI '%s' deserialised", scsm.csi->name.value); break; case AMF_CSI_ASSIGNMENT: - if ((scsm.csi_assignment = amf_csi_assignment_deserialize (scsm.csi, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + if ((scsm.csi_assignment = amf_csi_assignment_deserialize ( + scsm.csi, tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("CSI Ass '%s' deserialised", @@ -1790,7 +1809,7 @@ static void message_handler_req_exec_amf_sync_data ( break; case AMF_CSI_ATTRIBUTE: if ((scsm.csi_attribute = amf_csi_attribute_deserialize (scsm.csi, - ((char*) req_exec) + sz, req_exec->header.size - sz)) == NULL) { + tmp)) == NULL) { openais_exit_error (AIS_DONE_FATAL_ERR); } SYNCTRACE ("CSI Attr '%s' deserialised", @@ -1818,19 +1837,15 @@ static void message_handler_req_exec_amf_sync_ready ( amf_sync_activate (); } + + static void message_handler_req_exec_amf_cluster_start_tmo ( void *message, unsigned int nodeid) { if (scsm.state != NORMAL_OPERATION) { return; } - - if (nodeid == scsm.sync_master) { - dprintf ("Cluster startup timeout, assigning workload"); - amf_cluster_assign_workload (amf_cluster); - } else { - dprintf ("received CLUSTER_START_TMO from non sync-master, ignoring"); - } + amf_cluster_start_tmo_event (nodeid == scsm.sync_master, amf_cluster); } static void message_handler_req_exec_amf_sync_request ( @@ -1838,7 +1853,7 @@ static void message_handler_req_exec_amf_sync_request ( { struct req_exec_amf_sync_request *req_exec = message; clm_node_t *clm_node; - + SYNCTRACE ("from: %s, name: %s, state %s", totempg_ifaces_print (nodeid), req_exec->hostname, scsm_state_names[scsm.state]); @@ -1862,6 +1877,7 @@ static void message_handler_req_exec_amf_sync_request ( } } + /***************************************************************************** * Library Interface Implementation ****************************************************************************/ diff --git a/exec/amf.h b/exec/amf.h index 29c42b1e..e96d90d0 100644 --- a/exec/amf.h +++ b/exec/amf.h @@ -50,6 +50,8 @@ #include "timer.h" #include "aispoll.h" +#define AMF_PROTOCOL_VERSION 1 + enum clc_component_types { clc_component_sa_aware = 0, /* sa aware */ clc_component_proxied_pre = 1, /* proxied, pre-instantiable */ @@ -67,22 +69,28 @@ typedef enum { } amf_node_eesm_state_t; typedef enum { - NODE_ACSM_REPAIR_NEEDED, + APP_AC_UNINSTANTIATED = 1, + APP_AC_STARTING_SGS, + APP_AC_STARTED, + APP_AC_ASSIGNING_WORKLOAD, + APP_AC_WORKLOAD_ASSIGNED +} app_avail_control_state_t; + +typedef enum { + NODE_ACSM_REPAIR_NEEDED = 1, NODE_ACSM_ESCALLATION_LEVEL_0, NODE_ACSM_ESCALLATION_LEVEL_2, NODE_ACSM_ESCALLATION_LEVEL_3, NODE_ACSM_FAILING_FAST_REBOOTING_NODE, NODE_ACSM_FAILING_FAST_ACTIVATING_STANDBY_NODE, - NODE_ACSM_FAILING_GRACEFULLY_SWITCHING_OVER, + NODE_ACSM_FAILING_GRACEFULLY_SWITCHING_OVER, NODE_ACSM_FAILING_GRACEFULLY_FAILING_OVER, NODE_ACSM_FAILING_GRACEFULLY_REBOOTING_NODE, NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER, NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN, - NODE_ACSM_JOINING_STARTING_SERVICE_UNITS, - NODE_ACSM_JOINING_ASSIGNING_ACTIVE_WORKLOAD, - NODE_ACSM_JOINING_ASSIGNING_STANDBY_WORKLOAD - -} amf_node_acsm_state_t; + NODE_ACSM_JOINING_STARTING_APPLICATIONS, + NODE_ACSM_JOINING_ASSIGNING_WORKLOAD +} amf_node_acsm_state_t; typedef enum { SG_AC_Idle = 0, @@ -101,6 +109,7 @@ typedef enum { SG_AC_WaitingAfterOperationFailed } sg_avail_control_state_t; + typedef enum { SG_RT_FailoverSU = 1, SG_RT_FailoverNode @@ -147,14 +156,36 @@ struct amf_si_assignment; struct amf_csi_assignment; struct amf_healthcheck; -enum cluster_states { - CLUSTER_UNINSTANTIATED, - CLUSTER_STARTING_COMPONENTS, - CLUSTER_STARTING_WORKLOAD, - CLUSTER_STARTED -}; +typedef enum { + CLUSTER_AC_UNINSTANTIATED = 1, + CLUSTER_AC_STARTING_APPLICATIONS, + CLUSTER_AC_WAITING_OVER_TIME, + CLUSTER_AC_ASSIGNING_WORKLOAD, + CLUSTER_AC_STARTED, + CLUSTER_AC_TERMINATING_APPLICATIONS, + CLUSTER_AC_WORKLOAD_REMOVESD, + CLUSTER_AC_REMOVING_WORKLOAD, + CLUSTER_AC_DEACTIVATING_WORKLOAD, + CLUSTER_AC_QUISING +} cluster_avail_control_state_t; -struct amf_cluster { +typedef enum amf_cluster_event { + CLUSTER_SYNC_READY_EV = 1 +} amf_cluster_event_t; + + + +typedef struct amf_deferred { + struct amf_deferred *next; +} amf_deferred_t; + +typedef struct cluster_deferred { + amf_deferred_t defered_list; + struct amf_node *node; + amf_cluster_event_t event; +} cluster_deferredt_t; + +typedef struct amf_cluster { /* Configuration Attributes */ SaNameT name; SaUint32T saAmfClusterStartupTimeout; @@ -169,8 +200,9 @@ struct amf_cluster { /* Implementation */ poll_timer_handle timeout_handle; - enum cluster_states state; -}; + cluster_avail_control_state_t acsm_state; + cluster_deferredt_t *deferred_events_head; +} amf_cluster_t; typedef struct amf_node { /* Configuration Attributes */ @@ -195,7 +227,7 @@ typedef struct amf_node { amf_node_acsm_state_t acsm_state; } amf_node_t; -struct amf_application { +typedef struct amf_application { /* Configuration Attributes */ SaNameT name; @@ -213,16 +245,18 @@ struct amf_application { SaStringT clccli_path; struct amf_application *next; struct amf_node *node_to_start; -}; + app_avail_control_state_t acsm_state; +} amf_application_t; struct sg_recovery_scope { sg_recovery_type_t recovery_type; struct amf_si **sis; struct amf_su **sus; struct amf_comp *comp; + struct amf_node *node; }; -struct amf_sg { +typedef struct amf_sg { /* Configuration Attributes */ SaNameT name; saAmfRedundancyModelT saAmfSGRedundancyModel; @@ -249,18 +283,18 @@ struct amf_sg { /* Relations */ struct amf_application *application; - /* ordered list of SUs */ + /* ordered list of SUs */ struct amf_su *su_head; /* Implementation */ SaStringT clccli_path; struct amf_sg *next; sg_avail_control_state_t avail_state; - struct sg_recovery_scope recovery_scope; + struct sg_recovery_scope recovery_scope; struct amf_node *node_to_start; -}; +} amf_sg_t; -struct amf_su { +typedef struct amf_su { /* Configuration Attributes */ SaNameT name; SaUint32T saAmfSURank; @@ -272,12 +306,12 @@ struct amf_su { SaBoolT saAmfSUPreInstantiable; SaAmfOperationalStateT saAmfSUOperState; SaAmfAdminStateT saAmfSUAdminState; -// SaAmfReadinessStateT saAmfSUReadinessState; + /* SaAmfReadinessStateT saAmfSUReadinessState; */ SaAmfPresenceStateT saAmfSUPresenceState; -// SaNameT saAmfSUAssignedSIs; + /* SaNameT saAmfSUAssignedSIs; */ SaNameT saAmfSUHostedByNode; -/* SaUint32T saAmfSUNumCurrActiveSIs; */ -/* SaUint32T saAmfSUNumCurrStandbySIs; */ + /* SaUint32T saAmfSUNumCurrActiveSIs; */ + /* SaUint32T saAmfSUNumCurrStandbySIs; */ SaUint32T saAmfSURestartCount; /* Relations */ @@ -288,11 +322,11 @@ struct amf_su { su_restart_control_state_t restart_control_state; su_restart_control_state_t escalation_level_history_state; SaStringT clccli_path; - SaUint32T su_failover_cnt; /* missing in SAF specs? */ + SaUint32T su_failover_cnt; /* missing in SAF specs? */ struct amf_su *next; -}; +} amf_su_t; -struct amf_comp { +typedef struct amf_comp { /* Configuration Attributes */ SaNameT name; SaNameT **saAmfCompCsTypes; @@ -334,12 +368,12 @@ struct amf_comp { /* Runtime Attributes */ SaAmfOperationalStateT saAmfCompOperState; -// SaAmfReadinessStateT saAmfCompReadinessState; + /* SaAmfReadinessStateT saAmfCompReadinessState; */ SaAmfPresenceStateT saAmfCompPresenceState; SaUint32T saAmfCompRestartCount; -/* SaUint32T saAmfCompNumCurrActiveCsi; */ -/* SaUint32T saAmfCompNumCurrStandbyCsi; */ -/* SaNameT saAmfCompAssignedCsi; */ + /* SaUint32T saAmfCompNumCurrActiveCsi; */ + /* SaUint32T saAmfCompNumCurrStandbyCsi; */ + /*SaNameT saAmfCompAssignedCsi; */ SaNameT saAmfCompCurrProxyName; SaNameT **saAmfCompCurrProxiedNames; @@ -353,14 +387,15 @@ struct amf_comp { void *conn; enum clc_component_types comptype; struct amf_healthcheck *healthcheck_head; - - /** - * Flag that indicates of this component has a suspected error + poll_timer_handle instantiate_timeout_handle; + /* + * Flag that indicates of this component has a suspected error */ SaUint32T error_suspected; -}; +} amf_comp_t; -struct amf_healthcheck { + +typedef struct amf_healthcheck { /* Configuration Attributes */ SaAmfHealthcheckKeyT safHealthcheckKey; SaUint32T saAmfHealthcheckMaxDuration; @@ -377,9 +412,9 @@ struct amf_healthcheck { poll_timer_handle timer_handle_duration; poll_timer_handle timer_handle_period; -}; +} amf_healthcheck_t; -struct amf_si { +typedef struct amf_si { /* Configuration Attributes */ SaNameT name; SaNameT saAmfSIProtectedbySG; @@ -390,9 +425,9 @@ struct amf_si { /* Runtime Attributes */ SaAmfAdminStateT saAmfSIAdminState; -// SaAmfAssignmentStateT saAmfSIAssignmentState; -// SaUint32T saAmfSINumCurrActiveAssignments; -// SaUint32T saAmfSINumCurrStandbyAssignments; + /* SaAmfAssignmentStateT saAmfSIAssignmentState; */ + /* SaUint32T saAmfSINumCurrActiveAssignments; */ + /* SaUint32T saAmfSINumCurrStandbyAssignments; */ /* Relations */ struct amf_application *application; @@ -403,9 +438,9 @@ struct amf_si { /* Implementation */ struct amf_si *next; -}; +} amf_si_t; -struct amf_si_ranked_su { +typedef struct amf_si_ranked_su { /* Configuration Attributes */ SaNameT name; SaUint32T saAmfRank; @@ -417,9 +452,9 @@ struct amf_si_ranked_su { /* Implementation */ struct amf_si_ranked_su *su_next; struct amf_si_ranked_su *si_next; -}; +} amf_si_ranked_su_t; -struct amf_si_dependency { +typedef struct amf_si_dependency { /* Configuration Attributes */ SaNameT name; int saAmfToleranceTime; @@ -428,9 +463,9 @@ struct amf_si_dependency { /* Implementation */ struct amf_si_dependency *next; -}; +} amf_si_dependency_t; -struct amf_si_assignment { +typedef struct amf_si_assignment { /* Runtime Attributes */ SaNameT name; SaAmfHAStateT saAmfSISUHAState; @@ -444,9 +479,9 @@ struct amf_si_assignment { struct amf_si_assignment *next; void (*assumed_callback_fn) ( struct amf_si_assignment *si_assignment, int result); -}; +} amf_si_assignment_t; -struct amf_csi { +typedef struct amf_csi { /* Configuration Attributes */ SaNameT name; SaNameT saAmfCSTypeName; @@ -459,18 +494,18 @@ struct amf_csi { /* Implementation */ struct amf_csi *next; -}; +} amf_csi_t; -struct amf_csi_attribute { +typedef struct amf_csi_attribute { /* Configuration Attributes */ SaStringT name; SaStringT *value; /* Implementation */ struct amf_csi_attribute *next; -}; +} amf_csi_attribute_t; -struct amf_csi_assignment { +typedef struct amf_csi_assignment { /* Runtime Attributes */ SaNameT name; SaAmfHAStateT saAmfCSICompHAState; /* confirmed HA state */ @@ -483,7 +518,7 @@ struct amf_csi_assignment { SaAmfHAStateT requested_ha_state; struct amf_csi_assignment *next; struct amf_si_assignment *si_assignment; -}; +} amf_csi_assignment_t; enum amf_response_interfaces { AMF_RESPONSE_HEALTHCHECKCALLBACK = 1, @@ -495,14 +530,16 @@ enum amf_response_interfaces { enum amf_message_req_types { MESSAGE_REQ_EXEC_AMF_COMPONENT_REGISTER = 0, MESSAGE_REQ_EXEC_AMF_COMPONENT_ERROR_REPORT = 1, - MESSAGE_REQ_EXEC_AMF_CLC_CLEANUP_COMPLETED = 2, - MESSAGE_REQ_EXEC_AMF_HEALTHCHECK_TMO = 3, - MESSAGE_REQ_EXEC_AMF_RESPONSE = 4, - MESSAGE_REQ_EXEC_AMF_SYNC_START = 5, - MESSAGE_REQ_EXEC_AMF_SYNC_DATA = 6, - MESSAGE_REQ_EXEC_AMF_SYNC_READY = 7, - MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO = 8, - MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST = 9 + MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE = 2, + MESSAGE_REQ_EXEC_AMF_CLC_CLEANUP_COMPLETED = 3, + MESSAGE_REQ_EXEC_AMF_HEALTHCHECK_TMO = 4, + MESSAGE_REQ_EXEC_AMF_RESPONSE = 5, + MESSAGE_REQ_EXEC_AMF_SYNC_START = 6, + MESSAGE_REQ_EXEC_AMF_SYNC_DATA = 7, + MESSAGE_REQ_EXEC_AMF_SYNC_READY = 8, + MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO = 9, + MESSAGE_REQ_EXEC_AMF_SYNC_REQUEST = 10, + MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO = 11 }; struct req_exec_amf_clc_cleanup_completed { @@ -516,6 +553,16 @@ struct req_exec_amf_healthcheck_tmo { SaAmfHealthcheckKeyT safHealthcheckKey; }; +struct req_exec_amf_comp_instantiate { + mar_req_header_t header; + SaNameT compName; +}; + +struct req_exec_amf_comp_instantiate_tmo { + mar_req_header_t header; + SaNameT compName; +}; + struct req_exec_amf_cluster_start_tmo { mar_req_header_t header; @@ -529,6 +576,9 @@ extern void amf_runtime_attributes_print (struct amf_cluster *cluster); extern int amf_enabled (struct objdb_iface_ver0 *objdb); extern void *_amf_malloc (size_t size, char *file, unsigned int line); #define amf_malloc(size) _amf_malloc ((size), __FILE__, __LINE__) +extern void *_amf_calloc (size_t nmemb, size_t size, char *file, + unsigned int line); +#define amf_calloc(nmemb,size) _amf_calloc ((nmemb), (size), __FILE__, __LINE__) extern const char *amf_admin_state (int state); extern const char *amf_op_state (int state); @@ -540,17 +590,20 @@ extern char *amf_serialize_SaNameT ( char *buf, int *size, int *offset, SaNameT *name); extern char *amf_serialize_SaStringT ( char *buf, int *size, int *offset, SaStringT str); +extern char *amf_serialize_SaUint16T ( + char *buf, int *size, int *offset, SaUint16T num); extern char *amf_serialize_SaUint32T ( char *buf, int *size, int *offset, SaUint32T num); extern char *amf_serialize_SaUint64T (char *buf, SaUint64T num); extern char *amf_serialize_opaque ( - char *buf, int *size, int *offset, char *cp, int cnt); + char *buf, int *size, int *offset, void *cp, int cnt); extern char *amf_deserialize_SaNameT (char *buf, SaNameT *name); extern char *amf_deserialize_SaStringT (char *buf, SaStringT *str); +extern char *amf_deserialize_SaUint16T (char *buf, SaUint16T *num); extern char *amf_deserialize_SaUint32T (char *buf, SaUint32T *num); extern char *amf_deserialize_SaUint64T (char *buf, SaUint64T *num); -extern char *amf_deserialize_opaque (char *buf, char *dst, int *cnt); -extern void amf_msg_mcast (int id, void *buf, size_t len); +extern char *amf_deserialize_opaque (char *buf, void *dst, int *cnt); +extern int amf_msg_mcast (int msg_id, void *buf, size_t len); extern void amf_util_init (void); /*===========================================================================*/ @@ -561,10 +614,12 @@ extern struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name); extern void amf_node_init (void); extern void *amf_node_serialize (struct amf_node *node, int *len); extern struct amf_node *amf_node_deserialize ( - struct amf_cluster *cluster, char *buf, int size); + struct amf_cluster *cluster, char *buf); extern struct amf_node *amf_node_find (SaNameT *name); extern struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid); extern struct amf_node *amf_node_find_by_hostname (const char *hostname); +extern void amf_node_sg_failed_over (struct amf_node *node, + struct amf_sg *sg_in); /* Event methods */ extern void amf_node_sync_ready (struct amf_node *node); @@ -601,10 +656,25 @@ extern void timer_function_node_probation_period_expired (void *node); extern void amf_cluster_init (void); extern struct amf_cluster *amf_cluster_new (void); extern void *amf_cluster_serialize (struct amf_cluster *cluster, int *len); -extern struct amf_cluster *amf_cluster_deserialize (char *buf, int size); +extern struct amf_cluster *amf_cluster_deserialize (char *buf); +extern int amf_cluster_applications_assigned (struct amf_cluster *cluster); +extern int amf_cluster_applications_started_with_no_starting_sgs ( + struct amf_cluster *cluster); + + + /* Event methods */ -extern void amf_cluster_sync_ready (struct amf_cluster *cluster); +extern void amf_cluster_start_tmo_event (int is_sync_master, + struct amf_cluster *cluster); +extern void amf_cluster_sync_ready (struct amf_cluster *cluster, + struct amf_node *node); +/** + * + * @param cluster + * @param app + */ +extern void amf_cluster_start_applications(struct amf_cluster *cluster); extern void amf_cluster_assign_workload (struct amf_cluster *cluster); /* Response event methods */ @@ -628,7 +698,8 @@ extern int amf_application_calc_and_set_si_dependency_level ( extern void *amf_application_serialize ( struct amf_application *application, int *len); extern struct amf_application *amf_application_deserialize ( - struct amf_cluster *cluster, char *buf, int size); + struct amf_cluster *cluster, char *buf); +extern int amf_application_all_sg_started (struct amf_application *app); /* Event methods */ extern void amf_application_start ( @@ -652,7 +723,7 @@ extern struct amf_sg *amf_sg_new (struct amf_application *app, char *name); extern void amf_sg_delete (struct amf_sg *sg); extern void *amf_sg_serialize (struct amf_sg *sg, int *len); extern struct amf_sg *amf_sg_deserialize ( - struct amf_application *app, char *buf, int size); + struct amf_application *app, char *buf); /** * Request SG to start (instantiate all SUs) @@ -669,7 +740,7 @@ extern void amf_sg_start (struct amf_sg *sg, struct amf_node *node); * @param sg * @param dependency_level */ -extern void amf_sg_assign_si (struct amf_sg *sg, int dependency_level); +extern int amf_sg_assign_si_req (struct amf_sg *sg, int dependency_level); extern void amf_sg_failover_node_req ( struct amf_sg *sg, struct amf_node *node); @@ -679,7 +750,6 @@ extern void amf_sg_failover_comp_req ( struct amf_sg *sg, struct amf_node *node); extern void amf_sg_switchover_node_req ( struct amf_sg *sg, struct amf_node *node); - /* Response event methods */ extern void amf_sg_su_state_changed ( struct amf_sg *sg, struct amf_su *su, SaAmfStateT type, int state); @@ -702,7 +772,7 @@ extern void amf_su_delete (struct amf_su *su); extern char *amf_su_dn_make (struct amf_su *su, SaNameT *name); extern void *amf_su_serialize (struct amf_su *su, int *len); extern struct amf_su *amf_su_deserialize ( - struct amf_sg *sg, char *buf, int size); + struct amf_sg *sg, char *buf); extern int amf_su_is_local (struct amf_su *su); extern struct amf_si_assignment *amf_su_get_next_si_assignment ( struct amf_su *su, const struct amf_si_assignment *si_assignment); @@ -752,7 +822,7 @@ extern struct amf_comp *amf_comp_find ( struct amf_cluster *cluster, SaNameT *name); extern void *amf_comp_serialize (struct amf_comp *comp, int *len); extern struct amf_comp *amf_comp_deserialize ( - struct amf_su *su, char *buf, int size); + struct amf_su *su, char *buf); extern void amf_comp_foreach_csi_assignment ( struct amf_comp *component, void (*foreach_fn)(struct amf_comp *component, @@ -766,6 +836,8 @@ extern SaAmfReadinessStateT amf_comp_get_saAmfCompReadinessState ( extern void amf_comp_instantiate (struct amf_comp *comp); extern void amf_comp_terminate (struct amf_comp *comp); extern void amf_comp_node_left (struct amf_comp *comp); +extern void amf_comp_instantiate_event(struct amf_comp *comp); +extern void amf_comp_instantiate_tmo_event (struct amf_comp *comp); /** * Request the component to assume a HA state @@ -832,10 +904,11 @@ extern SaAisErrorT amf_comp_healthcheck_confirm ( SaAmfHealthcheckKeyT *healthcheckKey, SaAisErrorT healthcheckResult); +extern amf_healthcheck_t *amf_healthcheck_new (struct amf_comp *comp); extern void *amf_healthcheck_serialize ( struct amf_healthcheck *healthcheck, int *len); extern struct amf_healthcheck *amf_healthcheck_deserialize ( - struct amf_comp *comp, char *buf, int size); + struct amf_comp *comp, char *buf); /*===========================================================================*/ /* amfsi.c */ @@ -848,11 +921,11 @@ extern void amf_si_delete (struct amf_si *si); extern int amf_si_calc_and_set_csi_dependency_level (struct amf_si *si); extern void *amf_si_serialize (struct amf_si *si, int *len); extern struct amf_si *amf_si_deserialize ( - struct amf_application *app, char *buf, int size); + struct amf_application *app, char *buf); extern void *amf_si_assignment_serialize ( struct amf_si_assignment *si_assignment, int *len); extern struct amf_si_assignment *amf_si_assignment_deserialize ( - struct amf_si *si, char *buf, int size); + struct amf_si *si, char *buf); #if 0 char *amf_si_assignment_dn_make (struct amf_su *su, struct amf_si *si, SaNameT *name); @@ -972,11 +1045,11 @@ extern struct amf_csi *amf_csi_find (struct amf_si *si, char *name); extern void amf_csi_delete (struct amf_csi *csi); extern void *amf_csi_serialize (struct amf_csi *csi, int *len); extern struct amf_csi *amf_csi_deserialize ( - struct amf_si *si, char *buf, int size); + struct amf_si *si, char *buf); extern void *amf_csi_assignment_serialize ( struct amf_csi_assignment *csi_assignment, int *len); extern struct amf_csi_assignment *amf_csi_assignment_deserialize ( - struct amf_csi *csi, char *buf, int size); + struct amf_csi *csi, char *buf); extern char *amf_csi_dn_make (struct amf_csi *csi, SaNameT *name); extern char *amf_csi_assignment_dn_make ( struct amf_csi_assignment *csi_assignment, SaNameT *name); @@ -986,9 +1059,9 @@ extern struct amf_csi_attribute *amf_csi_attribute_new (struct amf_csi *csi); extern void *amf_csi_attribute_serialize ( struct amf_csi_attribute *csi_attribute, int *len); extern struct amf_csi_attribute *amf_csi_attribute_deserialize ( - struct amf_csi *csi, char *buf, int size); + struct amf_csi *csi, char *buf); /* extern int sa_amf_grep(const char *string, char *pattern, size_t nmatch, */ -/* char** sub_match_array); */ +/* char** sub_match_array); */ extern int sa_amf_grep(const char *string, char *pattern, size_t nmatch, SaNameT *sub_match_array); diff --git a/exec/amfapp.c b/exec/amfapp.c index 878ab59c..cd8bcf6d 100644 --- a/exec/amfapp.c +++ b/exec/amfapp.c @@ -65,51 +65,182 @@ * (In the future this state will also be assumed after the LOCK_INSTANTIATION * administrative command.) * - * State STARTED is assumed when the application has been initially started and - * will in the future be re-assumed after the administrative command RESTART - * have been executed. + * State WORKLOAD_ASSIGNED is assumed when the application has been initially + * started and will in the future be re-assumed after the administrative + * command RESTART have been executed. * - */ -#include + * 1. AMF Synchronization Control State Machine + * ========================================= + * + * 1.1 State Transition Table + * + * State: Event: Action: New state: + * =========================================================================== + * UNINSTANTIATED start A6,A1 STARTING_SGS + * STARTING_SGS start [C4] A7 + * STARTING_SGS sg_started [C1] A8,A9 STARTED + * STARTED start A6,A1 STARTING_SGS + * STARTED assign_workload A3 ASSIGNING_WORKLOAD + * ASSIGNING_WORKLOAD assign_workload A7 + * ASSIGNING_WORKLOAD start A7 + * ASSIGNING_WORKLOAD sg_assigned [C2] A10,A9 WORKLOAD_ASSIGNED + * WORKLOAD_ASSIGNED start A6,A1 STARTING_SGS + * WORKLOAD_ASSIGNED assign_workload A3 ASSIGNING_WORKLOAD +* +* 1.2 State Description +* ===================== +* UNINSTANTIATED - No SUs within the SGs contained in the application have been +* instantiated. +* STARTING_SGS - Waiting for the contained SGs to start. +* STARTED - No SUs within the SGs contained in the application are in the +* process of beein instantiated. Either the SUs are instantiated or +* instantiation was not possible or instantiation has failed. +* ASSIGNING_WORKLOAD - Waiting for the contained SGs to indicate they have +* assigned workload to its SUs. +* WORKLOAD_ASSIGNED - at least some workload has been assigned to the SUs that +* are in-service. +* +* 1.3 Actions +* =========== +* A1 - [foreach sg in application] sg_start +* A2 - +* A3 - [foreach sg in application] sg_assign +* A4 - +* A5 - +* A6 - save value of received node parameter +* A7 - defer the event +* A8 - [node == NULL] cluster_application_started else node_application_started +* A9 - recall defered events +* A10 - [node == NULL] cluster_application_assigned else +* node_application_assigned +* +* 1.4 Guards +* ========== +* C1 - No sg has availability control state == INSTANTIATING_SERVICE_UNITS +* C2 - All sgs have availability control state == IDLE +* C3 - +* C4 - saved node value != received node value +*/ +#include #include "amf.h" #include "print.h" +#include "util.h" -static int all_sg_started (struct amf_application *app) +/****************************************************************************** + * Internal (static) utility functions + *****************************************************************************/ + +int no_su_is_instantiating (struct amf_application *app) { struct amf_sg *sg; + struct amf_su *su; int all_su_instantiated = 1; - /* TODO: spare SUs... */ + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + for (su = sg->su_head; su != NULL; su = su->next) { + if (su->saAmfSUPresenceState == SA_AMF_PRESENCE_INSTANTIATING) { + all_su_instantiated = 0; + break; + } + } + } + return all_su_instantiated; + +} + +#ifdef COMPILE_OUT +static int all_sg_started (struct amf_application *app) +{ + struct amf_sg *sg; + int all_su_instantiated = 1; + + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + if (sg->avail_state == SG_AC_InstantiatingServiceUnits) { + all_su_instantiated = 0; + break; + } + } + return all_su_instantiated; +} +#endif + +static int all_sg_assigned (struct amf_application *app) +{ + struct amf_sg *sg; + int all_sg_assigned = 1; for (sg = app->sg_head; sg != NULL; sg = sg->next) { if (sg->avail_state != SG_AC_Idle) { - all_su_instantiated = 0; + all_sg_assigned = 0; break; } } - - return all_su_instantiated; + return all_sg_assigned; } +/****************************************************************************** + * Event methods + *****************************************************************************/ void amf_application_start ( struct amf_application *app, struct amf_node *node) { struct amf_sg *sg; ENTER ("'%s'", app->name.value); + assert (app != NULL); + switch (app->acsm_state) { + case APP_AC_UNINSTANTIATED: + app->node_to_start = node; + app->acsm_state = APP_AC_STARTING_SGS; - app->node_to_start = node; + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + amf_sg_start (sg, node); + } + break; + case APP_AC_STARTING_SGS: + if (app->node_to_start == node) { + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + amf_sg_start (sg, node); + } + } else { + /* TODO: Save the start request until state == APP_AC_STARTED */ + log_printf (LOG_LEVEL_ERROR, "Request to start application" + " =%s in state = %d",app->name.value, app->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + } + break; + case APP_AC_STARTED: + /* TODO: Recall defered events */ + app->node_to_start = node; + app->acsm_state = APP_AC_STARTING_SGS; + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + amf_sg_start (sg, node); + } + break; + case APP_AC_ASSIGNING_WORKLOAD: + /* TODO: Save the start request until state == APP_AC_STARTED */ + log_printf (LOG_LEVEL_ERROR, "Request to start application" + " =%s in state = %d (should be defered)", + app->name.value, app->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + case APP_AC_WORKLOAD_ASSIGNED: + app->node_to_start = node; + app->acsm_state = APP_AC_STARTING_SGS; - /* TODO: Calculate and set SI dependency levels */ - - for (sg = app->sg_head; sg != NULL; sg = sg->next) { - amf_sg_start (sg, node); + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + amf_sg_start (sg, node); + } + break; + default: + assert (0); + break; } } void amf_application_assign_workload ( - struct amf_application *app, struct amf_node *node) + struct amf_application *app, struct amf_node *node) { struct amf_sg *sg; @@ -118,30 +249,75 @@ void amf_application_assign_workload ( * Each dependency level should be looped and amf_sg_assign_si * called several times. */ - + assert (app != NULL); app->node_to_start = node; - for (sg = app->sg_head; sg != NULL; sg = sg->next) { - amf_sg_assign_si (sg, 0); + switch (app->acsm_state) { + case APP_AC_WORKLOAD_ASSIGNED: + TRACE1 ("APP_AC_WORKLOAD_ASSIGNED"); + /* Fall through */ + case APP_AC_STARTED: { + int posible_to_assign_si = 0; + app->acsm_state = APP_AC_ASSIGNING_WORKLOAD; + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + if (amf_sg_assign_si_req (sg, 0)) { + posible_to_assign_si = 1; + } + } + if (posible_to_assign_si == 0) { + app->acsm_state = APP_AC_WORKLOAD_ASSIGNED; + } + break; + } + case APP_AC_ASSIGNING_WORKLOAD: + if (app->node_to_start == node) { + /*Calling object has violated the contract !*/ + assert (0); + } else { + /* + * TODO: Save the request to assign workload until state == + * WORKLOAD_ASSIGNED + */ + + log_printf (LOG_LEVEL_ERROR, "Request to assign workload to" + " application =%s in state = %d (should be defered)", + app->name.value, app->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + } + break; + default: + /*Calling object has violated the contract !*/ + assert (0); + break; } } -void amf_application_init (void) -{ - log_init ("AMF"); -} - +/****************************************************************************** + * Event response methods + *****************************************************************************/ void amf_application_sg_started ( struct amf_application *app, struct amf_sg *sg, struct amf_node *node) { ENTER ("'%s'", app->name.value); - if (all_sg_started (app)) { - if (app->node_to_start == NULL) { - amf_cluster_application_started (app->cluster, app); - } else { - amf_node_application_started (app->node_to_start, app); - } + assert (app != NULL); + + switch (app->acsm_state) { + case APP_AC_STARTING_SGS: + if (no_su_is_instantiating (app)) { + app->acsm_state = APP_AC_STARTED; + if (app->node_to_start == NULL) { + amf_cluster_application_started (app->cluster, app); + } else { + amf_node_application_started (app->node_to_start, app); + } + } + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_application_sg_started()" + " called in state = %d", app->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; } } @@ -149,22 +325,42 @@ void amf_application_sg_assigned ( struct amf_application *app, struct amf_sg *sg) { ENTER ("'%s'", app->name.value); - if (app->node_to_start == NULL) { - amf_cluster_application_workload_assigned (app->cluster, app); - } else { - amf_node_application_workload_assigned (app->node_to_start, app); + assert (app != NULL); + + switch (app->acsm_state) { + case APP_AC_ASSIGNING_WORKLOAD: + if (all_sg_assigned (app)){ + app->acsm_state = APP_AC_WORKLOAD_ASSIGNED; + if (app->node_to_start == NULL){ + amf_cluster_application_workload_assigned (app->cluster, app); + } else { + amf_node_application_workload_assigned (app->node_to_start, app); + } + } + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_application_sg_assigned()" + " called in state = %d", app->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; } } -struct amf_application *amf_application_new (struct amf_cluster *cluster) +/****************************************************************************** + * General methods + *****************************************************************************/ +void amf_application_init (void) { - struct amf_application *app = amf_malloc (sizeof (struct amf_application)); + log_init ("AMF"); +} + +struct amf_application *amf_application_new (struct amf_cluster *cluster) { + struct amf_application *app = amf_calloc (1, sizeof (struct amf_application)); app->cluster = cluster; app->next = cluster->application_head; cluster->application_head = app; - app->sg_head = NULL; - app->si_head = NULL; + app->acsm_state = APP_AC_UNINSTANTIATED; return app; } @@ -173,6 +369,7 @@ void amf_application_delete (struct amf_application *app) struct amf_sg *sg; struct amf_si *si; + assert (app != NULL); for (sg = app->sg_head; sg != NULL;) { struct amf_sg *tmp = sg; sg = sg->next; @@ -184,7 +381,6 @@ void amf_application_delete (struct amf_application *app) si = si->next; amf_si_delete (tmp); } - free (app); } @@ -194,6 +390,7 @@ void *amf_application_serialize ( char *buf = NULL; int offset = 0, size = 0; + assert (app != NULL); TRACE8 ("%s", app->name.value); buf = amf_serialize_SaNameT (buf, &size, &offset, &app->name); @@ -203,23 +400,23 @@ void *amf_application_serialize ( buf, &size, &offset, app->saAmfApplicationCurrNumSG); buf = amf_serialize_SaStringT ( buf, &size, &offset, app->clccli_path); + buf = amf_serialize_SaUint32T ( + buf, &size, &offset, app->acsm_state); *len = offset; - return buf; } struct amf_application *amf_application_deserialize ( - struct amf_cluster *cluster, char *buf, int size) -{ + struct amf_cluster *cluster, char *buf) { char *tmp = buf; - struct amf_application *app; + struct amf_application *app = amf_application_new (cluster); - app = amf_application_new (cluster); tmp = amf_deserialize_SaNameT (tmp, &app->name); tmp = amf_deserialize_SaUint32T (tmp, &app->saAmfApplicationAdminState); tmp = amf_deserialize_SaUint32T (tmp, &app->saAmfApplicationCurrNumSG); tmp = amf_deserialize_SaStringT (tmp, &app->clccli_path); + tmp = amf_deserialize_SaUint32T (tmp, &app->acsm_state); return app; } @@ -229,6 +426,7 @@ struct amf_application *amf_application_find ( { struct amf_application *app; + assert (cluster != NULL); for (app = cluster->application_head; app != NULL; app = app->next) { if (app->name.length == strlen(name) && diff --git a/exec/amfcluster.c b/exec/amfcluster.c index b2d78ad8..8e57aebc 100644 --- a/exec/amfcluster.c +++ b/exec/amfcluster.c @@ -64,9 +64,71 @@ * * State STARTED is assumed when the cluster has been initially started and * will in the future be re-assumed after the administrative command RESTART - * have been executed. + * have been executed. + * + * 1. Cluster Availability Control State Machine + * ============================================= + * + * 1.1 State Transition Table + * + * State: Event: Action: New state: + * =========================================================================== + * UNINSTANTIATED sync_ready [C1] A2,A1 STARTING_APPS + * STARTING_APPS sync_ready A2,A1 STARTING_APPS + * STARTING_APPS app_started [C3] A7,A3 ASSIGNING_WORKLOAD + * STARTING_APPS local_timer_expired A8 STARTING_APPS + * STARTING_APPS time_out [C2] A7,A3 ASSIGNING_WORKLOAD + * STARTING_APPS time_out A7 WAITING_OVERTIME + * WAITING_OVERTIME sync_ready A4 WAITING_OVERTIME + * WAITING_OVERTIME app_started A3 ASSIGNING_WORKLOAD + * ASSIGNING_WORKLOAD sync_ready A4 ASSIGNING_WORKLOAD + * ASSIGNING_WORKLOAD app_assigned [C4] A6 STARTED + * STARTED sync_ready A8 STARTED + * + * 1.2 State Description + * ===================== + * UNINSTANTIATED - No SUs within any SG in any Application is instantiated. + * STARTING_APPLICATIONS - All applications have been requested to start + * their contained SGs, which in its turn has requested + * their contained SUs to instantiate all their + * components. The cluster startup timer is running. + * WAITING_OVERTIME - The cluster startup timer has expired but all + * applications have yet not responded that they have been + * started. Cluster will wait infinitely for the + * applications to respond. It is correct to do so even when + * the startup timer has expired, because the applications + * will report they are started as soon as there is no + * attempt to instantiate any of its components pending, + * because attempts to instantiate a component can not go on + * forever, see saAmfCompInstantiateTimeout, + * saAmfCompNumMaxInstantiateWithoutDelay and + * saAmfCompNumMaxInstantiateWithDelay. + * ASSIGNING_WORKLOAD - All applications have been requested to assign it's + * specified workload to it's service units according to + * the redundancy model specified by it's SGs. + * STARTED - A best effort has been made to instatiate the components of all + * applications and assign the specified workload as close as possible + * to what is described in the configuration. + * + * 1.3 Actions + * =========== + * A1 - [foreach application in cluster]/start application + * A2 - start cluster startup timer + * A3 - [foreach application in cluster]/assign workload to application + * A4 - defer sync_ready event + * A5 - forward sync_ready to appropriate node object + * A6 - recall deferred event + * A7 - stop node local instance of cluster startup timer + * A8 - multicast 'cluster startup timer time-out' event + * + * 1.4 Guards + * ========== + * C1 - No sg has availability control state == INSTANTIATING_SERVICE_UNITS + * C2 - No application has Availability Control state == STARTING_SGS + * C3 - All SGs are fully instantiated */ + #include #include @@ -76,13 +138,15 @@ #include "main.h" #include "service.h" + /** - * Determine if all applications are started + * Determine if all applications are started so that all + * SUs is in SA_AMF_PRESENCE_INSTANTIATED prsense state * @param cluster * * @return int */ -static int all_applications_started (struct amf_cluster *cluster) +static int cluster_applications_started_instantiated (struct amf_cluster *cluster) { int all_started = 1; struct amf_application *app; @@ -100,64 +164,204 @@ static int all_applications_started (struct amf_cluster *cluster) } } -done: + done: return all_started; } -static void timer_function_cluster_assign_workload_tmo (void *_cluster) +static int cluster_applications_are_starting_sgs(struct amf_cluster *cluster) { - struct req_exec_amf_cluster_start_tmo req; - struct iovec iovec; - - ENTER (""); - - req.header.size = sizeof (struct req_exec_amf_cluster_start_tmo); - req.header.id = SERVICE_ID_MAKE (AMF_SERVICE, - MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO); - - iovec.iov_base = (char *)&req; - iovec.iov_len = sizeof (req); - - assert (totempg_groups_mcast_joined (openais_group_handle, - &iovec, 1, TOTEMPG_AGREED) == 0); + struct amf_application *application = 0; + int is_starting_sgs = 0; + for (application = cluster->application_head; application != NULL; + application = application->next) { + if (application->acsm_state == APP_AC_STARTING_SGS) { + is_starting_sgs = 1; + break; + } + } + return is_starting_sgs; } -void amf_cluster_sync_ready (struct amf_cluster *cluster) + + +static void acsm_cluster_enter_assigning_workload (struct amf_cluster *cluster) +{ + log_printf(LOG_NOTICE, + "Cluster: all applications started, assigning workload."); + cluster->acsm_state = CLUSTER_AC_ASSIGNING_WORKLOAD; + amf_cluster_assign_workload (cluster); +} + +static void timer_function_cluster_assign_workload_tmo (void *cluster) +{ + struct req_exec_amf_cluster_start_tmo req; + ((struct amf_cluster*)cluster)->timeout_handle = 0;; + + ENTER (""); + + amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_CLUSTER_START_TMO, &req, sizeof(req)); +} + +static inline void stop_cluster_startup_timer (struct amf_cluster *cluster) +{ + if (cluster->timeout_handle) { + dprintf ("Stop cluster startup timer"); + poll_timer_delete (aisexec_poll_handle, + cluster->timeout_handle); + cluster->timeout_handle = 0; + } +} + +static void start_cluster_startup_timer (struct amf_cluster *cluster) +{ + poll_timer_add (aisexec_poll_handle, + cluster->saAmfClusterStartupTimeout, + cluster, + timer_function_cluster_assign_workload_tmo, + &cluster->timeout_handle); +} + +static inline void amf_cluster_enter_starting_applications ( + struct amf_cluster *cluster) +{ + ENTER (""); + start_cluster_startup_timer (cluster); + amf_cluster->acsm_state = CLUSTER_AC_STARTING_APPLICATIONS; + amf_cluster_start_applications (cluster); +} + +static void add_assign_workload_deferred_list (struct amf_cluster *cluster, + struct amf_node *node, amf_cluster_event_t event) +{ + cluster_deferredt_t *tmp_deferred_list = + calloc (1, sizeof (cluster_deferredt_t)); + + tmp_deferred_list->defered_list.next = + (amf_deferred_t*) cluster->deferred_events_head; + cluster->deferred_events_head = tmp_deferred_list; +} + + +static void defer_assigning_worload_to_node (struct amf_node *node, + amf_cluster_event_t event) +{ + + add_assign_workload_deferred_list(amf_cluster, node, event); +} + +static amf_deferred_t *recall_defered_cluster_events ( + struct amf_cluster *cluster) +{ + return (amf_deferred_t*) cluster->deferred_events_head; +} + + +static void acsm_cluster_enter_started (struct amf_cluster *cluster) +{ + + amf_deferred_t *deferred_events; + + amf_cluster->acsm_state = CLUSTER_AC_STARTED; + + for (deferred_events = recall_defered_cluster_events (cluster); + deferred_events != NULL; + deferred_events = deferred_events->next){ + + amf_node_sync_ready (((cluster_deferredt_t*)deferred_events)->node); + } +} + +int amf_cluster_applications_started_with_no_starting_sgs (struct amf_cluster *cluster) +{ + return !cluster_applications_are_starting_sgs (cluster); +} + +void amf_cluster_start_tmo_event (int is_sync_masterm, + struct amf_cluster *cluster) +{ + ENTER ("acsm_state = %d", amf_cluster->acsm_state); + + stop_cluster_startup_timer (cluster); + + switch (cluster->acsm_state) { + case CLUSTER_AC_STARTING_APPLICATIONS: + if (cluster_applications_are_starting_sgs (cluster)) { + dprintf ("Cluster startup timeout, start waiting over time"); + amf_cluster->acsm_state = CLUSTER_AC_WAITING_OVER_TIME; + } else { + dprintf ("Cluster startup timeout, assigning workload"); + acsm_cluster_enter_assigning_workload (cluster); + } + break; + case CLUSTER_AC_ASSIGNING_WORKLOAD: + /* ignore cluster startup timer expiration */ + case CLUSTER_AC_STARTED: + /* ignore cluster startup timer expiration */ + case CLUSTER_AC_WAITING_OVER_TIME: + /* ignore cluster startup timer expiration */ + break; + + default: + log_printf(LOG_LEVEL_ERROR, "Cluster timout expired in wrong cluster" + " state = %d", cluster->acsm_state); + assert(0); + break; + } +} + + +/** + * Start all applications in the cluster and start + * the cluster startup timeout. + * @param cluster + * @param app + */ +void amf_cluster_start_applications(struct amf_cluster *cluster) { struct amf_application *app; + for (app = cluster->application_head; app != NULL; app = app->next) { + amf_application_start (app, NULL); + } +} + + + +void amf_cluster_sync_ready (struct amf_cluster *cluster, struct amf_node *node) +{ log_printf(LOG_NOTICE, "Cluster: starting applications."); - - switch (amf_cluster->state) { - case CLUSTER_UNINSTANTIATED: { - amf_cluster->state = CLUSTER_STARTING_COMPONENTS; - for (app = cluster->application_head; app != NULL; app = app->next) { - amf_application_start (app, NULL); + switch (amf_cluster->acsm_state) { + case CLUSTER_AC_UNINSTANTIATED: + if (amf_cluster->saAmfClusterAdminState == SA_AMF_ADMIN_UNLOCKED) { + amf_cluster_enter_starting_applications (cluster); } - poll_timer_add (aisexec_poll_handle, - cluster->saAmfClusterStartupTimeout, - cluster, - timer_function_cluster_assign_workload_tmo, - &cluster->timeout_handle); - break; - } - case CLUSTER_STARTING_COMPONENTS: { + case CLUSTER_AC_STARTING_APPLICATIONS: + amf_cluster_enter_starting_applications(cluster); break; - } - case CLUSTER_STARTING_WORKLOAD: { + case CLUSTER_AC_ASSIGNING_WORKLOAD: + defer_assigning_worload_to_node (node, CLUSTER_SYNC_READY_EV); log_printf (LOG_LEVEL_ERROR, "Sync ready not implemented in " - "cluster state: %u\n", amf_cluster->state); + "cluster state: %u\n", amf_cluster->acsm_state); assert (0); break; - } - case CLUSTER_STARTED: { - assert (0); + case CLUSTER_AC_WAITING_OVER_TIME: + /* TODO: Defer the implementation of assigning + * workload to those syncronized nodes to CLUSTER_AC_STARTED + * state. + */ + defer_assigning_worload_to_node (node, CLUSTER_SYNC_READY_EV); break; - } + case CLUSTER_AC_STARTED: + TRACE1 ("Node sync ready sent from cluster in " + "CLUSTER_AC_STARTED state"); + amf_node_sync_ready (node); + break; + default: assert (0); + break; } } @@ -167,95 +371,120 @@ void amf_cluster_init (void) log_init ("AMF"); } + + void amf_cluster_application_started ( struct amf_cluster *cluster, struct amf_application *application) { ENTER ("application '%s' started", application->name.value); - if (all_applications_started (cluster)) { - log_printf(LOG_NOTICE, - "Cluster: all applications started, assigning workload."); - - if (cluster->timeout_handle) { - poll_timer_delete (aisexec_poll_handle, cluster->timeout_handle); - cluster->timeout_handle = 0; + switch (cluster->acsm_state) { + case CLUSTER_AC_STARTING_APPLICATIONS: + if (cluster_applications_started_instantiated (cluster)) { + stop_cluster_startup_timer (cluster); + acsm_cluster_enter_assigning_workload (cluster); + } + break; + case CLUSTER_AC_WAITING_OVER_TIME: + if (amf_cluster_applications_started_with_no_starting_sgs (cluster)) { + acsm_cluster_enter_assigning_workload (cluster); + } + break; + default: { + log_printf (LOG_ERR,"Error invalid cluster availability state %d", + cluster->acsm_state); + openais_exit_error(cluster->acsm_state); + break; } - cluster->state = CLUSTER_STARTING_WORKLOAD; - amf_cluster_assign_workload (cluster); } } -struct amf_cluster *amf_cluster_new (void) -{ - struct amf_cluster *cluster = calloc (1, sizeof (struct amf_cluster)); - - if (cluster == NULL) { - openais_exit_error (AIS_DONE_OUT_OF_MEMORY); - } +struct amf_cluster *amf_cluster_new (void) { + struct amf_cluster *cluster = amf_calloc (1, sizeof (struct amf_cluster)); cluster->saAmfClusterStartupTimeout = -1; cluster->saAmfClusterAdminState = SA_AMF_ADMIN_UNLOCKED; - + cluster->deferred_events_head = 0; + cluster->acsm_state = CLUSTER_AC_UNINSTANTIATED; return cluster; } +int amf_cluster_applications_assigned (struct amf_cluster *cluster) +{ + struct amf_application *app = 0; + int is_all_application_assigned = 1; + + for (app = cluster->application_head; app != NULL; app = app->next) { + if (app->acsm_state != APP_AC_WORKLOAD_ASSIGNED) { + is_all_application_assigned = 0; + break; + } + } + return is_all_application_assigned; +} + void amf_cluster_application_workload_assigned ( struct amf_cluster *cluster, struct amf_application *app) { - log_printf (LOG_NOTICE, "Cluster: application %s assigned.", - app->name.value); - amf_cluster->state = CLUSTER_STARTED; + ENTER (""); + switch (cluster->acsm_state) { + case CLUSTER_AC_ASSIGNING_WORKLOAD: + log_printf (LOG_NOTICE, "Cluster: application %s assigned.", + app->name.value); + if (amf_cluster_applications_assigned (cluster)) { + acsm_cluster_enter_started (cluster); + } + break; + default: + assert(0); + break; + } } void *amf_cluster_serialize (struct amf_cluster *cluster, int *len) { - int objsz = sizeof (struct amf_cluster); - struct amf_cluster *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, cluster, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); + TRACE8 ("%s", cluster->name.value); - return copy; + buf = amf_serialize_SaNameT (buf, &size, &offset, &cluster->name); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + cluster->saAmfClusterStartupTimeout); + buf = amf_serialize_SaNameT (buf, &size, &offset, + &cluster->saAmfClusterClmCluster); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + cluster->saAmfClusterAdminState); + buf = amf_serialize_SaUint32T (buf, &size, &offset, cluster->acsm_state); + + *len = offset; + + return buf; } -struct amf_cluster *amf_cluster_deserialize (char *buf, int size) -{ - int objsz = sizeof (struct amf_cluster); +struct amf_cluster *amf_cluster_deserialize (char *buf) { + char *tmp = buf; + struct amf_cluster *cluster = amf_cluster_new (); - if (objsz > size) { - return NULL; - } else { - struct amf_cluster *obj = amf_cluster_new (); - if (obj == NULL) { - return NULL; - } - memcpy (obj, buf, objsz); - TRACE8 ("%s", obj->name.value); - obj->node_head = NULL; - obj->application_head = NULL; - obj->timeout_handle = 0; + tmp = amf_deserialize_SaNameT (tmp, &cluster->name); + tmp = amf_deserialize_SaUint32T (tmp, &cluster->saAmfClusterStartupTimeout); + tmp = amf_deserialize_SaNameT (tmp, &cluster->saAmfClusterClmCluster); + tmp = amf_deserialize_SaUint32T (tmp, &cluster->saAmfClusterAdminState); + tmp = amf_deserialize_SaUint32T (tmp, &cluster->acsm_state); - return obj; - } + return cluster; } void amf_cluster_assign_workload (struct amf_cluster *cluster) { struct amf_application *app; - ENTER (""); - cluster->state = CLUSTER_STARTING_WORKLOAD; - - if (cluster->timeout_handle) { - poll_timer_delete (aisexec_poll_handle, cluster->timeout_handle); - cluster->timeout_handle = 0; - } - for (app = cluster->application_head; app != NULL; app = app->next) { amf_application_assign_workload (app, NULL); } } + + + diff --git a/exec/amfcomp.c b/exec/amfcomp.c index 6d23a754..5f3447c3 100644 --- a/exec/amfcomp.c +++ b/exec/amfcomp.c @@ -482,6 +482,24 @@ static void *clc_command_run (void *context) return (0); } +static void amf_comp_instantiate_tmo (void *component) +{ + SaNameT compName; + amf_comp_dn_make (component, &compName); + + amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE_TMO, + &compName, sizeof (SaNameT)); +} + +static void start_component_instantiate_timer (struct amf_comp *component) +{ + poll_timer_add (aisexec_poll_handle, + component->saAmfCompInstantiateTimeout, + component, + amf_comp_instantiate_tmo, + &component->instantiate_timeout_handle); +} + /* * Instantiate possible operations */ @@ -506,6 +524,7 @@ static int clc_cli_instantiate (struct amf_comp *comp) if (res != 0) { log_printf (LOG_LEVEL_ERROR, "pthread_create failed: %d", res); } + start_component_instantiate_timer (comp); // clc_command_run_data->completion_callback (clc_command_run_data); // TODO error code from pthread_create @@ -683,11 +702,8 @@ struct amf_healthcheck *amf_comp_find_healthcheck ( struct amf_comp *amf_comp_new(struct amf_su *su, char *name) { struct amf_comp *tail = su->comp_head; - struct amf_comp *comp = calloc (1, sizeof (struct amf_comp)); + struct amf_comp *comp = amf_calloc (1, sizeof (struct amf_comp)); - if (comp == NULL) { - openais_exit_error(AIS_DONE_OUT_OF_MEMORY); - } while (tail != NULL) { if (tail->next == NULL) { break; @@ -1184,17 +1200,37 @@ static void lib_csi_set_request ( free(p); } +static void stop_component_instantiate_timer (struct amf_comp *component) +{ + if (component->instantiate_timeout_handle) { + dprintf ("Stop cluster startup timer"); + poll_timer_delete (aisexec_poll_handle, + component->instantiate_timeout_handle); + component->instantiate_timeout_handle = 0; + } +} + + SaAisErrorT amf_comp_register (struct amf_comp *comp) { TRACE2("Exec comp register '%s'", comp->name.value); - - if (comp->saAmfCompPresenceState == SA_AMF_PRESENCE_RESTARTING) { - comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED); - } else if (comp->saAmfCompPresenceState == SA_AMF_PRESENCE_INSTANTIATING) { - amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_ENABLED); - comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED); - } else { - assert (0); + stop_component_instantiate_timer (comp); + + switch (comp->saAmfCompPresenceState) { + case SA_AMF_PRESENCE_RESTARTING: + comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED); + break; + case SA_AMF_PRESENCE_INSTANTIATING: + amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_ENABLED); + comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATED); + break; + case SA_AMF_PRESENCE_INSTANTIATION_FAILED: + /* ignore due to instantitate timeout a while ago */ + break; + default: + assert (0); + break; + } return SA_AIS_OK; @@ -1367,23 +1403,98 @@ SaAisErrorT amf_comp_healthcheck_stop ( return error; } + /** * Instantiate a component * @param comp */ void amf_comp_instantiate (struct amf_comp *comp) { - int res = 0; ENTER ("'%s' SU '%s'", getSaNameT (&comp->name), getSaNameT (&comp->su->name)); - if (comp->saAmfCompPresenceState != SA_AMF_PRESENCE_RESTARTING) { - comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATING); + switch (comp->saAmfCompPresenceState) { + case SA_AMF_PRESENCE_RESTARTING: + /* fall through */ + case SA_AMF_PRESENCE_UNINSTANTIATED: + if (amf_su_is_local (comp->su)) { + TRACE1("Send instantiate event for comp '%s' from host %s", + comp->name.value, comp->su->saAmfSUHostedByNode.value); + SaNameT compName; + amf_comp_dn_make (comp, &compName); + amf_msg_mcast (MESSAGE_REQ_EXEC_AMF_COMPONENT_INSTANTIATE, + &compName, sizeof (SaNameT)); + } + break; + default: + dprintf("Instantiate ignored in Component presence state %d", + comp->saAmfCompPresenceState); + break; } +} - if (amf_su_is_local (comp->su)) { - res = clc_interfaces[comp->comptype]->instantiate (comp); +void amf_comp_instantiate_tmo_event (struct amf_comp *comp) +{ + ENTER ("Comp instantiate timeout after %d seconds '%s' '%s'", + comp->saAmfCompInstantiateTimeout, comp->su->name.value, + comp->name.value); + + switch (comp->saAmfCompPresenceState) { + case SA_AMF_PRESENCE_RESTARTING: + amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_DISABLED); + comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATION_FAILED); + + break; + case SA_AMF_PRESENCE_INSTANTIATING: + + amf_comp_operational_state_set (comp, SA_AMF_OPERATIONAL_DISABLED); + comp_presence_state_set (comp, SA_AMF_PRESENCE_INSTANTIATION_FAILED); + + break; + default: + assert (0); + break; + } +} + + + +void amf_comp_instantiate_event (struct amf_comp *component) +{ + int res; + ENTER (""); + switch (component->saAmfCompPresenceState) { + case SA_AMF_PRESENCE_INSTANTIATING: + case SA_AMF_PRESENCE_INSTANTIATED: + case SA_AMF_PRESENCE_TERMINATING: + case SA_AMF_PRESENCE_INSTANTIATION_FAILED: + case SA_AMF_PRESENCE_TERMINATION_FAILED: + dprintf("Instantiate ignored in Component presence state %d", + component->saAmfCompPresenceState); + break; + case SA_AMF_PRESENCE_UNINSTANTIATED: + + comp_presence_state_set (component, SA_AMF_PRESENCE_INSTANTIATING); + amf_su_comp_state_changed(component->su, + component,SA_AMF_PRESENCE_STATE,SA_AMF_PRESENCE_INSTANTIATING); + if (amf_su_is_local (component->su)) { + res = clc_interfaces[component->comptype]->instantiate ( + component); + } + + break; + case SA_AMF_PRESENCE_RESTARTING: + if (amf_su_is_local (component->su)) { + res = clc_interfaces[component->comptype]->instantiate ( + component); + } + break; + default: + dprintf("Component presence state %d", + component->saAmfCompPresenceState); + assert (0); + break; } } @@ -1953,14 +2064,12 @@ void *amf_comp_serialize (struct amf_comp *component, int *len) * * @return struct amf_comp* */ -struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf, int size) +struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf) { char *tmp = buf; - struct amf_comp *component; int i; SaUint32T cnt; - - component = amf_comp_new (su, ""); + struct amf_comp *component = amf_comp_new (su, ""); tmp = amf_deserialize_SaNameT (tmp, &component->name); tmp = amf_deserialize_SaUint32T (tmp, &cnt); @@ -2063,33 +2172,51 @@ struct amf_comp *amf_comp_deserialize (struct amf_su *su, char *buf, int size) void *amf_healthcheck_serialize (struct amf_healthcheck *healthcheck, int *len) { - int objsz = sizeof (struct amf_healthcheck); - struct amf_healthcheck *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, healthcheck, objsz); - *len = objsz; + TRACE8 ("%s", healthcheck->safHealthcheckKey.key); - return copy; + buf = amf_serialize_opaque (buf, &size, &offset, + &healthcheck->safHealthcheckKey.key, SA_AMF_HEALTHCHECK_KEY_MAX); + buf = amf_serialize_SaUint16T (buf, &size, &offset, + healthcheck->safHealthcheckKey.keyLen); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + healthcheck->saAmfHealthcheckMaxDuration); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + healthcheck->saAmfHealthcheckPeriod); + + *len = offset; + + return buf; } struct amf_healthcheck *amf_healthcheck_deserialize ( - struct amf_comp *comp, char *buf, int size) + struct amf_comp *comp, char *buf) { - int objsz = sizeof (struct amf_healthcheck); + char *tmp = buf; + int cnt; + amf_healthcheck_t *healthcheck = amf_healthcheck_new (comp); - if (objsz > size) { - return NULL; - } else { - struct amf_healthcheck *obj = amf_malloc (sizeof (struct amf_healthcheck)); - memcpy (obj, buf, objsz); - obj->active = 0; - obj->timer_handle_duration = 0; - obj->timer_handle_period = 0; - obj->comp = comp; - obj->next = comp->healthcheck_head; - comp->healthcheck_head = obj; - return obj; - } + tmp = amf_deserialize_opaque (tmp, &healthcheck->safHealthcheckKey.key, &cnt); + tmp = amf_deserialize_SaUint16T (tmp, + &healthcheck->safHealthcheckKey.keyLen); + tmp = amf_deserialize_SaUint32T (tmp, + &healthcheck->saAmfHealthcheckMaxDuration); + tmp = amf_deserialize_SaUint32T (tmp, + &healthcheck->saAmfHealthcheckPeriod); + + return healthcheck; +} + +amf_healthcheck_t *amf_healthcheck_new (struct amf_comp *comp) +{ + amf_healthcheck_t *healthcheck = amf_calloc (1, sizeof (amf_healthcheck_t)); + + healthcheck->comp = comp; + healthcheck->next = comp->healthcheck_head; + comp->healthcheck_head = healthcheck; + + return healthcheck; } diff --git a/exec/amfnode.c b/exec/amfnode.c index 5abd53e8..d335a8f8 100644 --- a/exec/amfnode.c +++ b/exec/amfnode.c @@ -72,14 +72,13 @@ * - MANAGING_HOSTED_SERVICE_UNITS ( * . FAILING_FAST (REBOOTING_NODE and ACTIVATING_STANDBY_NODE) * . FAILING_GRACEFULLY (SWITCHING_OVER, FAILING_OVER and REBOOTING_NODE) - * . LEAVING_SPONTANEOUSLY (DEACTIVATE_DEPENDENT and + * . LEAVING_SPONTANEOUSLY (FAILING_OVER and * WAITING_FOR_NODE_TO_JOIN) - * . JOINING (STARTING_SERVICE_UNITS, ASSIGNING_ACTIVE_WORKLOAD and - * ASSIGNING_STANDBY_WORKLOAD) + * . JOINING (STARTING_SERVICE_UNITS and ASSIGNING_STANDBY_WORKLOAD) * - * REPAIR_NEEDED indicates the node needs a manual repair and this state will - * maintained until the administrative command REPAIRED is entered - * (implemented in the future) + * REPAIR_NEEDED indicates the node needs a manual repair and this state will be + * maintained until the administrative command REPAIRED is entered (implemented + * in the future) * * ESCALLATION_LEVEL is a kind of idle state where no actions are performed * and used only to remember the escallation level. Substate LEVEL_0 indicates @@ -105,7 +104,66 @@ * JOINING state handles the start of a node in all cases except cluster start, * which is handled by the CLUSTER class. * - */ + * 1. Node Availability Control State Machine + * ========================================== + * + * 1.1 State Transition Table + * + * State: Event: Action: New state: + * ============================================================================ + * ESCALATION_LEVEL_0 node_sync_ready A6 JOINING_STARTING_APPLS + * ESCALATION_LEVEL_0 node_leave A9,A8 LEAVING_SP_FAILING_OVER + * JOINING_STARTING_APPLS appl_started [C4] A7 JOINING_ASSIGNING_WL + * JOINING_ASSIGNING_WL appl_assigned [C5] ESCALATION_LEVEL_0 + * LEAVING_SP_FAILING_OVER sg_failed_over [C1] LEAVING_SP_WAIT_FOR_JOIN + * LEAVING_SP_WAIT_FOR_JOIN node_sync_ready A6 JOINING_STARTING_APPLS + * + * 1.2 State Description + * ===================== + * ESCALATION_LEVEL_0 - Node is synchronized and idle. + * JOINING_STARTING_APPLS - JOINING_STARTING_APPLICATIONS + * Node has ordered all applications to start its SUs + * hosted on current node and is now waiting for them + * to acknowledge that they have started. + * + * JOINING_ASSIGNING_WL - JOINING_ASSIGNING_WORKLOAD + * Node has ordered all applications to assign workload + * to all its SUs which currently have no workload and + * is now waiting for the applications to acknowledge. + * + * LEAVING_SP_FAILING_OVER - LEAVING_SPONTANEOUSLY_FAILING_OVER + * Node has received an event telling that this node + * has left the cluster and has ordered all service + * groups to failover those of its SUs that were + * hosted on current node. + * + * LEAVING_SP_WAIT_FOR_JOIN - LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN + * Node is waiting for current node to join again. + * + * 1.3 Actions + * =========== + * A1 - + * A2 - + * A3 - + * A4 - + * A5 - + * A6 - [foreach application in cluster]start application + * A7 - [foreach application in cluster]assign workload to application + * A8 - [foreach application in cluster] + * [foreach SG in application ]failover node + * A9 - [foreach application in cluster] + * [foreach SG in application ] + * [foreach SU in SG where the SU is hosted on current node] + * [foreach comp in such an SU]indicate that the node has left the cluster + * + * 1.4 Guards + * ========== + * C1 - All SG availability control state machines (ACSM) == IDLE + * C2 - + * C3 - + * C4 - No applications are in ACSM state == STARTING_SGS + * C5 - All applications have ACSM state == WORKLOAD_ASSIGNED + */ #include #include @@ -115,6 +173,9 @@ #include "print.h" #include "main.h" +/****************************************************************************** + * Internal (static) utility functions + *****************************************************************************/ static void amf_node_acsm_enter_leaving_spontaneously(struct amf_node *node) { @@ -132,7 +193,10 @@ static void amf_node_acsm_enter_failing_over (struct amf_node *node) ENTER("'%s'", node->name.value); node->acsm_state = NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER; - + /* + * Indicate to each component object in the model that current + * node has left the cluster + */ for (app = amf_cluster->application_head; app != NULL; app = app->next) { for (sg = app->sg_head; sg != NULL; sg = sg->next) { for (su = sg->su_head; su != NULL; su = su->next) { @@ -146,26 +210,89 @@ static void amf_node_acsm_enter_failing_over (struct amf_node *node) } } - for (app = amf_cluster->application_head; app != NULL; app = app->next) { - for (sg = app->sg_head; sg != NULL; sg = sg->next) { + /* + * Let all service groups with service units hosted on current node failover + * its workload + */ + for (app = amf_cluster->application_head; app != NULL; app = + app->next) { + for (sg = app->sg_head; sg != NULL; sg = + sg->next) { amf_sg_failover_node_req(sg, node); } } } +#ifdef COMPILE_OUT +static int all_applications_on_node_started (struct amf_node *node, + struct amf_cluster *cluster) +{ + int all_started = 1; + struct amf_application *app; + struct amf_sg *sg; + struct amf_su *su; + + for (app = cluster->application_head; app != NULL; app = app->next) { + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + for (su = sg->su_head; su != NULL; su = su->next) { + /* + * TODO: Replace the if-statement below with the if-statementin + * this comment when the real problem is fixed ! + */ + if (su->saAmfSUPresenceState != + SA_AMF_PRESENCE_INSTANTIATED && + name_match(&su->saAmfSUHostedByNode,&node->name)) { + all_started = 0; goto done; + } + if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED ) { + all_started = 0; + goto done; + } + } + } + } + + done: + return all_started; + +} +#endif + + +/****************************************************************************** + * Event methods + *****************************************************************************/ + /** - * Node leave event is obtained from amf_confchg_fn + * This event indicates that a node has unexpectedly left the cluster. Node + * leave event is obtained from amf_confchg_fn. * * @param node */ void amf_node_leave (struct amf_node *node) { + assert (node != NULL); ENTER("'%s', CLM node '%s'", node->name.value, node->saAmfNodeClmNode.value); - amf_node_acsm_enter_leaving_spontaneously(node); - amf_node_acsm_enter_failing_over (node); -} + + switch (node->acsm_state) { + case NODE_ACSM_ESCALLATION_LEVEL_0: + case NODE_ACSM_ESCALLATION_LEVEL_2: + case NODE_ACSM_ESCALLATION_LEVEL_3: + amf_node_acsm_enter_leaving_spontaneously(node); + amf_node_acsm_enter_failing_over (node); + break; + case NODE_ACSM_REPAIR_NEEDED: + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_node_leave()called in state = %d" + " (should have been defered)", node->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + + } +} /** * * @param node @@ -216,85 +343,246 @@ void amf_node_comp_failover_req ( } /** - * Node constructor - * @param loc - * @param cluster + * This event indicates that current node has joined and its cluster model has + * been synchronized with the other nodes cluster models. + * * @param node */ -struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name) -{ - struct amf_node *node = calloc (1, sizeof (struct amf_node)); - - if (node == NULL) { - openais_exit_error(AIS_DONE_OUT_OF_MEMORY); - } - node->next = cluster->node_head; - node->saAmfNodeAdminState = SA_AMF_ADMIN_UNLOCKED; - node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED; - node->saAmfNodeAutoRepair = SA_TRUE; - node->cluster = cluster; - node->saAmfNodeSuFailOverProb = -1; - node->saAmfNodeSuFailoverMax = ~0; - setSaNameT (&node->name, name); - - return node; -} - -void *amf_node_serialize (struct amf_node *node, int *len) -{ - int objsz = sizeof (struct amf_node); - struct amf_node *copy; - - copy = amf_malloc (objsz); - memcpy (copy, node, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); - - return copy; -} - -struct amf_node *amf_node_deserialize ( - struct amf_cluster *cluster, char *buf, int size) -{ - int objsz = sizeof (struct amf_node); - - if (objsz > size) { - return NULL; - } else { - struct amf_node *obj = amf_node_new (cluster, ""); - if (obj == NULL) { - return NULL; - } - memcpy (obj, buf, objsz); - TRACE8 ("%s", obj->name.value); - obj->cluster = cluster; - obj->next = cluster->node_head; - cluster->node_head = obj; - return obj; - } -} - void amf_node_sync_ready (struct amf_node *node) { struct amf_application *app; assert (node != NULL); - log_printf(LOG_NOTICE, "Node %s sync ready, starting hosted SUs.", + log_printf(LOG_NOTICE, "Node=%s: sync ready, starting hosted SUs.", node->name.value); node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED; - for (app = amf_cluster->application_head; app != NULL; app = app->next) { - amf_application_start (app, node); + + switch (node->acsm_state) { + case NODE_ACSM_ESCALLATION_LEVEL_0: + case NODE_ACSM_ESCALLATION_LEVEL_2: + case NODE_ACSM_ESCALLATION_LEVEL_3: + case NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN: + node->acsm_state = NODE_ACSM_JOINING_STARTING_APPLICATIONS; + for (app = amf_cluster->application_head; app != NULL; app = app->next) { + amf_application_start (app, node); + } + break; + case NODE_ACSM_REPAIR_NEEDED: + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_node_sync_ready()called in state" + " = %d (should have been defered)", node->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + } } +/****************************************************************************** + * Event response methods + *****************************************************************************/ + +/** + * This event indicates that an application has started. Started in this context + * means that none of its contained service units is in an -ING state with other + * words successfully instantiated, instantiation has failed or instantiation + * was not possible (due to the node on which the SU was to be hosted is not + * operational). + * + * @param node + * @param application which has been started + */ +void amf_node_application_started (struct amf_node *node, + struct amf_application *app) +{ + assert (node != NULL && app != NULL ); + ENTER ("Node=%s: application '%s' started", node->name.value, + app->name.value); + + switch (node->acsm_state) { + case NODE_ACSM_JOINING_STARTING_APPLICATIONS: + if (amf_cluster_applications_started_with_no_starting_sgs( + app->cluster)) { + log_printf(LOG_NOTICE, + "Node=%s: all applications started, assigning workload.", + node->name.value); + + node->acsm_state = NODE_ACSM_JOINING_ASSIGNING_WORKLOAD; + for (app = app->cluster->application_head; app != NULL; + app = app->next) { + amf_application_assign_workload (app, node); + } + } + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_node_application_started()" + "called in state = %d (unexpected !!)", node->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + + } +} + +/** + * This event indicates that an application has been assigned workload. + * + * @param node + * @param application which has been assigned workload + */ +void amf_node_application_workload_assigned (struct amf_node *node, + struct amf_application *app) +{ + assert (node != NULL && app != NULL ); + ENTER ("Node=%s: application '%s' started", node->name.value, + app->name.value); + + switch (node->acsm_state) { + case NODE_ACSM_JOINING_ASSIGNING_WORKLOAD: + + if (amf_cluster_applications_assigned (amf_cluster)) { + log_printf(LOG_NOTICE, "Node=%s: all workload assigned", + node->name.value); + /* + * TODO: new state should be set via history + */ + node->acsm_state = NODE_ACSM_ESCALLATION_LEVEL_0; + } + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_node_application_workload_assigned()" + "called in state = %d (unexpected !!)", node->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + } +} + +/** + * This event indicates that an SG has failed over its workload after a node + * failure. + * + * @param node + * @param sg_in SG which is now ready with its failover + */ +void amf_node_sg_failed_over (struct amf_node *node, struct amf_sg *sg_in) +{ + struct amf_sg *sg; + struct amf_application *app; + int all_sg_has_failed_over = 1; + + assert (node != NULL && app != NULL); + ENTER ("Node=%s: SG '%s' started", node->name.value, + sg_in->name.value); + + switch (node->acsm_state) { + case NODE_ACSM_LEAVING_SPONTANEOUSLY_FAILING_OVER: + for (app = amf_cluster->application_head; app != NULL; + app = app->next) { + for (sg = app->sg_head; sg != NULL; sg = sg->next) { + if (sg->avail_state != SG_AC_Idle) { + all_sg_has_failed_over = 0; + goto end; + } + } + } + break; + default: + log_printf (LOG_LEVEL_ERROR, "amf_node_sg_failed_over()" + "called in state = %d (unexpected !!)", node->acsm_state); + openais_exit_error (AIS_DONE_FATAL_ERR); + break; + } + end: + if (all_sg_has_failed_over) { + node->acsm_state = NODE_ACSM_LEAVING_SPONTANEOUSLY_WAITING_FOR_NODE_TO_JOIN; + } +} + +/****************************************************************************** + * General methods + *****************************************************************************/ + void amf_node_init (void) { log_init ("AMF"); } -struct amf_node *amf_node_find (SaNameT *name) +/** + * Node constructor + * @param loc + * @param cluster + * @param node + */ +struct amf_node *amf_node_new (struct amf_cluster *cluster, char *name) { + struct amf_node *node = amf_calloc (1, sizeof (struct amf_node)); + + setSaNameT (&node->name, name); + node->saAmfNodeAdminState = SA_AMF_ADMIN_UNLOCKED; + node->saAmfNodeOperState = SA_AMF_OPERATIONAL_ENABLED; + node->saAmfNodeAutoRepair = SA_TRUE; + node->saAmfNodeSuFailOverProb = -1; + node->saAmfNodeSuFailoverMax = ~0; + node->cluster = cluster; + node->next = cluster->node_head; + cluster->node_head = node; + node->acsm_state = NODE_ACSM_ESCALLATION_LEVEL_0; + + return node; +} + +void *amf_node_serialize (struct amf_node *node, int *len) { + char *buf = NULL; + int offset = 0, size = 0; + + TRACE8 ("%s", node->name.value); + + buf = amf_serialize_SaNameT (buf, &size, &offset, &node->name); + buf = amf_serialize_SaNameT (buf, &size, &offset, &node->saAmfNodeClmNode); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeSuFailOverProb); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeSuFailoverMax); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeAutoRepair); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeRebootOnInstantiationFailure); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeRebootOnTerminationFailure); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeAdminState); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->saAmfNodeOperState); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->nodeid); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + node->acsm_state); + + *len = offset; + + return buf; +} + +struct amf_node *amf_node_deserialize (struct amf_cluster *cluster, char *buf) { + char *tmp = buf; + struct amf_node *node = amf_node_new (cluster, ""); + + tmp = amf_deserialize_SaNameT (tmp, &node->name); + tmp = amf_deserialize_SaNameT (tmp, &node->saAmfNodeClmNode); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeSuFailOverProb); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeSuFailoverMax); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeAutoRepair); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeRebootOnInstantiationFailure); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeRebootOnTerminationFailure); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeAdminState); + tmp = amf_deserialize_SaUint32T (tmp, &node->saAmfNodeOperState); + tmp = amf_deserialize_SaUint32T (tmp, &node->nodeid); + tmp = amf_deserialize_SaUint32T (tmp, &node->acsm_state); + + return node; +} + +struct amf_node *amf_node_find (SaNameT *name) { struct amf_node *node; assert (name != NULL && amf_cluster != NULL); @@ -310,8 +598,7 @@ struct amf_node *amf_node_find (SaNameT *name) return NULL; } -struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid) -{ +struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid) { struct amf_node *node; assert (amf_cluster != NULL); @@ -327,8 +614,7 @@ struct amf_node *amf_node_find_by_nodeid (unsigned int nodeid) return NULL; } -struct amf_node *amf_node_find_by_hostname (const char *hostname) -{ +struct amf_node *amf_node_find_by_hostname (const char *hostname) { struct amf_node *node; assert (hostname != NULL && amf_cluster != NULL); @@ -344,67 +630,3 @@ struct amf_node *amf_node_find_by_hostname (const char *hostname) return NULL; } -static int all_applications_on_node_started (struct amf_node *node, - struct amf_cluster *cluster) -{ - int all_started = 1; - struct amf_application *app; - struct amf_sg *sg; - struct amf_su *su; - - for (app = cluster->application_head; app != NULL; app = app->next) { - for (sg = app->sg_head; sg != NULL; sg = sg->next) { - for (su = sg->su_head; su != NULL; su = su->next) { - /* TODO: Replace the if-statement below with the if-statement in this comment when - the real problem is fixed ! - if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED && - name_match(&su->saAmfSUHostedByNode,&node->name)) { - all_started = 0; - goto done; - } -*/ - if (su->saAmfSUPresenceState != SA_AMF_PRESENCE_INSTANTIATED ) { - all_started = 0; - goto done; - } - } - } - } - -done: - return all_started; - -} - -void amf_node_application_started (struct amf_node *node, - struct amf_application *_app) -{ - struct amf_application *app = _app; - - ENTER ("application '%s' started", app->name.value); - - if (all_applications_on_node_started (node, app->cluster)) { - - log_printf(LOG_NOTICE, - "Node: all applications started, assigning workload."); - - for (app = _app->cluster->application_head; app != NULL; - app = app->next) { - amf_application_assign_workload (app, node); - } - } - -} - -void amf_node_application_workload_assigned (struct amf_node *node, - struct amf_application *app) -{ - - log_printf(LOG_NOTICE, "Node: all workload assigned on node %s", - node->name.value); - - /** - * TODO: Set node acsm state - */ -} - diff --git a/exec/amfsg.c b/exec/amfsg.c index 7df6862c..a93203ee 100644 --- a/exec/amfsg.c +++ b/exec/amfsg.c @@ -182,6 +182,7 @@ static void return_to_idle (struct amf_sg *sg) break; case SG_RT_FailoverNode: + amf_node_sg_failed_over (sg->recovery_scope.node, sg); log_printf ( LOG_NOTICE, "'%s for %s' recovery action finished", sg_recovery_type_text[sg->recovery_scope.recovery_type], @@ -191,6 +192,7 @@ static void return_to_idle (struct amf_sg *sg) log_printf ( LOG_NOTICE, "'%s' recovery action finished", sg_recovery_type_text[0]); + break; } } @@ -280,7 +282,7 @@ static void acsm_enter_terminating_suspected (struct amf_sg *sg) struct amf_su **sus= sg->recovery_scope.sus; sg->avail_state = SG_AC_TerminatingSuspected; - /* + /* * Terminate suspected SU(s) */ while (*sus != 0) { @@ -501,18 +503,19 @@ static void acsm_enter_repairing_su (struct amf_sg *sg) openais_exit_error (AIS_DONE_FATAL_ERR); } if (node->saAmfNodeOperState == SA_AMF_OPERATIONAL_ENABLED) { + /* node is synchronized */ is_any_su_instantiated = 1; amf_su_instantiate ((*sus)); - } else { - return_to_idle (sg); } } sus++; } + if (is_any_su_instantiated == 0) { return_to_idle (sg); } + } /** @@ -605,7 +608,7 @@ static void set_scope_for_failover_su (struct amf_sg *sg, struct amf_su *su) struct amf_su **sus; SaNameT dn; sg->recovery_scope.recovery_type = SG_RT_FailoverSU; - + sg->recovery_scope.node = NULL; sg->recovery_scope.comp = NULL; sg->recovery_scope.sus = (struct amf_su **) @@ -653,6 +656,7 @@ static void set_scope_for_failover_node (struct amf_sg *sg, struct amf_node *nod ENTER ("'%s'", node->name.value); sg->recovery_scope.recovery_type = SG_RT_FailoverNode; + sg->recovery_scope.node = node; sg->recovery_scope.comp = NULL; sg->recovery_scope.sus = (struct amf_su **) calloc (1, sizeof (struct amf_su *)); @@ -908,6 +912,7 @@ static void assign_si_assumed_cbfn ( confirmed_assignments); amf_runtime_attributes_print (amf_cluster); assert (0); + break; } } @@ -922,31 +927,57 @@ static inline int div_round (int a, int b) return res; } +#ifdef COMPILE_OUT static int all_su_has_presence_state ( struct amf_sg *sg, struct amf_node *node_to_start, - SaAmfPresenceStateT state) -{ - struct amf_su *su; - int all_set = 1; + SaAmfPresenceStateT state) +{ + struct amf_su *su; + int all_set = 1; for (su = sg->su_head; su != NULL; su = su->next) { if (su->saAmfSUPresenceState != state) { if (node_to_start == NULL) { - all_set = 0; + all_set = 0; + break; + } else { + if (name_match(&node_to_start->name, + &su->saAmfSUHostedByNode)) { + all_set = 0; + break; + } + } + } + } + return all_set; +} +#endif + +static int no_su_has_presence_state ( + struct amf_sg *sg, struct amf_node *node_to_start, + SaAmfPresenceStateT state) +{ + struct amf_su *su; + int no_su_has_presence_state = 1; + for (su = sg->su_head; su != NULL; su = su->next) { + + if (su->saAmfSUPresenceState == state) { + if (node_to_start == NULL) { + no_su_has_presence_state = 0; break; } else { if (name_match(&node_to_start->name, &su->saAmfSUHostedByNode)) { - all_set = 0; + no_su_has_presence_state = 0; break; } } } } - return all_set; -} + return no_su_has_presence_state; +} static int all_su_in_scope_has_presence_state ( struct amf_sg *sg, SaAmfPresenceStateT state) @@ -1300,14 +1331,16 @@ static int assign_si (struct amf_sg *sg, int dependency_level) return assigned; } -void amf_sg_assign_si (struct amf_sg *sg, int dependency_level) +int amf_sg_assign_si_req (struct amf_sg *sg, int dependency_level) { + int posible_to_assign_si; sg->avail_state = SG_AC_AssigningOnRequest; - if (assign_si (sg, dependency_level) == 0) { + + if ((posible_to_assign_si = assign_si (sg, dependency_level)) == 0) { return_to_idle (sg); - amf_application_sg_assigned (sg->application, sg); } + return posible_to_assign_si; } void amf_sg_failover_node_req ( @@ -1406,8 +1439,8 @@ void amf_sg_su_state_changed (struct amf_sg *sg, if (type == SA_AMF_PRESENCE_STATE) { if (state == SA_AMF_PRESENCE_INSTANTIATED) { if (sg->avail_state == SG_AC_InstantiatingServiceUnits) { - if (all_su_has_presence_state(sg, sg->node_to_start, - SA_AMF_PRESENCE_INSTANTIATED)) { + if (no_su_has_presence_state(sg, sg->node_to_start, + SA_AMF_PRESENCE_INSTANTIATING)) { su->sg->avail_state = SG_AC_Idle; amf_application_sg_started ( sg->application, sg, this_amf_node); @@ -1425,6 +1458,7 @@ void amf_sg_su_state_changed (struct amf_sg *sg, assert (0); } } else { + dprintf ("avail-state: %u", sg->avail_state); assert (0); } } else if (state == SA_AMF_PRESENCE_UNINSTANTIATED) { @@ -1444,6 +1478,17 @@ void amf_sg_su_state_changed (struct amf_sg *sg, } else { assert (0); } + } else if (state == SA_AMF_PRESENCE_INSTANTIATING) { + ; /* nop */ + } else if (state == SA_AMF_PRESENCE_INSTANTIATION_FAILED) { + if (sg->avail_state == SG_AC_InstantiatingServiceUnits) { + if (no_su_has_presence_state(sg, sg->node_to_start, + SA_AMF_PRESENCE_INSTANTIATING)) { + su->sg->avail_state = SG_AC_Idle; + amf_application_sg_started ( + sg->application, sg, this_amf_node); + } + } } else { assert (0); } @@ -1493,14 +1538,9 @@ void amf_sg_failover_su_req ( struct amf_sg *amf_sg_new (struct amf_application *app, char *name) { - struct amf_sg *sg = calloc (1, sizeof (struct amf_sg)); + struct amf_sg *sg = amf_calloc (1, sizeof (struct amf_sg)); - if (sg == NULL) { - openais_exit_error (AIS_DONE_OUT_OF_MEMORY); - } - - sg->next = app->sg_head; - app->sg_head = sg; + setSaNameT (&sg->name, name); sg->saAmfSGAdminState = SA_AMF_ADMIN_UNLOCKED; sg->saAmfSGNumPrefActiveSUs = 1; sg->saAmfSGNumPrefStandbySUs = 1; @@ -1513,8 +1553,8 @@ struct amf_sg *amf_sg_new (struct amf_application *app, char *name) sg->saAmfSGAutoAdjustProb = -1; sg->saAmfSGAutoRepair = SA_TRUE; sg->application = app; - setSaNameT (&sg->name, name); - sg->node_to_start = NULL; + sg->next = app->sg_head; + app->sg_head = sg; return sg; } @@ -1585,13 +1625,10 @@ void *amf_sg_serialize (struct amf_sg *sg, int *len) return buf; } -struct amf_sg *amf_sg_deserialize ( - struct amf_application *app, char *buf, int size) +struct amf_sg *amf_sg_deserialize (struct amf_application *app, char *buf) { char *tmp = buf; - struct amf_sg *sg; - - sg = amf_sg_new (app, ""); + struct amf_sg *sg = amf_sg_new (app, ""); tmp = amf_deserialize_SaNameT (tmp, &sg->name); tmp = amf_deserialize_SaUint32T (tmp, &sg->saAmfSGRedundancyModel); diff --git a/exec/amfsi.c b/exec/amfsi.c index 5332eb87..ab055232 100644 --- a/exec/amfsi.c +++ b/exec/amfsi.c @@ -503,11 +503,7 @@ void amf_csi_delete_assignments (struct amf_csi *csi, struct amf_su *su) struct amf_si *amf_si_new (struct amf_application *app, char *name) { struct amf_si *tail = app->si_head; - struct amf_si *si = calloc (1, sizeof (struct amf_si)); - - if (si == NULL) { - openais_exit_error (AIS_DONE_OUT_OF_MEMORY); - } + struct amf_si *si = amf_calloc (1, sizeof (struct amf_si)); while (tail != NULL) { if (tail->next == NULL) { @@ -559,38 +555,42 @@ void amf_si_delete (struct amf_si *si) void *amf_si_serialize (struct amf_si *si, int *len) { - int objsz = sizeof (struct amf_si); - struct amf_si *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, si, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); + TRACE8 ("%s", si->name.value); - return copy; + buf = amf_serialize_SaNameT (buf, &size, &offset, &si->name); + buf = amf_serialize_SaNameT (buf, &size, &offset, + &si->saAmfSIProtectedbySG); + buf = amf_serialize_SaUint32T (buf, &size, &offset, si->saAmfSIRank); + buf = amf_serialize_SaUint32T (buf, &size, &offset, si->saAmfSINumCSIs); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + si->saAmfSIPrefActiveAssignments); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + si->saAmfSIPrefStandbyAssignments); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + si->saAmfSIAdminState); + + *len = offset; + + return buf; } -struct amf_si *amf_si_deserialize ( - struct amf_application *app, char *buf, int size) +struct amf_si *amf_si_deserialize (struct amf_application *app, char *buf) { - int objsz = sizeof (struct amf_si); + char *tmp = buf; + struct amf_si *si = amf_si_new (app, ""); - if (objsz > size) { - return NULL; - } else { - struct amf_si *tmp = (struct amf_si*) buf; - struct amf_si *si = amf_si_new (app, (char*)tmp->name.value); - TRACE8 ("%s", si->name.value); + tmp = amf_deserialize_SaNameT (tmp, &si->name); + tmp = amf_deserialize_SaNameT (tmp, &si->saAmfSIProtectedbySG); + tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIRank); + tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSINumCSIs); + tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIPrefActiveAssignments); + tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIPrefStandbyAssignments); + tmp = amf_deserialize_SaUint32T (tmp, &si->saAmfSIAdminState); - memcpy (&si->saAmfSIProtectedbySG, &tmp->saAmfSIProtectedbySG, - sizeof (SaNameT)); - si->saAmfSIRank = tmp->saAmfSIRank; - si->saAmfSINumCSIs = tmp->saAmfSINumCSIs; - si->saAmfSIPrefActiveAssignments = tmp->saAmfSIPrefActiveAssignments; - si->saAmfSIPrefStandbyAssignments = tmp->saAmfSIPrefStandbyAssignments; - si->saAmfSIAdminState = tmp->saAmfSIAdminState; - return si; - } + return si; } /***************************************************************************** @@ -599,48 +599,48 @@ struct amf_si *amf_si_deserialize ( struct amf_si_assignment *amf_si_assignment_new (struct amf_si *si) { - struct amf_si_assignment *si_assignment; + struct amf_si_assignment *si_assignment = + amf_calloc (1, sizeof (struct amf_si_assignment)); - si_assignment = amf_malloc (sizeof (struct amf_si_assignment)); si_assignment->si = si; + si_assignment->next = si->assigned_sis; + si->assigned_sis = si_assignment; return si_assignment; } void *amf_si_assignment_serialize ( - struct amf_si_assignment *si_assignment, int *len) + amf_si_assignment_t *si_assignment, int *len) { - int objsz = sizeof (struct amf_si_assignment); - struct amf_si_assignment *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, si_assignment, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); + TRACE8 ("%s", si_assignment->name.value); - return copy; + buf = amf_serialize_SaNameT (buf, &size, &offset, &si_assignment->name); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + si_assignment->saAmfSISUHAState); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + si_assignment->requested_ha_state); + + *len = offset; + + return buf; } struct amf_si_assignment *amf_si_assignment_deserialize ( - struct amf_si *si, char *buf, int size) + struct amf_si *si, char *buf) { - int objsz = sizeof (struct amf_si_assignment); + char *tmp = buf; + struct amf_si_assignment *si_assignment = amf_si_assignment_new (si); - if (objsz > size) { - return NULL; - } else { - struct amf_si_assignment *obj = amf_si_assignment_new (si); - if (obj == NULL) { - return NULL; - } - memcpy (obj, buf, objsz); - TRACE8 ("%s", obj->name.value); - obj->si = si; - obj->su = amf_su_find (si->application->cluster, &obj->name); - obj->next = si->assigned_sis; - si->assigned_sis = obj; - return obj; - } + tmp = amf_deserialize_SaNameT (tmp, &si_assignment->name); + tmp = amf_deserialize_SaUint32T (tmp, &si_assignment->saAmfSISUHAState); + tmp = amf_deserialize_SaUint32T (tmp, &si_assignment->requested_ha_state); + si_assignment->su = amf_su_find (si->application->cluster, + &si_assignment->name); + + return si_assignment; } struct amf_si *amf_si_find (struct amf_application *app, char *name) @@ -668,10 +668,11 @@ struct amf_si *amf_si_find (struct amf_application *app, char *name) struct amf_csi *amf_csi_new (struct amf_si *si) { - struct amf_csi *csi; + struct amf_csi *csi = amf_calloc (1, sizeof (struct amf_csi)); - csi = amf_malloc (sizeof (struct amf_csi)); csi->si = si; + csi->next = si->csi_head; + si->csi_head = csi; return csi; } @@ -691,37 +692,28 @@ void amf_csi_delete (struct amf_csi *csi) void *amf_csi_serialize (struct amf_csi *csi, int *len) { - int objsz = sizeof (struct amf_csi); - struct amf_csi *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, csi, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); + TRACE8 ("%s", csi->name.value); - return copy; + buf = amf_serialize_SaNameT (buf, &size, &offset, &csi->name); + buf = amf_serialize_SaNameT (buf, &size, &offset, &csi->saAmfCSTypeName); + + *len = offset; + + return buf; } -struct amf_csi *amf_csi_deserialize (struct amf_si *si, char *buf, int size) +struct amf_csi *amf_csi_deserialize (struct amf_si *si, char *buf) { - int objsz = sizeof (struct amf_csi); + char *tmp = buf; + struct amf_csi *csi = amf_csi_new (si); - if (objsz > size) { - return NULL; - } else { - struct amf_csi *obj = amf_csi_new (si); - if (obj == NULL) { - return NULL; - } - memcpy (obj, buf, objsz); - TRACE8 ("%s", obj->name.value); - obj->si = si; - obj->assigned_csis = NULL; - obj->attributes_head = NULL; - obj->next = si->csi_head; - si->csi_head = obj; - return obj; - } + tmp = amf_deserialize_SaNameT (tmp, &csi->name); + tmp = amf_deserialize_SaNameT (tmp, &csi->saAmfCSTypeName); + + return csi; } struct amf_csi *amf_csi_find (struct amf_si *si, char *name) @@ -747,10 +739,12 @@ struct amf_csi *amf_csi_find (struct amf_si *si, char *name) struct amf_csi_assignment *amf_csi_assignment_new (struct amf_csi *csi) { - struct amf_csi_assignment *csi_assignment; + struct amf_csi_assignment *csi_assignment = + amf_calloc (1, sizeof (struct amf_csi_assignment)); - csi_assignment = amf_malloc (sizeof (struct amf_csi_assignment)); csi_assignment->csi = csi; + csi_assignment->next = csi->assigned_csis; + csi->assigned_csis = csi_assignment; return csi_assignment; } @@ -758,15 +752,20 @@ struct amf_csi_assignment *amf_csi_assignment_new (struct amf_csi *csi) void *amf_csi_assignment_serialize ( struct amf_csi_assignment *csi_assignment, int *len) { - int objsz = sizeof (struct amf_csi_assignment); - struct amf_csi_assignment *copy; + char *buf = NULL; + int offset = 0, size = 0; - copy = amf_malloc (objsz); - memcpy (copy, csi_assignment, objsz); - *len = objsz; - TRACE8 ("%s", copy->name.value); + TRACE8 ("%s", csi_assignment->name.value); - return copy; + buf = amf_serialize_SaNameT (buf, &size, &offset, &csi_assignment->name); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + csi_assignment->saAmfCSICompHAState); + buf = amf_serialize_SaUint32T (buf, &size, &offset, + csi_assignment->requested_ha_state); + + *len = offset; + + return buf; } struct amf_si_assignment *si_assignment_find ( @@ -794,28 +793,22 @@ struct amf_si_assignment *si_assignment_find ( } struct amf_csi_assignment *amf_csi_assignment_deserialize ( - struct amf_csi *csi, char *buf, int size) + struct amf_csi *csi, char *buf) { - int objsz = sizeof (struct amf_csi_assignment); + char *tmp = buf; + struct amf_csi_assignment *csi_assignment = amf_csi_assignment_new (csi); - if (objsz > size) { - return NULL; - } else { - struct amf_csi_assignment *obj = amf_csi_assignment_new (csi); - if (obj == NULL) { - return NULL; - } - memcpy (obj, buf, objsz); - TRACE8 ("%s", obj->name.value); - obj->csi = csi; - obj->comp = amf_comp_find (csi->si->application->cluster, &obj->name); - assert (obj->comp != NULL); - obj->next = csi->assigned_csis; - csi->assigned_csis = obj; + tmp = amf_deserialize_SaNameT (tmp, &csi_assignment->name); + tmp = amf_deserialize_SaUint32T (tmp, + &csi_assignment->saAmfCSICompHAState); + tmp = amf_deserialize_SaUint32T (tmp, &csi_assignment->requested_ha_state); - obj->si_assignment = si_assignment_find(obj); - return obj; - } + csi_assignment->comp = amf_comp_find (csi->si->application->cluster, + &csi_assignment->name); + assert (csi_assignment->comp != NULL); + csi_assignment->si_assignment = si_assignment_find(csi_assignment); + + return csi_assignment; } char *amf_csi_assignment_dn_make ( @@ -913,9 +906,9 @@ end: struct amf_csi_attribute *amf_csi_attribute_new (struct amf_csi *csi) { - struct amf_csi_attribute *csi_attribute; + struct amf_csi_attribute *csi_attribute = + amf_calloc (1, sizeof (struct amf_csi_assignment)); - csi_attribute = amf_malloc (sizeof (struct amf_csi_assignment)); csi_attribute->next = csi->attributes_head; csi->attributes_head = csi_attribute; @@ -949,7 +942,7 @@ void *amf_csi_attribute_serialize ( } struct amf_csi_attribute *amf_csi_attribute_deserialize ( - struct amf_csi *csi, char *buf, int size) + struct amf_csi *csi, char *buf) { char *tmp = buf; struct amf_csi_attribute *csi_attribute; diff --git a/exec/amfsu.c b/exec/amfsu.c index 0f90f8cc..9748fcc6 100644 --- a/exec/amfsu.c +++ b/exec/amfsu.c @@ -128,20 +128,6 @@ #include "print.h" #include "main.h" -int amf_su_presence_state_all_comps_in_su_are_set (struct amf_su *su, - SaAmfPresenceStateT state) -{ - int all_set = 1; - struct amf_comp *comp; - - for (comp = su->comp_head; comp != NULL; comp = comp->next) { - if (comp->saAmfCompPresenceState != state) { - all_set = 0; - } - } - - return all_set; -} /** * This function only logs since the readiness state is runtime @@ -264,6 +250,182 @@ static void comp_restart (struct amf_comp *comp) amf_comp_restart (comp); } +static void si_ha_state_assumed_cbfn ( + struct amf_si_assignment *si_assignment, int result) +{ + struct amf_si_assignment *tmp_si_assignment; + struct amf_comp *comp; + struct amf_csi_assignment *csi_assignment; + int all_confirmed = 1; + + ENTER (""); + + tmp_si_assignment = amf_su_get_next_si_assignment(si_assignment->su, NULL); + + while (tmp_si_assignment != NULL) { + for (comp = tmp_si_assignment->su->comp_head; comp != NULL; + comp = comp->next) { + + csi_assignment = amf_comp_get_next_csi_assignment(comp, NULL); + while (csi_assignment != NULL) { + + if (csi_assignment->requested_ha_state != + csi_assignment->saAmfCSICompHAState) { + all_confirmed = 0; + } + csi_assignment = amf_comp_get_next_csi_assignment( + comp, csi_assignment); + } + } + tmp_si_assignment = amf_su_get_next_si_assignment( + si_assignment->su, tmp_si_assignment); + } + + if (all_confirmed) { + switch (si_assignment->su->restart_control_state) { + case SU_RC_RESTART_COMP_SETTING: + log_printf (LOG_NOTICE, "Component restart recovery finished"); + break; + case SU_RC_RESTART_SU_SETTING: + log_printf (LOG_NOTICE, "SU restart recovery finished"); + break; + default: + assert (0); + break; + } + si_assignment->su->restart_control_state = + si_assignment->su->escalation_level_history_state; + } +} + +static void reassign_sis(struct amf_su *su) +{ + struct amf_si_assignment *si_assignment; + + ENTER (""); + + si_assignment = amf_su_get_next_si_assignment(su, NULL); + + while (si_assignment != NULL) { + si_assignment->saAmfSISUHAState = 0; /* unknown */ + amf_si_ha_state_assume (si_assignment, si_ha_state_assumed_cbfn); + si_assignment = amf_su_get_next_si_assignment(su, si_assignment); + } +} + +static void su_comp_presence_state_changed ( + struct amf_su *su, struct amf_comp *comp, int state) +{ + ENTER ("'%s', '%s'", su->name.value, comp->name.value); + + switch (state) { + case SA_AMF_PRESENCE_INSTANTIATED: + switch (su->restart_control_state) { + case SU_RC_ESCALATION_LEVEL_2: + /* + * TODO: send to node + */ + case SU_RC_ESCALATION_LEVEL_0: + if (amf_su_presence_state_all_comps_in_su_are_set ( + comp->su, SA_AMF_PRESENCE_INSTANTIATED)) { + + su_presence_state_set ( + comp->su, SA_AMF_PRESENCE_INSTANTIATED); + } + break; + case SU_RC_RESTART_COMP_RESTARTING: + su->restart_control_state = SU_RC_RESTART_COMP_SETTING; + reassign_sis (comp->su); + break; + case SU_RC_RESTART_SU_INSTANTIATING: + if (amf_su_presence_state_all_comps_in_su_are_set ( + comp->su, SA_AMF_PRESENCE_INSTANTIATED)) { + + su->restart_control_state = SU_RC_RESTART_SU_SETTING; + su_presence_state_set ( + comp->su, SA_AMF_PRESENCE_INSTANTIATED); + reassign_sis (comp->su); + } + break; + default: + dprintf ("state %d", su->restart_control_state); + assert (0); + break; + } + break; + case SA_AMF_PRESENCE_UNINSTANTIATED: + if (amf_su_presence_state_all_comps_in_su_are_set ( + su, SA_AMF_PRESENCE_UNINSTANTIATED)) { + + su_presence_state_set (comp->su,SA_AMF_PRESENCE_UNINSTANTIATED); + } + break; + case SA_AMF_PRESENCE_INSTANTIATING: + su_presence_state_set (comp->su,SA_AMF_PRESENCE_INSTANTIATING); + break; + case SA_AMF_PRESENCE_RESTARTING: + break; + case SA_AMF_PRESENCE_TERMINATING: + break; + case SA_AMF_PRESENCE_INSTANTIATION_FAILED: + su_presence_state_set ( + comp->su, SA_AMF_PRESENCE_INSTANTIATION_FAILED); + break; + default: + assert (0); + break; + } +} + +static void su_comp_op_state_changed ( + struct amf_su *su, struct amf_comp *comp, int state) +{ + ENTER ("'%s', '%s'", su->name.value, comp->name.value); + + switch (state) { + case SA_AMF_OPERATIONAL_ENABLED: + { + struct amf_comp *comp_compare; + int all_set = 1; + for (comp_compare = comp->su->comp_head; + comp_compare != NULL; comp_compare = comp_compare->next) { + if (comp_compare->saAmfCompOperState != + SA_AMF_OPERATIONAL_ENABLED) { + + all_set = 0; + break; + } + } + if (all_set) { + su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_ENABLED); + } else { + su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_DISABLED); + } + break; + } + case SA_AMF_OPERATIONAL_DISABLED: + break; + default: + assert (0); + break; + } +} + +int amf_su_presence_state_all_comps_in_su_are_set (struct amf_su *su, + SaAmfPresenceStateT state) +{ + int all_set = 1; + struct amf_comp *comp; + + for (comp = su->comp_head; comp != NULL; comp = comp->next) { + if (comp->saAmfCompPresenceState != state) { + all_set = 0; + } + } + + return all_set; +} + void amf_su_instantiate (struct amf_su *su) { struct amf_comp *comp; @@ -331,158 +493,6 @@ void amf_su_assign_si (struct amf_su *su, struct amf_si *si, } } -static void si_ha_state_assumed_cbfn ( - struct amf_si_assignment *si_assignment, int result) -{ - struct amf_si_assignment *tmp_si_assignment; - struct amf_comp *comp; - struct amf_csi_assignment *csi_assignment; - int all_confirmed = 1; - - ENTER (""); - - tmp_si_assignment = amf_su_get_next_si_assignment(si_assignment->su, NULL); - - while (tmp_si_assignment != NULL) { - for (comp = tmp_si_assignment->su->comp_head; comp != NULL; - comp = comp->next) { - - csi_assignment = amf_comp_get_next_csi_assignment(comp, NULL); - while (csi_assignment != NULL) { - - if (csi_assignment->requested_ha_state != - csi_assignment->saAmfCSICompHAState) { - all_confirmed = 0; - } - csi_assignment = amf_comp_get_next_csi_assignment( - comp, csi_assignment); - } - } - tmp_si_assignment = amf_su_get_next_si_assignment( - si_assignment->su, tmp_si_assignment); - } - - if (all_confirmed) { - switch (si_assignment->su->restart_control_state) { - case SU_RC_RESTART_COMP_SETTING: - log_printf (LOG_NOTICE, "Component restart recovery finished"); - break; - case SU_RC_RESTART_SU_SETTING: - log_printf (LOG_NOTICE, "SU restart recovery finished"); - break; - default: - assert (0); - } - si_assignment->su->restart_control_state = - si_assignment->su->escalation_level_history_state; - } -} - -static void reassign_sis(struct amf_su *su) -{ - struct amf_si_assignment *si_assignment; - - ENTER (""); - - si_assignment = amf_su_get_next_si_assignment(su, NULL); - - while (si_assignment != NULL) { - si_assignment->saAmfSISUHAState = 0; /* unknown */ - amf_si_ha_state_assume (si_assignment, si_ha_state_assumed_cbfn); - si_assignment = amf_su_get_next_si_assignment(su, si_assignment); - } -} - -static void su_comp_presence_state_changed ( - struct amf_su *su, struct amf_comp *comp, int state) -{ - ENTER ("'%s', '%s'", su->name.value, comp->name.value); - - switch (state) { - case SA_AMF_PRESENCE_INSTANTIATED: - switch (su->restart_control_state) { - case SU_RC_ESCALATION_LEVEL_2: - /* - * TODO: send to node - */ - case SU_RC_ESCALATION_LEVEL_0: - if (amf_su_presence_state_all_comps_in_su_are_set ( - comp->su, SA_AMF_PRESENCE_INSTANTIATED)) { - - su_presence_state_set ( - comp->su, SA_AMF_PRESENCE_INSTANTIATED); - } - break; - case SU_RC_RESTART_COMP_RESTARTING: - su->restart_control_state = SU_RC_RESTART_COMP_SETTING; - reassign_sis (comp->su); - break; - case SU_RC_RESTART_SU_INSTANTIATING: - if (amf_su_presence_state_all_comps_in_su_are_set ( - comp->su, SA_AMF_PRESENCE_INSTANTIATED)) { - - su->restart_control_state = SU_RC_RESTART_SU_SETTING; - su_presence_state_set ( - comp->su, SA_AMF_PRESENCE_INSTANTIATED); - reassign_sis (comp->su); - } - break; - default: - dprintf ("state %d", su->restart_control_state); - assert (0); - } - break; - case SA_AMF_PRESENCE_UNINSTANTIATED: - if (amf_su_presence_state_all_comps_in_su_are_set ( - su, SA_AMF_PRESENCE_UNINSTANTIATED)) { - - su_presence_state_set (comp->su, - SA_AMF_PRESENCE_UNINSTANTIATED); - } - break; - case SA_AMF_PRESENCE_INSTANTIATING: - break; - case SA_AMF_PRESENCE_RESTARTING: - break; - case SA_AMF_PRESENCE_TERMINATING: - break; - default: - assert (0); - } -} - -static void su_comp_op_state_changed ( - struct amf_su *su, struct amf_comp *comp, int state) -{ - ENTER ("'%s', '%s'", su->name.value, comp->name.value); - - switch (state) { - case SA_AMF_OPERATIONAL_ENABLED: - { - struct amf_comp *comp_compare; - int all_set = 1; - for (comp_compare = comp->su->comp_head; - comp_compare != NULL; comp_compare = comp_compare->next) { - if (comp_compare->saAmfCompOperState != - SA_AMF_OPERATIONAL_ENABLED) { - - all_set = 0; - break; - } - } - if (all_set) { - su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_ENABLED); - } else { - su_operational_state_set (comp->su, SA_AMF_OPERATIONAL_DISABLED); - } - break; - } - case SA_AMF_OPERATIONAL_DISABLED: - break; - default: - assert (0); - } -} /** * Used by a component to report a state change event @@ -503,6 +513,7 @@ void amf_su_comp_state_changed ( break; default: assert (0); + break; } } @@ -749,11 +760,7 @@ SaAmfReadinessStateT amf_su_get_saAmfSUReadinessState (struct amf_su *su) struct amf_su *amf_su_new (struct amf_sg *sg, char *name) { struct amf_su *tail = sg->su_head; - struct amf_su *su = calloc (1, sizeof (struct amf_su)); - - if (su == NULL) { - openais_exit_error (AIS_DONE_OUT_OF_MEMORY); - } + struct amf_su *su = amf_calloc (1, sizeof (struct amf_su)); while (tail != NULL) { if (tail->next == NULL) { @@ -836,12 +843,10 @@ void *amf_su_serialize (struct amf_su *su, int *len) return buf; } -struct amf_su *amf_su_deserialize (struct amf_sg *sg, char *buf, int size) +struct amf_su *amf_su_deserialize (struct amf_sg *sg, char *buf) { char *tmp = buf; - struct amf_su *su; - - su = amf_su_new (sg, ""); + struct amf_su *su = amf_su_new (sg, ""); tmp = amf_deserialize_SaNameT (tmp, &su->name); tmp = amf_deserialize_SaUint32T (tmp, &su->saAmfSURank); diff --git a/exec/amfutil.c b/exec/amfutil.c index 36d6042f..675a6c5d 100644 --- a/exec/amfutil.c +++ b/exec/amfutil.c @@ -327,11 +327,7 @@ struct amf_cluster *amf_config_read (char **error_string) cluster->node_head = node; current_parse = AMF_NODE; } else if ((loc = strstr_rs (line, "safApp=")) != 0) { - app = calloc (1, sizeof (struct amf_application)); - app->next = cluster->application_head; - cluster->application_head = app; - app->cluster = cluster; - app->saAmfApplicationAdminState = SA_AMF_ADMIN_UNLOCKED; + app = amf_application_new (cluster); setSaNameT (&app->name, trim_str (loc)); current_parse = AMF_APPLICATION; sg_cnt = 0; @@ -409,6 +405,7 @@ struct amf_cluster *amf_config_read (char **error_string) sg_cnt++; sg->recovery_scope.comp = NULL; sg->recovery_scope.recovery_type = 0; + sg->recovery_scope.node = NULL; sg->recovery_scope.sis = NULL; sg->recovery_scope.sus = NULL; current_parse = AMF_SG; @@ -921,7 +918,7 @@ void amf_runtime_attributes_print (struct amf_cluster *cluster) log_printf (LOG_INFO, "safCluster=%s", getSaNameT(&cluster->name)); log_printf (LOG_INFO, " admin state: %s\n", admin_state_text[cluster->saAmfClusterAdminState]); - log_printf (LOG_INFO, " state: %u\n", cluster->state); + log_printf (LOG_INFO, " state: %u\n", cluster->acsm_state); for (node = cluster->node_head; node != NULL; node = node->next) { log_printf (LOG_INFO, " safNode=%s\n", getSaNameT (&node->name)); log_printf (LOG_INFO, " CLM Node: %s\n", getSaNameT (&node->saAmfNodeClmNode)); @@ -1109,6 +1106,24 @@ char *amf_serialize_SaStringT (char *buf, int *size, int *offset, SaStringT str) return amf_serialize_opaque (buf, size, offset, str, len); } +char *amf_serialize_SaUint16T (char *buf, int *size, int *offset, SaUint16T num) +{ + char *tmp = buf; + + if ((*size - *offset ) < sizeof (SaUint16T)) { + *size += sizeof (SaUint16T); + tmp = realloc (buf, *size); + if (tmp == NULL) { + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + } + + *((SaUint16T *)&tmp[*offset]) = num; + (*offset) += sizeof (SaUint16T); + + return tmp; +} + char *amf_serialize_SaUint32T (char *buf, int *size, int *offset, SaUint32T num) { char *tmp = buf; @@ -1127,14 +1142,8 @@ char *amf_serialize_SaUint32T (char *buf, int *size, int *offset, SaUint32T num) return tmp; } -char *amf_serialize_SaUint64T (char *buf, SaUint64T num) -{ - *((SaUint64T *)buf) = num; - return buf + sizeof (SaUint64T); -} - char *amf_serialize_opaque ( - char *buf, int *size, int *offset, char *src, int cnt) + char *buf, int *size, int *offset, void *src, int cnt) { unsigned int required_size; char *tmp = buf; @@ -1185,19 +1194,19 @@ char *amf_deserialize_SaStringT (char *buf, SaStringT *str) return tmp; } +char *amf_deserialize_SaUint16T (char *buf, SaUint16T *num) +{ + *num = *((SaUint16T *)buf); + return buf + sizeof (SaUint16T); +} + char *amf_deserialize_SaUint32T (char *buf, SaUint32T *num) { *num = *((SaUint32T *)buf); return buf + sizeof (SaUint32T); } -char *amf_deserialize_SaUint64T (char *buf, SaUint64T *num) -{ - *num = *((SaUint64T *)buf); - return buf + sizeof (SaUint64T); -} - -char *amf_deserialize_opaque (char *buf, char *dst, int *cnt) +char *amf_deserialize_opaque (char *buf, void *dst, int *cnt) { *cnt = *((SaUint32T *)buf); memcpy (dst, buf + sizeof (SaUint32T), *cnt); @@ -1216,6 +1225,18 @@ void *_amf_malloc (size_t size, char *file, unsigned int line) return tmp; } +void *_amf_calloc (size_t nmemb, size_t size, char *file, unsigned int line) +{ + void *tmp = calloc (nmemb, size); + + if (tmp == NULL) { + log_printf (LOG_LEVEL_ERROR, "AMF out-of-memory at %s:%u", file, line); + openais_exit_error (AIS_DONE_OUT_OF_MEMORY); + } + + return tmp; +} + int sa_amf_grep_one_sub_match(const char *string, char *pattern, SaNameT *matches_arr) { @@ -1297,17 +1318,26 @@ out: } -void amf_msg_mcast (int id, void *buf, size_t len) +/** + * Multicast a message to the cluster. Errors are treated as + * fatal and will exit the program. + * @param msg_id + * @param buf + * @param len + * + * @return int + */ +int amf_msg_mcast (int msg_id, void *buf, size_t len) { struct req_exec_amf_msg msg; struct iovec iov[2]; int iov_cnt; int res; -// ENTER ("%u, %p, %u", id, buf, len); +// ENTER ("%u, %p, %u", msg_id, buf, len); msg.header.size = sizeof (msg); - msg.header.id = SERVICE_ID_MAKE (AMF_SERVICE, id); + msg.header.id = SERVICE_ID_MAKE (AMF_SERVICE, msg_id); iov[0].iov_base = &msg; iov[0].iov_len = sizeof (msg); @@ -1328,6 +1358,8 @@ void amf_msg_mcast (int id, void *buf, size_t len) dprintf("Unable to send %d bytes\n", msg.header.size); openais_exit_error (AIS_DONE_FATAL_ERR); } + + return res; } void amf_util_init (void)