IPC: handle a connection disconnect from the server better

Only problem with SOCKET.

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
Angus Salkeld 2012-05-02 10:15:19 +10:00
parent 78fb4ad682
commit 9f09aba8dd
4 changed files with 110 additions and 25 deletions

View File

@ -200,6 +200,7 @@ int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service *s);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
void qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c);
int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);

View File

@ -221,9 +221,15 @@ qb_ipc_us_recv_ready(struct qb_ipc_one_way * one_way, int32_t ms_timeout)
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
} else if (poll_events == 1 && (ufds.revents & (POLLERR | POLLHUP))) {
} else if (poll_events == 1 && (ufds.revents & POLLERR)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR", one_way->u.us.sock);
return -ENOTCONN;
} else if (poll_events == 1 && (ufds.revents & POLLHUP)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP", one_way->u.us.sock);
return -ENOTCONN;
} else if (poll_events == 1 && (ufds.revents & POLLNVAL)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL", one_way->u.us.sock);
return -ENOTCONN;
}
return 0;
}
@ -643,27 +649,22 @@ handle_new_connection(struct qb_ipcs_service *s,
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);
if (s->needs_sock_for_poll) {
if (s->needs_sock_for_poll || s->type == QB_IPC_SOCKET) {
qb_ipcs_connection_ref(c);
res = s->poll_fns.dispatch_add(s->poll_priority,
c->setup.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
}
if (s->type == QB_IPC_SOCKET) {
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
res = s->poll_fns.dispatch_add(s->poll_priority,
c->request.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
if (res < 0) {
qb_util_log(LOG_ERR,
"Error adding socket to mainloop.");
}
}
if (s->type == QB_IPC_SOCKET) {
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
}
send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
@ -974,14 +975,35 @@ qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
return qb_atomic_int_get(&ctl->sent);
}
void
qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c)
{
int sock = -1;
qb_enter();
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > 0) {
sock = c->setup.u.us.sock;
qb_ipcc_us_sock_close(sock);
c->setup.u.us.sock = -1;
}
if (c->request.type == QB_IPC_SOCKET) {
sock = c->request.u.us.sock;
}
if (sock > 0) {
(void)c->service->poll_fns.dispatch_del(sock);
qb_ipcs_connection_unref(c);
}
}
static void
qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
{
qb_enter();
munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control));
unlink(c->request.u.us.shared_file_name);
close(c->request.u.us.sock);
close(c->event.u.us.sock);
qb_ipcc_us_sock_close(c->request.u.us.sock);
qb_ipcc_us_sock_close(c->event.u.us.sock);
}
void

View File

@ -511,12 +511,8 @@ qb_ipcs_disconnect(struct qb_ipcs_connection *c)
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_INACTIVE;
c->service->stats.closed_connections++;
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > 0) {
(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
qb_ipcc_us_sock_close(c->setup.u.us.sock);
c->setup.u.us.sock = -1;
qb_ipcs_connection_unref(c);
}
qb_ipcs_sockets_disconnect(c);
/* return early as it's an incomplete connection.
*/
return;
@ -526,12 +522,7 @@ qb_ipcs_disconnect(struct qb_ipcs_connection *c)
c->service->stats.active_connections--;
c->service->stats.closed_connections++;
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > 0) {
(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
qb_ipcc_us_sock_close(c->setup.u.us.sock);
c->setup.u.us.sock = -1;
qb_ipcs_connection_unref(c);
}
qb_ipcs_sockets_disconnect(c);
}
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
res = 0;
@ -668,6 +659,10 @@ qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
int32_t recvd = 0;
ssize_t avail;
if (revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "NVAL conn:%p fd:%d", c, fd);
return -EINVAL;
}
if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "HUP conn:%p fd:%d", c, fd);
qb_ipcs_disconnect(c);

View File

@ -46,6 +46,8 @@ enum my_msg_ids {
IPC_MSG_RES_BULK_EVENTS,
IPC_MSG_REQ_SERVER_FAIL,
IPC_MSG_RES_SERVER_FAIL,
IPC_MSG_REQ_SERVER_DISCONNECT,
IPC_MSG_RES_SERVER_DISCONNECT,
};
/* Test Cases
@ -71,6 +73,7 @@ static qb_ipcs_service_t* s1;
static int32_t turn_on_fc = QB_FALSE;
static int32_t fc_enabled = 89;
static int32_t send_event_on_created = QB_FALSE;
static int32_t disconnect_after_created = QB_FALSE;
static int32_t num_bulk_events = 10;
static int32_t
@ -134,6 +137,8 @@ s1_msg_process_fn(qb_ipcs_connection_t *c,
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
exit(0);
} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
qb_ipcs_disconnect(c);
}
return 0;
}
@ -703,6 +708,64 @@ START_TEST(test_ipc_event_on_created_us)
}
END_TEST
static void
test_ipc_disconnect_after_created(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
struct iovec iov[1];
int32_t c = 0;
int32_t j = 0;
pid_t pid;
int32_t res;
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, MAX_MSG_SIZE);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
sleep(1);
c++;
}
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT;
req_header.size = sizeof(struct qb_ipc_request_header);
iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;
res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), -1);
/*
* confirm we get -ENOTCONN
*/
ck_assert_int_eq(res, -ENOTCONN);
ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
qb_ipcc_disconnect(conn);
stop_process(pid);
}
START_TEST(test_ipc_disconnect_after_created_us)
{
qb_enter();
disconnect_after_created = QB_TRUE;
ipc_type = QB_IPC_SOCKET;
ipc_name = __func__;
test_ipc_disconnect_after_created();
qb_leave();
}
END_TEST
static void
test_ipc_server_fail(void)
{
@ -905,6 +968,10 @@ ipc_suite(void)
tcase_add_test(tc, test_ipc_event_on_created_us);
suite_add_tcase(s, tc);
tc = tcase_create("ipc_disconnect_after_created_us");
tcase_add_test(tc, test_ipc_disconnect_after_created_us);
suite_add_tcase(s, tc);
return s;
}