diff --git a/exec/evt.c b/exec/evt.c index cdb131dc..dee9e746 100644 --- a/exec/evt.c +++ b/exec/evt.c @@ -65,6 +65,8 @@ static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message); static int lib_evt_open_channel(struct conn_info *conn_info, void *message); +static int lib_evt_open_channel_async(struct conn_info *conn_info, + void *message); static int lib_evt_close_channel(struct conn_info *conn_info, void *message); static int lib_evt_event_subscribe(struct conn_info *conn_info, void *message); @@ -101,6 +103,11 @@ static struct libais_handler evt_libais_handlers[] = { .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL, }, { + .libais_handler_fn = lib_evt_open_channel_async, + .response_size = sizeof(struct res_evt_channel_open), + .response_id = MESSAGE_RES_EVT_OPEN_CHANNEL, + }, + { .libais_handler_fn = lib_evt_close_channel, .response_size = sizeof(struct res_evt_channel_close), .response_id = MESSAGE_RES_EVT_CLOSE_CHANNEL, @@ -123,7 +130,7 @@ static struct libais_handler evt_libais_handlers[] = { { .libais_handler_fn = lib_evt_event_clear_retentiontime, .response_size = sizeof(struct res_evt_event_clear_retentiontime), - .response_id = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME, + .response_id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME, }, { .libais_handler_fn = lib_evt_event_data_get, @@ -1997,10 +2004,14 @@ static int lib_evt_open_channel(struct conn_info *conn_info, void *message) ocp->ocp_chan_name = req->ico_channel_name; ocp->ocp_open_flag = req->ico_open_flag; ocp->ocp_conn_info = conn_info; + ocp->ocp_c_handle = req->ico_c_handle; ocp->ocp_timer_handle = 0; list_init(&ocp->ocp_entry); list_add_tail(&ocp->ocp_entry, &open_pending); if (req->ico_timeout != 0) { + /* + * Time in nanoseconds - convert to miliseconds + */ msec_in_future = (uint32_t)(req->ico_timeout / 1000000ULL); ret = poll_timer_add(aisexec_poll_handle, msec_in_future, @@ -2025,6 +2036,83 @@ open_return: return 0; } +/* + * Handler for saEvtChannelOpen + */ +static int lib_evt_open_channel_async(struct conn_info *conn_info, + void *message) +{ + SaErrorT error; + struct req_evt_channel_open *req; + struct res_evt_channel_open res; + struct open_chan_pending *ocp; + int msec_in_future; + int ret; + + req = message; + + + log_printf(CHAN_OPEN_DEBUG, + "saEvtChannelOpenAsync (Async Open channel request)\n"); + log_printf(CHAN_OPEN_DEBUG, + "handle 0x%x, to 0x%x\n", + req->ico_c_handle, + req->ico_invocation); + log_printf(CHAN_OPEN_DEBUG, "flags %x, channel name(%d) %s\n", + req->ico_open_flag, + req->ico_channel_name.length, + req->ico_channel_name.value); + /* + * Open the channel. + * + */ + error = evt_open_channel(&req->ico_channel_name, req->ico_open_flag); + + if (error != SA_AIS_OK) { + goto open_return; + } + + ocp = malloc(sizeof(struct open_chan_pending)); + if (!ocp) { + error = SA_AIS_ERR_NO_MEMORY; + goto open_return; + } + + ocp->ocp_async = 1; + ocp->ocp_invocation = req->ico_invocation; + ocp->ocp_c_handle = req->ico_c_handle; + ocp->ocp_chan_name = req->ico_channel_name; + ocp->ocp_open_flag = req->ico_open_flag; + ocp->ocp_conn_info = conn_info; + ocp->ocp_timer_handle = 0; + list_init(&ocp->ocp_entry); + list_add_tail(&ocp->ocp_entry, &open_pending); + if (req->ico_timeout != 0) { + /* + * Time in nanoseconds - convert to miliseconds + */ + msec_in_future = (uint32_t)(req->ico_timeout / 1000000ULL); + ret = poll_timer_add(aisexec_poll_handle, + msec_in_future, + ocp, + chan_open_timeout, + &ocp->ocp_timer_handle); + if (ret != 0) { + log_printf(LOG_LEVEL_WARNING, + "Error setting timeout for open channel %s\n", + req->ico_channel_name.value); + } + } + +open_return: + res.ico_head.size = sizeof(res); + res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; + res.ico_head.error = error; + libais_send_response (conn_info, &res, sizeof(res)); + + return 0; +} + /* @@ -2420,7 +2508,7 @@ static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info, } res.iec_head.size = sizeof(res); - res.iec_head.id = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME; + res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME; res.iec_head.error = error; libais_send_response (conn_info, &res, sizeof(res)); @@ -2981,7 +3069,6 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp, struct event_svr_channel_instance *eci) { uint32_t handle; - struct res_evt_channel_open res; struct event_svr_channel_open *eco; SaErrorT error; struct libevt_ci *esip = &ocp->ocp_conn_info->ais_ci.u.libevt_ci; @@ -2990,7 +3077,7 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp, log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s\n", getSaNameT(&ocp->ocp_chan_name)); - if (!ocp->ocp_async && ocp->ocp_timer_handle) { + if (ocp->ocp_timer_handle) { ret = poll_timer_delete(aisexec_poll_handle, ocp->ocp_timer_handle); if (ret != 0 ) { log_printf(LOG_LEVEL_WARNING, @@ -3032,15 +3119,28 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp, * open instance for later subscriptions, etc. */ saHandleInstancePut(&esip->esi_hdb, handle); + open_return: log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s send response %d\n", getSaNameT(&ocp->ocp_chan_name), error); - res.ico_head.size = sizeof(res); - res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; - res.ico_head.error = error; - res.ico_channel_handle = handle; - libais_send_response (ocp->ocp_conn_info, &res, sizeof(res)); + if (ocp->ocp_async) { + struct res_evt_open_chan_async resa; + resa.ica_head.size = sizeof(resa); + resa.ica_head.id = MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK; + resa.ica_head.error = error; + resa.ica_channel_handle = handle; + resa.ica_c_handle = ocp->ocp_c_handle; + resa.ica_invocation = ocp->ocp_invocation; + libais_send_response (ocp->ocp_conn_info, &resa, sizeof(resa)); + } else { + struct res_evt_channel_open res; + res.ico_head.size = sizeof(res); + res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL; + res.ico_head.error = error; + res.ico_channel_handle = handle; + libais_send_response (ocp->ocp_conn_info, &res, sizeof(res)); + } if (ret == 0) { list_del(&ocp->ocp_entry); diff --git a/include/ipc_evt.h b/include/ipc_evt.h index 0d4f6d78..7d7325ef 100644 --- a/include/ipc_evt.h +++ b/include/ipc_evt.h @@ -42,6 +42,7 @@ enum req_evt_types { MESSAGE_REQ_EVT_OPEN_CHANNEL = 1, + MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC, MESSAGE_REQ_EVT_CLOSE_CHANNEL, MESSAGE_REQ_EVT_SUBSCRIBE, MESSAGE_REQ_EVT_UNSUBSCRIBE, @@ -64,6 +65,7 @@ enum res_evt_types { /* * MESSAGE_REQ_EVT_OPEN_CHANNEL + * MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC * * ico_head Request head * ico_open_flag: Channel open flags @@ -78,13 +80,18 @@ struct req_evt_channel_open { struct req_header ico_head; SaUint8T ico_open_flag; SaNameT ico_channel_name; - SaEvtChannelHandleT ico_c_handle; /* client chan handle */ - SaTimeT ico_timeout; /* open only */ - SaInvocationT ico_invocation; /* open async only */ + SaEvtChannelHandleT ico_c_handle; + SaTimeT ico_timeout; + SaInvocationT ico_invocation; }; /* * MESSAGE_RES_EVT_OPEN_CHANNEL + * + * Used by both the blocing and non-blocking + * versions. Only the error code in the header is used by the async + * open. The channel handle will be returnd via the channel open + * callback. * * * ico_head: Results head @@ -102,10 +109,20 @@ struct res_evt_channel_open { /* * MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK * - * TODO: Define this + * ica_head: Results head. + * ica_c_handle: Lib size channel handle. So we can look + * up the new open channel info from the callback. + * ica_channel_handle: Server side handle. + * ica_invocation: Supplied by the user in the open call. Passed to + * the callback so the user can assocate the callback + * to the particular open. */ struct res_evt_open_chan_async { - struct res_header ico_head; + struct res_header ica_head; + SaEvtChannelHandleT ica_c_handle; + uint32_t ica_channel_handle; + SaInvocationT ica_invocation; + }; diff --git a/lib/evt.c b/lib/evt.c index bc0216ae..987a60e5 100644 --- a/lib/evt.c +++ b/lib/evt.c @@ -1,6 +1,6 @@ /* - * Copyright (c) 2004 Mark Haverkamp - * Copyright (c) 2004 Open Source Development Lab + * Copyright (c) 2004-2005 Mark Haverkamp + * Copyright (c) 2004-2005 Open Source Development Lab * * All rights reserved. * @@ -617,10 +617,32 @@ saEvtDispatch( break; case MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK: - /* - * TODO: do something here + { + struct res_evt_open_chan_async *resa = + (struct res_evt_open_chan_async *)dispatch_data; + struct event_channel_instance *eci; + + /* + * Check for errors. If there are none, then + * look up the local channel via the handle that we + * got from the callback request. All we need to do + * is place in the handle from the server side and then + * we can call the callback. */ - printf("Dispatch: Open callback\n"); + error = resa->ica_head.error; + if (error == SA_AIS_OK) { + error = saHandleInstanceGet(&channel_handle_db, + resa->ica_c_handle, (void*)&eci); + if (error == SA_AIS_OK) { + eci->eci_svr_channel_handle = resa->ica_channel_handle; + saHandleInstancePut (&channel_handle_db, + resa->ica_c_handle); + } + } + callbacks.saEvtChannelOpenCallback(resa->ica_invocation, + resa->ica_c_handle, error); + + } break; default: @@ -903,16 +925,102 @@ chan_close_done: return error; } +/* + * The saEvtChannelOpenAsync() function creates a new event channel or open an + * existing channel. The saEvtChannelOpenAsync() function is a non-blocking + * operation. A new event channel handle is returned in the channel open + * callback function (SaEvtChannelOpenCallbackT). + */ SaAisErrorT saEvtChannelOpenAsync(SaEvtHandleT evt_handle, SaInvocationT invocation, const SaNameT *channel_name, SaEvtChannelOpenFlagsT channel_open_flags) { - /* - * TODO: Fill in code + struct event_instance *evti; + struct req_evt_channel_open req; + struct res_evt_channel_open res; + struct event_channel_instance *eci; + SaEvtChannelHandleT channel_handle; + SaAisErrorT error; + + error = saHandleInstanceGet(&evt_instance_handle_db, evt_handle, + (void*)&evti); + + if (error != SA_AIS_OK) { + goto chan_open_done; + } + + /* + * create a handle for this open channel */ - return SA_AIS_ERR_LIBRARY; + error = saHandleCreate(&channel_handle_db, sizeof(*eci), + &channel_handle); + if (error != SA_AIS_OK) { + goto chan_open_put; + } + + error = saHandleInstanceGet(&channel_handle_db, channel_handle, + (void*)&eci); + if (error != SA_AIS_OK) { + saHandleDestroy(&channel_handle_db, channel_handle); + goto chan_open_put; + } + + + /* + * Send the request to the server. The response isn't the open channel, + * just an ack. The open channel will be returned when the channel open + * callback is called. + */ + req.ico_head.size = sizeof(req); + req.ico_head.id = MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC; + req.ico_c_handle = channel_handle; + req.ico_timeout = 0; + req.ico_invocation = invocation; + req.ico_open_flag = channel_open_flags; + req.ico_channel_name = *channel_name; + + + pthread_mutex_lock(&evti->ei_mutex); + + error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL); + if (error != SA_AIS_OK) { + pthread_mutex_unlock (&evti->ei_mutex); + goto chan_open_free; + } + error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, + MESSAGE_RES_EVT_OPEN_CHANNEL); + + pthread_mutex_unlock (&evti->ei_mutex); + + if (error != SA_AIS_OK) { + goto chan_open_free; + } + + error = res.ico_head.error; + if (error != SA_AIS_OK) { + goto chan_open_free; + } + + eci->eci_svr_channel_handle = 0; /* filled in by callback */ + eci->eci_channel_name = *channel_name; + eci->eci_open_flags = channel_open_flags; + eci->eci_instance_handle = evt_handle; + eci->eci_closing = 0; + pthread_mutex_init(&eci->eci_mutex, NULL); + saHandleInstancePut (&evt_instance_handle_db, evt_handle); + saHandleInstancePut (&channel_handle_db, channel_handle); + + return SA_AIS_OK; + +chan_open_free: + saHandleDestroy(&channel_handle_db, channel_handle); + saHandleInstancePut (&channel_handle_db, channel_handle); +chan_open_put: + saHandleInstancePut (&evt_instance_handle_db, evt_handle); +chan_open_done: + return error; } SaAisErrorT