Qdevices-net improvements

- Implement node list
- Implement send buffer list
- Add nodelist message type
- Add ring_id, config_version, data_center_id, node_state, node_info,
  node_list_type and vote msg options

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
This commit is contained in:
Jan Friesse 2015-09-09 16:10:13 +02:00
parent fc9817fec4
commit e221441b6c
23 changed files with 1842 additions and 391 deletions

View File

@ -39,12 +39,14 @@ sbin_PROGRAMS = corosync-qnetd corosync-qdevice-net
sbin_SCRIPTS = corosync-qnetd-certutil
corosync_qnetd_SOURCES = corosync-qnetd.c dynar.c msg.c msgio.c nss-sock.c \
qnetd-client.c qnetd-clients-list.c qnetd-log.c \
qnetd-poll-array.c timer-list.c tlv.c
qnetd-client.c qnetd-client-list.c qnetd-log.c \
qnetd-poll-array.c timer-list.c tlv.c send-buffer-list.c \
node-list.c
corosync_qdevice_net_SOURCES = corosync-qdevice-net.c dynar.c msg.c msgio.c nss-sock.c \
qnetd-client.c qnetd-clients-list.c qnetd-log.c \
qnetd-poll-array.c timer-list.c tlv.c
qnetd-client.c qnetd-client-list.c qnetd-log.c \
qnetd-poll-array.c timer-list.c tlv.c send-buffer-list.c \
node-list.c
corosync_qnetd_CFLAGS = $(nss_CFLAGS)
corosync_qnetd_LDADD = $(nss_LIBS)

View File

@ -48,6 +48,7 @@
#include <getopt.h>
#include <err.h>
#include <keyhi.h>
#include <poll.h>
/*
* Needed for creating nspr handle from unix fd
@ -65,6 +66,7 @@
#include "msgio.h"
#include "qnetd-log.h"
#include "timer-list.h"
#include "send-buffer-list.h"
#define NSS_DB_DIR COROSYSCONFDIR "/qdevice-net/nssdb"
@ -80,6 +82,8 @@
#define QDEVICE_NET_NSS_CLIENT_CERT_NICKNAME "Cluster Cert"
#define QDEVICE_NET_VOTEQUORUM_DEVICE_NAME "QdeviceNet"
#define QDEVICE_NET_MAX_SEND_BUFFERS 10
#define qdevice_net_log qnetd_log
#define qdevice_net_log_nss qnetd_log_nss
#define qdevice_net_log_init qnetd_log_init
@ -105,14 +109,9 @@ struct qdevice_net_instance {
size_t max_receive_size;
size_t min_send_size;
struct dynar receive_buffer;
struct dynar send_buffer;
struct dynar echo_request_send_buffer;
int sending_msg;
struct send_buffer_list send_buffer_list;
int skipping_msg;
int sending_echo_request_msg;
size_t msg_already_received_bytes;
size_t msg_already_sent_bytes;
size_t echo_request_msg_already_sent_bytes;
enum qdevice_net_state state;
uint32_t expected_msg_seq_num;
uint32_t echo_request_expected_msg_seq_num;
@ -138,7 +137,8 @@ static votequorum_ring_id_t global_last_received_ring_id;
static void
err_nss(void) {
errx(1, "nss error %d: %s", PR_GetError(), PR_ErrorToString(PR_GetError(), PR_LANGUAGE_I_DEFAULT));
errx(1, "nss error %d: %s", PR_GetError(), PR_ErrorToString(PR_GetError(),
PR_LANGUAGE_I_DEFAULT));
}
static SECStatus
@ -167,47 +167,35 @@ qdevice_net_nss_get_client_auth_data(void *arg, PRFileDesc *sock, struct CERTDis
return (NSS_GetClientAuthData(arg, sock, caNames, pRetCert, pRetKey));
}
static int
qdevice_net_schedule_send(struct qdevice_net_instance *instance)
{
if (instance->sending_msg) {
/*
* Msg is already scheduled for send
*/
return (-1);
}
instance->msg_already_sent_bytes = 0;
instance->sending_msg = 1;
return (0);
}
static int
qdevice_net_schedule_echo_request_send(struct qdevice_net_instance *instance)
{
if (instance->sending_echo_request_msg) {
qdevice_net_log(LOG_ERR, "Can't schedule send of echo request msg, because "
"previous message wasn't yet sent. Disconnecting from server.");
return (-1);
}
struct send_buffer_list_entry *send_buffer;
if (instance->echo_reply_received_msg_seq_num != instance->echo_request_expected_msg_seq_num) {
if (instance->echo_reply_received_msg_seq_num !=
instance->echo_request_expected_msg_seq_num) {
qdevice_net_log(LOG_ERR, "Server didn't send echo reply message on time. "
"Disconnecting from server.");
return (-1);
}
send_buffer = send_buffer_list_get_new(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_CRIT, "Can't allocate send list buffer for reply msg.");
return (-1);
}
instance->echo_request_expected_msg_seq_num++;
if (msg_create_echo_request(&instance->echo_request_send_buffer, 1, instance->echo_request_expected_msg_seq_num) == -1) {
if (msg_create_echo_request(&send_buffer->buffer, 1,
instance->echo_request_expected_msg_seq_num) == -1) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for echo request msg");
return (-1);
}
instance->echo_request_msg_already_sent_bytes = 0;
instance->sending_echo_request_msg = 1;
send_buffer_list_put(&instance->send_buffer_list, send_buffer);
return (0);
}
@ -241,7 +229,8 @@ qdevice_net_log_msg_decode_error(int ret)
* 1 - Use TLS
*/
static int
qdevice_net_check_tls_compatibility(enum tlv_tls_supported server_tls, enum tlv_tls_supported client_tls)
qdevice_net_check_tls_compatibility(enum tlv_tls_supported server_tls,
enum tlv_tls_supported client_tls)
{
int res;
@ -275,7 +264,8 @@ qdevice_net_check_tls_compatibility(enum tlv_tls_supported server_tls, enum tlv_
}
static int
qdevice_net_msg_received_preinit(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_preinit(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_ERR, "Received unexpected preinit message. Disconnecting from server");
@ -284,11 +274,13 @@ qdevice_net_msg_received_preinit(struct qdevice_net_instance *instance, const st
}
static int
qdevice_net_msg_check_seq_number(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_check_seq_number(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
if (!msg->seq_number_set || msg->seq_number != instance->expected_msg_seq_num) {
qdevice_net_log(LOG_ERR, "Received message doesn't contain seq_number or it's not expected one.");
qdevice_net_log(LOG_ERR, "Received message doesn't contain seq_number or "
"it's not expected one.");
return (-1);
}
@ -297,7 +289,8 @@ qdevice_net_msg_check_seq_number(struct qdevice_net_instance *instance, const st
}
static int
qdevice_net_msg_check_echo_reply_seq_number(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_check_echo_reply_seq_number(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
if (!msg->seq_number_set) {
@ -307,7 +300,8 @@ qdevice_net_msg_check_echo_reply_seq_number(struct qdevice_net_instance *instanc
}
if (msg->seq_number != instance->echo_request_expected_msg_seq_num) {
qdevice_net_log(LOG_ERR, "Server doesn't replied in expected time. Closing connection");
qdevice_net_log(LOG_ERR, "Server doesn't replied in expected time. "
"Closing connection");
return (-1);
}
@ -322,12 +316,20 @@ qdevice_net_send_init(struct qdevice_net_instance *instance)
size_t no_supported_msgs;
enum tlv_opt_type *supported_opts;
size_t no_supported_opts;
struct send_buffer_list_entry *send_buffer;
tlv_get_supported_options(&supported_opts, &no_supported_opts);
msg_get_supported_messages(&supported_msgs, &no_supported_msgs);
instance->expected_msg_seq_num++;
if (msg_create_init(&instance->send_buffer, 1, instance->expected_msg_seq_num,
send_buffer = send_buffer_list_get_new(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_ERR, "Can't allocate send list buffer for init msg");
return (-1);
}
if (msg_create_init(&send_buffer->buffer, 1, instance->expected_msg_seq_num,
supported_msgs, no_supported_msgs, supported_opts, no_supported_opts,
instance->node_id) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for init msg");
@ -335,11 +337,7 @@ qdevice_net_send_init(struct qdevice_net_instance *instance)
return (-1);
}
if (qdevice_net_schedule_send(instance) != 0) {
qdevice_net_log(LOG_ERR, "Can't schedule send of init msg");
return (-1);
}
send_buffer_list_put(&instance->send_buffer_list, send_buffer);
instance->state = QDEVICE_NET_STATE_WAITING_INIT_REPLY;
@ -348,12 +346,15 @@ qdevice_net_send_init(struct qdevice_net_instance *instance)
static int
qdevice_net_msg_received_preinit_reply(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_preinit_reply(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
int res;
struct send_buffer_list_entry *send_buffer;
if (instance->state != QDEVICE_NET_STATE_WAITING_PREINIT_REPLY) {
qdevice_net_log(LOG_ERR, "Received unexpected preinit reply message. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received unexpected preinit reply message. "
"Disconnecting from server");
return (-1);
}
@ -366,7 +367,8 @@ qdevice_net_msg_received_preinit_reply(struct qdevice_net_instance *instance, co
* Check TLS support
*/
if (!msg->tls_supported_set || !msg->tls_client_cert_required_set) {
qdevice_net_log(LOG_ERR, "Required tls_supported or tls_client_cert_required option is unset");
qdevice_net_log(LOG_ERR, "Required tls_supported or tls_client_cert_required "
"option is unset");
return (-1);
}
@ -381,18 +383,23 @@ qdevice_net_msg_received_preinit_reply(struct qdevice_net_instance *instance, co
/*
* Start TLS
*/
send_buffer = send_buffer_list_get_new(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_ERR, "Can't allocate send list buffer for "
"starttls msg");
return (-1);
}
instance->expected_msg_seq_num++;
if (msg_create_starttls(&instance->send_buffer, 1, instance->expected_msg_seq_num) == 0) {
if (msg_create_starttls(&send_buffer->buffer, 1,
instance->expected_msg_seq_num) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for starttls msg");
return (-1);
}
if (qdevice_net_schedule_send(instance) != 0) {
qdevice_net_log(LOG_ERR, "Can't schedule send of starttls msg");
return (-1);
}
send_buffer_list_put(&instance->send_buffer_list, send_buffer);
instance->state = QDEVICE_NET_STATE_WAITING_STARTTLS_BEING_SENT;
} else if (res == 0) {
@ -405,13 +412,16 @@ qdevice_net_msg_received_preinit_reply(struct qdevice_net_instance *instance, co
}
static int
qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
size_t zi;
int res;
struct send_buffer_list_entry *send_buffer;
if (instance->state != QDEVICE_NET_STATE_WAITING_INIT_REPLY) {
qdevice_net_log(LOG_ERR, "Received unexpected init reply message. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received unexpected init reply message. "
"Disconnecting from server");
return (-1);
}
@ -421,13 +431,15 @@ qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const
}
if (!msg->server_maximum_request_size_set || !msg->server_maximum_reply_size_set) {
qdevice_net_log(LOG_ERR, "Required maximum_request_size or maximum_reply_size option is unset");
qdevice_net_log(LOG_ERR, "Required maximum_request_size or maximum_reply_size "
"option is unset");
return (-1);
}
if (msg->supported_messages == NULL || msg->supported_options == NULL) {
qdevice_net_log(LOG_ERR, "Required supported messages or supported options option is unset");
qdevice_net_log(LOG_ERR, "Required supported messages or supported options "
"option is unset");
return (-1);
}
@ -440,16 +452,16 @@ qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const
if (msg->server_maximum_request_size < instance->min_send_size) {
qdevice_net_log(LOG_ERR,
"Server accepts maximum %zu bytes message but this client minimum is %zu bytes.",
msg->server_maximum_request_size, instance->min_send_size);
"Server accepts maximum %zu bytes message but this client minimum "
"is %zu bytes.", msg->server_maximum_request_size, instance->min_send_size);
return (-1);
}
if (msg->server_maximum_reply_size > instance->max_receive_size) {
qdevice_net_log(LOG_ERR,
"Server may send message up to %zu bytes message but this client maximum is %zu bytes.",
msg->server_maximum_reply_size, instance->max_receive_size);
"Server may send message up to %zu bytes message but this client maximum "
"is %zu bytes.", msg->server_maximum_reply_size, instance->max_receive_size);
return (-1);
}
@ -458,8 +470,8 @@ qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const
* Change buffer sizes
*/
dynar_set_max_size(&instance->receive_buffer, msg->server_maximum_reply_size);
dynar_set_max_size(&instance->send_buffer, msg->server_maximum_request_size);
dynar_set_max_size(&instance->echo_request_send_buffer, msg->server_maximum_request_size);
send_buffer_list_set_max_buffer_size(&instance->send_buffer_list,
msg->server_maximum_request_size);
/*
@ -482,20 +494,23 @@ qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const
/*
* Send set options message
*/
send_buffer = send_buffer_list_get_new(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_ERR, "Can't allocate send list buffer for set option msg");
return (-1);
}
instance->expected_msg_seq_num++;
if (msg_create_set_option(&instance->send_buffer, 1, instance->expected_msg_seq_num,
if (msg_create_set_option(&send_buffer->buffer, 1, instance->expected_msg_seq_num,
1, instance->decision_algorithm, 1, instance->heartbeat_interval) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for set option msg");
return (-1);
}
if (qdevice_net_schedule_send(instance) != 0) {
qdevice_net_log(LOG_ERR, "Can't schedule send of set option msg");
return (-1);
}
send_buffer_list_put(&instance->send_buffer_list, send_buffer);
instance->state = QDEVICE_NET_STATE_WAITING_SET_OPTION_REPLY;
@ -503,7 +518,8 @@ qdevice_net_msg_received_init_reply(struct qdevice_net_instance *instance, const
}
static int
qdevice_net_msg_received_stattls(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_stattls(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_ERR, "Received unexpected starttls message. Disconnecting from server");
@ -512,24 +528,28 @@ qdevice_net_msg_received_stattls(struct qdevice_net_instance *instance, const st
}
static int
qdevice_net_msg_received_server_error(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_server_error(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
if (!msg->reply_error_code_set) {
qdevice_net_log(LOG_ERR, "Received server error without error code set. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received server error without error code set. "
"Disconnecting from server");
} else {
qdevice_net_log(LOG_ERR, "Received server error %"PRIu16". Disconnecting from server",
msg->reply_error_code);
qdevice_net_log(LOG_ERR, "Received server error %"PRIu16". "
"Disconnecting from server", msg->reply_error_code);
}
return (-1);
}
static int
qdevice_net_msg_received_set_option(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_set_option(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_ERR, "Received unexpected set option message. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received unexpected set option message. "
"Disconnecting from server");
return (-1);
}
@ -552,8 +572,180 @@ qdevice_net_timer_send_heartbeat(void *data1, void *data2)
return (-1);
}
static uint32_t
qdevice_net_autogenerate_node_id(const char *addr, int clear_node_high_byte)
{
struct addrinfo *ainfo;
struct addrinfo ahints;
int ret, i;
memset(&ahints, 0, sizeof(ahints));
ahints.ai_socktype = SOCK_DGRAM;
ahints.ai_protocol = IPPROTO_UDP;
/*
* Hardcoded AF_INET because autogenerated nodeid is valid only for ipv4
*/
ahints.ai_family = AF_INET;
ret = getaddrinfo(addr, NULL, &ahints, &ainfo);
if (ret != 0)
return (0);
if (ainfo->ai_family != AF_INET) {
freeaddrinfo(ainfo);
return (0);
}
memcpy(&i, &((struct sockaddr_in *)ainfo->ai_addr)->sin_addr, sizeof(struct in_addr));
freeaddrinfo(ainfo);
ret = htonl(i);
if (clear_node_high_byte) {
ret &= 0x7FFFFFFF;
}
return (ret);
}
static int
qdevice_net_msg_received_set_option_reply(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_get_nodelist(cmap_handle_t cmap_handle, struct node_list *list)
{
cs_error_t cs_err;
cmap_iter_handle_t iter_handle;
char key_name[CMAP_KEYNAME_MAXLEN + 1];
char tmp_key[CMAP_KEYNAME_MAXLEN + 1];
int res;
int ret_value;
unsigned int node_pos;
uint32_t node_id;
uint32_t data_center_id;
char *tmp_str;
char *addr0_str;
int clear_node_high_byte;
ret_value = 0;
node_list_init(list);
cs_err = cmap_iter_init(cmap_handle, "nodelist.node.", &iter_handle);
if (cs_err != CS_OK) {
return (-1);
}
while ((cs_err = cmap_iter_next(cmap_handle, iter_handle, key_name, NULL, NULL)) == CS_OK) {
res = sscanf(key_name, "nodelist.node.%u.%s", &node_pos, tmp_key);
if (res != 2) {
continue;
}
if (strcmp(tmp_key, "ring0_addr") != 0) {
continue;
}
snprintf(tmp_key, CMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos);
cs_err = cmap_get_uint32(cmap_handle, tmp_key, &node_id);
if (cs_err == CS_ERR_NOT_EXIST) {
/*
* Nodeid doesn't exists -> autogenerate node id
*/
clear_node_high_byte = 0;
if (cmap_get_string(cmap_handle, "totem.clear_node_high_bit", &tmp_str) == CS_OK) {
if (strcmp (tmp_str, "yes") == 0) {
clear_node_high_byte = 1;
}
free(tmp_str);
}
if (cmap_get_string(cmap_handle, key_name, &addr0_str) != CS_OK) {
return (-1);
}
node_id = qdevice_net_autogenerate_node_id(addr0_str, clear_node_high_byte);
free(addr0_str);
} else if (cs_err != CS_OK) {
ret_value = -1;
goto iter_finalize;
}
snprintf(tmp_key, CMAP_KEYNAME_MAXLEN, "nodelist.node.%u.datacenterid", node_pos);
if (cmap_get_uint32(cmap_handle, tmp_key, &data_center_id) != CS_OK) {
data_center_id = 0;
}
if (node_list_add(list, node_id, data_center_id, TLV_NODE_STATE_NOT_SET) == NULL) {
ret_value = -1;
goto iter_finalize;
}
}
iter_finalize:
cmap_iter_finalize(cmap_handle, iter_handle);
return (ret_value);
}
static int
qdevice_net_send_config_node_list(struct qdevice_net_instance *instance, int initial)
{
struct node_list nlist;
struct send_buffer_list_entry *send_buffer;
uint64_t config_version;
int send_config_version;
/*
* Send initial node list
*/
if (qdevice_net_get_nodelist(instance->cmap_handle, &nlist) != 0) {
qdevice_net_log(LOG_ERR, "Can't get initial configuration node list.");
return (-1);
}
send_buffer = send_buffer_list_get_new(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_ERR, "Can't allocate send list buffer for config "
"node list msg");
node_list_free(&nlist);
return (-1);
}
if (cmap_get_uint64(instance->cmap_handle, "totem.config_version",
&config_version) == CS_OK) {
send_config_version = 1;
} else {
config_version = 0;
send_config_version = 0;
}
if (msg_create_node_list(&send_buffer->buffer, 0, 0,
(initial ? TLV_NODE_LIST_TYPE_INITIAL_CONFIG : TLV_NODE_LIST_TYPE_CHANGED_CONFIG),
0, NULL, send_config_version, config_version, &nlist) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for config list msg");
node_list_free(&nlist);
return (-1);
}
send_buffer_list_put(&instance->send_buffer_list, send_buffer);
return (0);
}
static int
qdevice_net_msg_received_set_option_reply(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
if (qdevice_net_msg_check_seq_number(instance, msg) != 0) {
@ -561,13 +753,14 @@ qdevice_net_msg_received_set_option_reply(struct qdevice_net_instance *instance,
}
if (!msg->decision_algorithm_set || !msg->heartbeat_interval_set) {
qdevice_net_log(LOG_ERR, "Received set option reply message without required options. "
"Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received set option reply message without "
"required options. Disconnecting from server");
}
if (msg->decision_algorithm != instance->decision_algorithm ||
msg->heartbeat_interval != instance->heartbeat_interval) {
qdevice_net_log(LOG_ERR, "Server doesn't accept sent decision algorithm or heartbeat interval.");
qdevice_net_log(LOG_ERR, "Server doesn't accept sent decision algorithm or "
"heartbeat interval.");
return (-1);
}
@ -576,8 +769,9 @@ qdevice_net_msg_received_set_option_reply(struct qdevice_net_instance *instance,
* Server accepted heartbeat interval -> schedule regular sending of echo request
*/
if (instance->heartbeat_interval > 0) {
instance->echo_request_timer = timer_list_add(&instance->main_timer_list, instance->heartbeat_interval,
qdevice_net_timer_send_heartbeat, (void *)instance, NULL);
instance->echo_request_timer = timer_list_add(&instance->main_timer_list,
instance->heartbeat_interval, qdevice_net_timer_send_heartbeat,
(void *)instance, NULL);
if (instance->echo_request_timer == NULL) {
qdevice_net_log(LOG_ERR, "Can't schedule regular sending of heartbeat.");
@ -586,20 +780,27 @@ qdevice_net_msg_received_set_option_reply(struct qdevice_net_instance *instance,
}
}
if (qdevice_net_send_config_node_list(instance, 1) != 0) {
return (-1);
}
return (0);
}
static int
qdevice_net_msg_received_echo_request(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_echo_request(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_ERR, "Received unexpected echo request message. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Received unexpected echo request message. "
"Disconnecting from server");
return (-1);
}
static int
qdevice_net_msg_received_echo_reply(struct qdevice_net_instance *instance, const struct msg_decoded *msg)
qdevice_net_msg_received_echo_reply(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
if (qdevice_net_msg_check_echo_reply_seq_number(instance, msg) != 0) {
@ -611,6 +812,26 @@ qdevice_net_msg_received_echo_reply(struct qdevice_net_instance *instance, const
return (0);
}
static int
qdevice_net_msg_received_node_list(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_ERR, "Received unexpected echo request message. "
"Disconnecting from server");
return (-1);
}
static int
qdevice_net_msg_received_node_list_reply(struct qdevice_net_instance *instance,
const struct msg_decoded *msg)
{
qdevice_net_log(LOG_INFO, "Received node list reply %u", msg->vote);
return (0);
}
static int
qdevice_net_msg_received(struct qdevice_net_instance *instance)
@ -662,8 +883,15 @@ qdevice_net_msg_received(struct qdevice_net_instance *instance)
case MSG_TYPE_ECHO_REPLY:
ret_val = qdevice_net_msg_received_echo_reply(instance, &msg);
break;
case MSG_TYPE_NODE_LIST:
ret_val = qdevice_net_msg_received_node_list(instance, &msg);
break;
case MSG_TYPE_NODE_LIST_REPLY:
ret_val = qdevice_net_msg_received_node_list_reply(instance, &msg);
break;
default:
qdevice_net_log(LOG_ERR, "Received unsupported message %u. Disconnecting from server", msg.type);
qdevice_net_log(LOG_ERR, "Received unsupported message %u. "
"Disconnecting from server", msg.type);
ret_val = -1;
break;
}
@ -685,8 +913,8 @@ qdevice_net_socket_read(struct qdevice_net_instance *instance)
orig_skipping_msg = instance->skipping_msg;
res = msgio_read(instance->socket, &instance->receive_buffer, &instance->msg_already_received_bytes,
&instance->skipping_msg);
res = msgio_read(instance->socket, &instance->receive_buffer,
&instance->msg_already_received_bytes, &instance->skipping_msg);
if (!orig_skipping_msg && instance->skipping_msg) {
qdevice_net_log(LOG_DEBUG, "msgio_read set skipping_msg");
@ -705,20 +933,23 @@ qdevice_net_socket_read(struct qdevice_net_instance *instance)
ret_val = -1;
break;
case -2:
qdevice_net_log_nss(LOG_ERR, "Unhandled error when reading from server. Disconnecting from server");
qdevice_net_log_nss(LOG_ERR, "Unhandled error when reading from server. "
"Disconnecting from server");
ret_val = -1;
break;
case -3:
qdevice_net_log(LOG_ERR, "Can't store message header from server. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Can't store message header from server. "
"Disconnecting from server");
ret_val = -1;
break;
case -4:
qdevice_net_log(LOG_ERR, "Can't store message from server. Disconnecting from server");
qdevice_net_log(LOG_ERR, "Can't store message from server. "
"Disconnecting from server");
ret_val = -1;
break;
case -5:
qdevice_net_log(LOG_WARNING, "Server sent unsupported msg type %u. Disconnecting from server",
msg_get_type(&instance->receive_buffer));
qdevice_net_log(LOG_WARNING, "Server sent unsupported msg type %u. "
"Disconnecting from server", msg_get_type(&instance->receive_buffer));
ret_val = -1;
break;
case -6:
@ -762,8 +993,8 @@ qdevice_net_socket_write_finished(struct qdevice_net_instance *instance)
*/
if ((new_pr_fd = nss_sock_start_ssl_as_client(instance->socket, QNETD_NSS_SERVER_CN,
qdevice_net_nss_bad_cert_hook,
qdevice_net_nss_get_client_auth_data, (void *)QDEVICE_NET_NSS_CLIENT_CERT_NICKNAME,
0, NULL)) == NULL) {
qdevice_net_nss_get_client_auth_data,
(void *)QDEVICE_NET_NSS_CLIENT_CERT_NICKNAME, 0, NULL)) == NULL) {
qdevice_net_log_nss(LOG_ERR, "Can't start TLS");
return (-1);
@ -786,31 +1017,28 @@ static int
qdevice_net_socket_write(struct qdevice_net_instance *instance)
{
int res;
int send_echo_request;
struct send_buffer_list_entry *send_buffer;
enum msg_type sent_msg_type;
/*
* Echo request has extra buffer and special processing. Messages other then echo request
* has higher priority, but if echo request send was not completed
* it's necesary to complete it.
*/
send_echo_request = !(instance->sending_msg && instance->echo_request_msg_already_sent_bytes == 0);
send_buffer = send_buffer_list_get_active(&instance->send_buffer_list);
if (send_buffer == NULL) {
qdevice_net_log(LOG_CRIT, "send_buffer_list_get_active returned NULL");
if (!send_echo_request) {
res = msgio_write(instance->socket, &instance->send_buffer, &instance->msg_already_sent_bytes);
} else {
res = msgio_write(instance->socket, &instance->echo_request_send_buffer,
&instance->echo_request_msg_already_sent_bytes);
return (-1);
}
if (res == 1) {
if (!send_echo_request) {
instance->sending_msg = 0;
res = msgio_write(instance->socket, &send_buffer->buffer,
&send_buffer->msg_already_sent_bytes);
if (res == 1) {
sent_msg_type = msg_get_type(&send_buffer->buffer);
send_buffer_list_delete(&instance->send_buffer_list, send_buffer);
if (sent_msg_type != MSG_TYPE_ECHO_REQUEST) {
if (qdevice_net_socket_write_finished(instance) == -1) {
return (-1);
}
} else {
instance->sending_echo_request_msg = 0;
}
}
@ -843,7 +1071,7 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
pfds[QDEVICE_NET_POLL_SOCKET].fd = instance->socket;
pfds[QDEVICE_NET_POLL_SOCKET].in_flags = PR_POLL_READ;
if (instance->sending_msg || instance->sending_echo_request_msg) {
if (!send_buffer_list_empty(&instance->send_buffer_list)) {
pfds[QDEVICE_NET_POLL_SOCKET].in_flags |= PR_POLL_WRITE;
}
pfds[QDEVICE_NET_POLL_VOTEQUORUM].fd = instance->votequorum_poll_fd;
@ -863,7 +1091,8 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
break;
case QDEVICE_NET_POLL_VOTEQUORUM:
if (votequorum_dispatch(instance->votequorum_handle, CS_DISPATCH_ALL) != CS_OK) {
if (votequorum_dispatch(instance->votequorum_handle,
CS_DISPATCH_ALL) != CS_OK) {
errx(1, "Can't dispatch votequorum messages");
}
break;
@ -888,10 +1117,12 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
}
if (!instance->schedule_disconnect &&
pfds[i].out_flags & (PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) {
pfds[i].out_flags &
(PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) {
switch (i) {
case QDEVICE_NET_POLL_SOCKET:
qdevice_net_log(LOG_CRIT, "POLL_ERR (%u) on main socket", pfds[i].out_flags);
qdevice_net_log(LOG_CRIT, "POLL_ERR (%u) on main socket",
pfds[i].out_flags);
return (-1);
@ -920,8 +1151,9 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
static int
qdevice_net_instance_init(struct qdevice_net_instance *instance, size_t initial_receive_size,
size_t initial_send_size, size_t min_send_size, size_t max_receive_size, enum tlv_tls_supported tls_supported,
uint32_t node_id, enum tlv_decision_algorithm_type decision_algorithm, uint32_t heartbeat_interval,
size_t initial_send_size, size_t min_send_size, size_t max_receive_size,
enum tlv_tls_supported tls_supported, uint32_t node_id,
enum tlv_decision_algorithm_type decision_algorithm, uint32_t heartbeat_interval,
const char *host_addr, uint16_t host_port, const char *cluster_name)
{
@ -938,8 +1170,8 @@ qdevice_net_instance_init(struct qdevice_net_instance *instance, size_t initial_
instance->host_port = host_port;
instance->cluster_name = cluster_name;
dynar_init(&instance->receive_buffer, initial_receive_size);
dynar_init(&instance->send_buffer, initial_send_size);
dynar_init(&instance->echo_request_send_buffer, initial_send_size);
send_buffer_list_init(&instance->send_buffer_list, QDEVICE_NET_MAX_SEND_BUFFERS,
initial_send_size);
timer_list_init(&instance->main_timer_list);
instance->tls_supported = tls_supported;
@ -953,13 +1185,13 @@ qdevice_net_instance_destroy(struct qdevice_net_instance *instance)
timer_list_free(&instance->main_timer_list);
dynar_destroy(&instance->receive_buffer);
dynar_destroy(&instance->send_buffer);
dynar_destroy(&instance->echo_request_send_buffer);
send_buffer_list_free(&instance->send_buffer_list);
/*
* Close cmap and votequorum connections
*/
if (votequorum_qdevice_unregister(instance->votequorum_handle, QDEVICE_NET_VOTEQUORUM_DEVICE_NAME) != CS_OK) {
if (votequorum_qdevice_unregister(instance->votequorum_handle,
QDEVICE_NET_VOTEQUORUM_DEVICE_NAME) != CS_OK) {
qdevice_net_log_nss(LOG_WARNING, "Unable to unregister votequorum device");
}
votequorum_finalize(instance->votequorum_handle);
@ -976,8 +1208,9 @@ qdevice_net_init_cmap(cmap_handle_t *handle)
no_retries = 0;
while ((res = cmap_initialize(handle)) == CS_ERR_TRY_AGAIN && no_retries++ < MAX_CS_TRY_AGAIN) {
sleep(1);
while ((res = cmap_initialize(handle)) == CS_ERR_TRY_AGAIN &&
no_retries++ < MAX_CS_TRY_AGAIN) {
poll(NULL, 0, 1000);
}
if (res != CS_OK) {
@ -993,9 +1226,13 @@ static int
qdevice_net_parse_bool_str(const char *str)
{
if (strcasecmp(str, "yes") == 0 || strcasecmp(str, "on") == 0 || strcasecmp(str, "1") == 0) {
if (strcasecmp(str, "yes") == 0 ||
strcasecmp(str, "on") == 0 ||
strcasecmp(str, "1") == 0) {
return (1);
} else if (strcasecmp(str, "no") == 0 || strcasecmp(str, "off") == 0 || strcasecmp(str, "0") == 0) {
} else if (strcasecmp(str, "no") == 0 ||
strcasecmp(str, "off") == 0 ||
strcasecmp(str, "0") == 0) {
return (0);
}
@ -1003,7 +1240,8 @@ qdevice_net_parse_bool_str(const char *str)
}
static void
qdevice_net_instance_init_from_cmap(struct qdevice_net_instance *instance, cmap_handle_t cmap_handle)
qdevice_net_instance_init_from_cmap(struct qdevice_net_instance *instance,
cmap_handle_t cmap_handle)
{
uint32_t node_id;
enum tlv_tls_supported tls_supported;
@ -1026,7 +1264,8 @@ qdevice_net_instance_init_from_cmap(struct qdevice_net_instance *instance, cmap_
if (strcmp(str, "net") != 0) {
free(str);
errx(1, "Configured device model is not net. This qdevice provider is only for net.");
errx(1, "Configured device model is not net. "
"This qdevice provider is only for net.");
}
free(str);
@ -1067,11 +1306,12 @@ qdevice_net_instance_init_from_cmap(struct qdevice_net_instance *instance, cmap_
if (cmap_get_string(cmap_handle, "quorum.device.net.port", &str) == CS_OK) {
host_port = strtol(str, &ep, 10);
free(str);
if (host_port <= 0 || host_port > ((uint16_t)~0) || *ep != '\0') {
errx(1, "quorum.device.net.port must be in range 0-65535");
}
free(str);
} else {
host_port = QNETD_DEFAULT_HOST_PORT;
}
@ -1092,7 +1332,8 @@ qdevice_net_instance_init_from_cmap(struct qdevice_net_instance *instance, cmap_
}
heartbeat_interval = heartbeat_interval * 0.8;
if (cmap_get_uint32(cmap_handle, "quorum.device.sync_timeout", &sync_heartbeat_interval) != CS_OK) {
if (cmap_get_uint32(cmap_handle, "quorum.device.sync_timeout",
&sync_heartbeat_interval) != CS_OK) {
sync_heartbeat_interval = VOTEQUORUM_QDEVICE_DEFAULT_SYNC_TIMEOUT;
}
sync_heartbeat_interval = sync_heartbeat_interval * 0.8;
@ -1140,9 +1381,9 @@ qdevice_net_init_votequorum(struct qdevice_net_instance *instance)
no_retries = 0;
while ((res = votequorum_initialize(&votequorum_handle, &votequorum_callbacks)) == CS_ERR_TRY_AGAIN &&
no_retries++ < MAX_CS_TRY_AGAIN) {
sleep(1);
while ((res = votequorum_initialize(&votequorum_handle,
&votequorum_callbacks)) == CS_ERR_TRY_AGAIN && no_retries++ < MAX_CS_TRY_AGAIN) {
poll(NULL, 0, 1000);
}
if (res != CS_OK) {
@ -1153,7 +1394,8 @@ qdevice_net_init_votequorum(struct qdevice_net_instance *instance)
errx(1, "Can't start tracking votequorum changes. Error %s", cs_strerror(res));
}
if ((res = votequorum_qdevice_register(votequorum_handle, QDEVICE_NET_VOTEQUORUM_DEVICE_NAME)) != CS_OK) {
if ((res = votequorum_qdevice_register(votequorum_handle,
QDEVICE_NET_VOTEQUORUM_DEVICE_NAME)) != CS_OK) {
errx(1, "Can't register votequorum device. Error %s", cs_strerror(res));
}
@ -1166,11 +1408,13 @@ qdevice_net_init_votequorum(struct qdevice_net_instance *instance)
}
int
main(void)
{
struct qdevice_net_instance instance;
cmap_handle_t cmap_handle;
struct send_buffer_list_entry *send_buffer;
/*
* Init
@ -1181,14 +1425,16 @@ main(void)
qdevice_net_log_init(QDEVICE_NET_LOG_TARGET_STDERR);
qdevice_net_log_set_debug(1);
if (nss_sock_init_nss((instance.tls_supported != TLV_TLS_UNSUPPORTED ? (char *)NSS_DB_DIR : NULL)) != 0) {
if (nss_sock_init_nss((instance.tls_supported != TLV_TLS_UNSUPPORTED ?
(char *)NSS_DB_DIR : NULL)) != 0) {
err_nss();
}
/*
* Try to connect to qnetd host
*/
instance.socket = nss_sock_create_client_socket(instance.host_addr, instance.host_port, PR_AF_UNSPEC, 100);
instance.socket = nss_sock_create_client_socket(instance.host_addr, instance.host_port,
PR_AF_UNSPEC, 100);
if (instance.socket == NULL) {
err_nss();
}
@ -1202,13 +1448,18 @@ main(void)
/*
* Create and schedule send of preinit message to qnetd
*/
send_buffer = send_buffer_list_get_new(&instance.send_buffer_list);
if (send_buffer == NULL) {
errx(1, "Can't allocate send buffer list");
}
instance.expected_msg_seq_num = 1;
if (msg_create_preinit(&instance.send_buffer, instance.cluster_name, 1, instance.expected_msg_seq_num) == 0) {
if (msg_create_preinit(&send_buffer->buffer, instance.cluster_name, 1,
instance.expected_msg_seq_num) == 0) {
errx(1, "Can't allocate buffer");
}
if (qdevice_net_schedule_send(&instance) != 0) {
errx(1, "Can't schedule send of preinit msg");
}
send_buffer_list_put(&instance.send_buffer_list, send_buffer);
instance.state = QDEVICE_NET_STATE_WAITING_PREINIT_REPLY;

View File

@ -55,13 +55,14 @@
#include "tlv.h"
#include "nss-sock.h"
#include "qnetd-client.h"
#include "qnetd-clients-list.h"
#include "qnetd-client-list.h"
#include "qnetd-poll-array.h"
#include "qnetd-log.h"
#include "dynar.h"
#include "timer-list.h"
#define QNETD_LISTEN_BACKLOG 10
#define QNETD_LISTEN_BACKLOG 10
#define QNETD_MAX_CLIENT_SEND_BUFFERS 10
#define QNETD_MAX_CLIENT_SEND_SIZE (1 << 15)
#define QNETD_MAX_CLIENT_RECEIVE_SIZE (1 << 15)
@ -81,8 +82,9 @@ struct qnetd_instance {
SECKEYPrivateKey *private_key;
} server;
size_t max_client_receive_size;
size_t max_client_send_buffers;
size_t max_client_send_size;
struct qnetd_clients_list clients;
struct qnetd_client_list clients;
struct qnetd_poll_array poll_array;
enum tlv_tls_supported tls_supported;
int tls_client_cert_required;
@ -142,49 +144,42 @@ qnetd_client_log_msg_decode_error(int ret)
}
}
static int
qnetd_client_net_schedule_send(struct qnetd_client *client)
{
if (client->sending_msg) {
/*
* Client is already sending msg
*/
return (-1);
}
client->msg_already_sent_bytes = 0;
client->sending_msg = 1;
return (0);
}
static int
qnetd_client_send_err(struct qnetd_client *client, int add_msg_seq_number, uint32_t msg_seq_number,
enum tlv_reply_error_code reply)
{
struct send_buffer_list_entry *send_buffer;
if (msg_create_server_error(&client->send_buffer, add_msg_seq_number, msg_seq_number, reply) == 0) {
qnetd_log(LOG_ERR, "Can't alloc server error msg. Disconnecting client connection.");
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc server error msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_server_error(&send_buffer->buffer, add_msg_seq_number,
msg_seq_number, reply) == 0) {
qnetd_log(LOG_ERR, "Can't alloc server error msg. "
"Disconnecting client connection.");
return (-1);
};
if (qnetd_client_net_schedule_send(client) != 0) {
qnetd_log(LOG_ERR, "Can't schedule send of error message. Disconnecting client connection.");
return (-1);
}
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
static int
qnetd_client_msg_received_preinit(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
struct send_buffer_list_entry *send_buffer;
if (msg->cluster_name == NULL) {
qnetd_log(LOG_ERR, "Received preinit message without cluster name. Sending error reply.");
qnetd_log(LOG_ERR, "Received preinit message without cluster name. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
@ -210,25 +205,30 @@ qnetd_client_msg_received_preinit(struct qnetd_instance *instance, struct qnetd_
client->cluster_name_len = msg->cluster_name_len;
client->preinit_received = 1;
if (msg_create_preinit_reply(&client->send_buffer, msg->seq_number_set, msg->seq_number,
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc preinit reply msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_preinit_reply(&send_buffer->buffer, msg->seq_number_set, msg->seq_number,
instance->tls_supported, instance->tls_client_cert_required) == 0) {
qnetd_log(LOG_ERR, "Can't alloc preinit reply msg. Disconnecting client connection.");
qnetd_log(LOG_ERR, "Can't alloc preinit reply msg. "
"Disconnecting client connection.");
return (-1);
};
if (qnetd_client_net_schedule_send(client) != 0) {
qnetd_log(LOG_ERR, "Can't schedule send of preinit message. Disconnecting client connection.");
return (-1);
}
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
static int
qnetd_client_msg_received_preinit_reply(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
qnetd_client_msg_received_preinit_reply(struct qnetd_instance *instance,
struct qnetd_client *client, const struct msg_decoded *msg)
{
qnetd_log(LOG_ERR, "Received preinit reply. Sending back error message");
@ -243,12 +243,13 @@ qnetd_client_msg_received_preinit_reply(struct qnetd_instance *instance, struct
static int
qnetd_client_msg_received_starttls(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
PRFileDesc *new_pr_fd;
if (!client->preinit_received) {
qnetd_log(LOG_ERR, "Received starttls before preinit message. Sending error reply.");
qnetd_log(LOG_ERR, "Received starttls before preinit message. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_PREINIT_REQUIRED) != 0) {
@ -274,8 +275,9 @@ qnetd_client_msg_received_starttls(struct qnetd_instance *instance, struct qnetd
static int
qnetd_client_msg_received_server_error(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
qnetd_log(LOG_ERR, "Received server error. Sending back error message");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
@ -292,7 +294,8 @@ qnetd_client_msg_received_server_error(struct qnetd_instance *instance, struct q
* -2 - Error reply sent, but no need to disconnect client
*/
static int
qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *client, const struct msg_decoded *msg)
qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
{
int check_certificate;
int tls_required;
@ -309,7 +312,8 @@ qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *cli
case TLV_TLS_SUPPORTED:
tls_required = 0;
if (client->tls_started && instance->tls_client_cert_required && !client->tls_peer_certificate_verified) {
if (client->tls_started && instance->tls_client_cert_required &&
!client->tls_peer_certificate_verified) {
check_certificate = 1;
}
break;
@ -326,7 +330,8 @@ qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *cli
}
if (tls_required && !client->tls_started) {
qnetd_log(LOG_ERR, "TLS is required but doesn't started yet. Sending back error message");
qnetd_log(LOG_ERR, "TLS is required but doesn't started yet. "
"Sending back error message");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_TLS_REQUIRED) != 0) {
@ -340,13 +345,15 @@ qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *cli
peer_cert = SSL_PeerCertificate(client->socket);
if (peer_cert == NULL) {
qnetd_log(LOG_ERR, "Client doesn't sent valid certificate. Disconnecting client");
qnetd_log(LOG_ERR, "Client doesn't sent valid certificate. "
"Disconnecting client");
return (-1);
}
if (CERT_VerifyCertName(peer_cert, client->cluster_name) != SECSuccess) {
qnetd_log(LOG_ERR, "Client doesn't sent certificate with valid CN. Disconnecting client");
qnetd_log(LOG_ERR, "Client doesn't sent certificate with valid CN. "
"Disconnecting client");
CERT_DestroyCertificate(peer_cert);
@ -363,13 +370,14 @@ qnetd_client_check_tls(struct qnetd_instance *instance, struct qnetd_client *cli
static int
qnetd_client_msg_received_init(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
int res;
enum msg_type *supported_msgs;
size_t no_supported_msgs;
enum tlv_opt_type *supported_opts;
size_t no_supported_opts;
struct send_buffer_list_entry *send_buffer;
supported_msgs = NULL;
supported_opts = NULL;
@ -392,7 +400,8 @@ qnetd_client_msg_received_init(struct qnetd_instance *instance, struct qnetd_cli
}
if (!msg->node_id_set) {
qnetd_log(LOG_ERR, "Received init message without node id set. Sending error reply.");
qnetd_log(LOG_ERR, "Received init message without node id set. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
@ -409,7 +418,8 @@ qnetd_client_msg_received_init(struct qnetd_instance *instance, struct qnetd_cli
*/
/*
for (i = 0; i < msg->no_supported_messages; i++) {
qnetd_log(LOG_DEBUG, "Client supports %u message", (int)msg->supported_messages[i]);
qnetd_log(LOG_DEBUG, "Client supports %u message",
(int)msg->supported_messages[i]);
}
*/
@ -426,7 +436,8 @@ qnetd_client_msg_received_init(struct qnetd_instance *instance, struct qnetd_cli
*/
/*
for (i = 0; i < msg->no_supported_options; i++) {
qnetd_log(LOG_DEBUG, "Client supports %u option", (int)msg->supported_messages[i]);
qnetd_log(LOG_DEBUG, "Client supports %u option",
(int)msg->supported_messages[i]);
}
*/
@ -440,27 +451,32 @@ qnetd_client_msg_received_init(struct qnetd_instance *instance, struct qnetd_cli
client->node_id = msg->node_id;
client->init_received = 1;
if (msg_create_init_reply(&client->send_buffer, msg->seq_number_set, msg->seq_number,
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc init reply msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_init_reply(&send_buffer->buffer, msg->seq_number_set, msg->seq_number,
supported_msgs, no_supported_msgs, supported_opts, no_supported_opts,
instance->max_client_receive_size, instance->max_client_send_size,
qnetd_static_supported_decision_algorithms, QNETD_STATIC_SUPPORTED_DECISION_ALGORITHMS_SIZE) == -1) {
qnetd_static_supported_decision_algorithms,
QNETD_STATIC_SUPPORTED_DECISION_ALGORITHMS_SIZE) == -1) {
qnetd_log(LOG_ERR, "Can't alloc init reply msg. Disconnecting client connection.");
return (-1);
}
if (qnetd_client_net_schedule_send(client) != 0) {
qnetd_log(LOG_ERR, "Can't schedule send of init reply message. Disconnecting client connection.");
return (-1);
}
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
static int
qnetd_client_msg_received_init_reply(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
qnetd_log(LOG_ERR, "Received init reply. Sending back error message");
@ -473,8 +489,8 @@ qnetd_client_msg_received_init_reply(struct qnetd_instance *instance, struct qne
}
static int
qnetd_client_msg_received_set_option_reply(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
qnetd_client_msg_received_set_option_reply(struct qnetd_instance *instance,
struct qnetd_client *client, const struct msg_decoded *msg)
{
qnetd_log(LOG_ERR, "Received set option reply. Sending back error message");
@ -488,17 +504,19 @@ qnetd_client_msg_received_set_option_reply(struct qnetd_instance *instance, stru
static int
qnetd_client_msg_received_set_option(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
const struct msg_decoded *msg)
{
int res;
size_t zi;
struct send_buffer_list_entry *send_buffer;
if ((res = qnetd_client_check_tls(instance, client, msg)) != 0) {
return (res == -1 ? -1 : 0);
}
if (!client->init_received) {
qnetd_log(LOG_ERR, "Received set option message before init message. Sending error reply.");
qnetd_log(LOG_ERR, "Received set option message before init message. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_INIT_REQUIRED) != 0) {
@ -515,14 +533,15 @@ qnetd_client_msg_received_set_option(struct qnetd_instance *instance, struct qne
res = 0;
for (zi = 0; zi < QNETD_STATIC_SUPPORTED_DECISION_ALGORITHMS_SIZE && !res; zi++) {
if (qnetd_static_supported_decision_algorithms[zi] == msg->decision_algorithm) {
if (qnetd_static_supported_decision_algorithms[zi] ==
msg->decision_algorithm) {
res = 1;
}
}
if (!res) {
qnetd_log(LOG_ERR, "Client requested unsupported decision algorithm %u. Sending error reply.",
msg->decision_algorithm);
qnetd_log(LOG_ERR, "Client requested unsupported decision algorithm %u. "
"Sending error reply.", msg->decision_algorithm);
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_UNSUPPORTED_DECISION_ALGORITHM) != 0) {
@ -539,10 +558,11 @@ qnetd_client_msg_received_set_option(struct qnetd_instance *instance, struct qne
/*
* Check if heartbeat interval is valid
*/
if (msg->heartbeat_interval != 0 && (msg->heartbeat_interval < QNETD_HEARTBEAT_INTERVAL_MIN ||
if (msg->heartbeat_interval != 0 &&
(msg->heartbeat_interval < QNETD_HEARTBEAT_INTERVAL_MIN ||
msg->heartbeat_interval > QNETD_HEARTBEAT_INTERVAL_MAX)) {
qnetd_log(LOG_ERR, "Client requested invalid heartbeat interval %u. Sending error reply.",
msg->heartbeat_interval);
qnetd_log(LOG_ERR, "Client requested invalid heartbeat interval %u. "
"Sending error reply.", msg->heartbeat_interval);
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_INVALID_HEARTBEAT_INTERVAL) != 0) {
@ -555,18 +575,23 @@ qnetd_client_msg_received_set_option(struct qnetd_instance *instance, struct qne
client->heartbeat_interval = msg->heartbeat_interval;
}
if (msg_create_set_option_reply(&client->send_buffer, msg->seq_number_set, msg->seq_number,
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc set option reply msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_set_option_reply(&send_buffer->buffer, msg->seq_number_set, msg->seq_number,
client->decision_algorithm, client->heartbeat_interval) == -1) {
qnetd_log(LOG_ERR, "Can't alloc set option reply msg. Disconnecting client connection.");
qnetd_log(LOG_ERR, "Can't alloc set option reply msg. "
"Disconnecting client connection.");
return (-1);
}
if (qnetd_client_net_schedule_send(client) != 0) {
qnetd_log(LOG_ERR, "Can't schedule send of set option reply message. Disconnecting client connection.");
return (-1);
}
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
@ -590,13 +615,15 @@ qnetd_client_msg_received_echo_request(struct qnetd_instance *instance, struct q
const struct msg_decoded *msg, const struct dynar *msg_orig)
{
int res;
struct send_buffer_list_entry *send_buffer;
if ((res = qnetd_client_check_tls(instance, client, msg)) != 0) {
return (res == -1 ? -1 : 0);
}
if (!client->init_received) {
qnetd_log(LOG_ERR, "Received echo request before init message. Sending error reply.");
qnetd_log(LOG_ERR, "Received echo request before init message. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_INIT_REQUIRED) != 0) {
@ -606,15 +633,89 @@ qnetd_client_msg_received_echo_request(struct qnetd_instance *instance, struct q
return (0);
}
if (msg_create_echo_reply(&client->send_buffer, msg_orig) == -1) {
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc echo reply msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_echo_reply(&send_buffer->buffer, msg_orig) == -1) {
qnetd_log(LOG_ERR, "Can't alloc echo reply msg. Disconnecting client connection.");
return (-1);
}
if (qnetd_client_net_schedule_send(client) != 0) {
qnetd_log(LOG_ERR, "Can't schedule send of echo reply message. Disconnecting client connection.");
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
static int
qnetd_client_msg_received_node_list(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
{
int res;
struct send_buffer_list_entry *send_buffer;
if ((res = qnetd_client_check_tls(instance, client, msg)) != 0) {
return (res == -1 ? -1 : 0);
}
if (!client->init_received) {
qnetd_log(LOG_ERR, "Received set option message before init message. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_INIT_REQUIRED) != 0) {
return (-1);
}
return (0);
}
if (!msg->node_list_type_set) {
qnetd_log(LOG_ERR, "Received node list message without node list type set. "
"Sending error reply.");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
return (-1);
}
return (0);
}
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc node list reply msg from list. "
"Disconnecting client connection.");
return (-1);
}
if (msg_create_node_list_reply(&send_buffer->buffer, msg->seq_number_set, msg->seq_number,
TLV_VOTE_ACK) == -1) {
qnetd_log(LOG_ERR, "Can't alloc node list reply msg. "
"Disconnecting client connection.");
return (-1);
}
send_buffer_list_put(&client->send_buffer_list, send_buffer);
return (0);
}
static int
qnetd_client_msg_received_node_list_reply(struct qnetd_instance *instance, struct qnetd_client *client,
const struct msg_decoded *msg)
{
qnetd_log(LOG_ERR, "Received node list reply. Sending back error message");
if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
TLV_REPLY_ERROR_CODE_UNEXPECTED_MESSAGE) != 0) {
return (-1);
}
@ -674,14 +775,21 @@ qnetd_client_msg_received(struct qnetd_instance *instance, struct qnetd_client *
ret_val = qnetd_client_msg_received_set_option_reply(instance, client, &msg);
break;
case MSG_TYPE_ECHO_REQUEST:
ret_val = qnetd_client_msg_received_echo_request(instance, client, &msg, &client->receive_buffer);
ret_val = qnetd_client_msg_received_echo_request(instance, client, &msg,
&client->receive_buffer);
break;
case MSG_TYPE_ECHO_REPLY:
ret_val = qnetd_client_msg_received_echo_reply(instance, client, &msg);
break;
case MSG_TYPE_NODE_LIST:
ret_val = qnetd_client_msg_received_node_list(instance, client, &msg);
break;
case MSG_TYPE_NODE_LIST_REPLY:
ret_val = qnetd_client_msg_received_node_list_reply(instance, client, &msg);
break;
default:
qnetd_log(LOG_ERR, "Unsupported message %u received from client. Sending back error message",
msg.type);
qnetd_log(LOG_ERR, "Unsupported message %u received from client. "
"Sending back error message", msg.type);
if (qnetd_client_send_err(client, msg.seq_number_set, msg.seq_number,
TLV_REPLY_ERROR_CODE_UNSUPPORTED_MESSAGE) != 0) {
@ -711,11 +819,20 @@ static int
qnetd_client_net_write(struct qnetd_instance *instance, struct qnetd_client *client)
{
int res;
struct send_buffer_list_entry *send_buffer;
res = msgio_write(client->socket, &client->send_buffer, &client->msg_already_sent_bytes);
send_buffer = send_buffer_list_get_active(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log_nss(LOG_CRIT, "send_buffer_list_get_active returned NULL");
return (-1);
}
res = msgio_write(client->socket, &send_buffer->buffer,
&send_buffer->msg_already_sent_bytes);
if (res == 1) {
client->sending_msg = 0;
send_buffer_list_delete(&client->send_buffer_list, send_buffer);
if (qnetd_client_net_write_finished(instance, client) == -1) {
return (-1);
@ -750,8 +867,8 @@ qnetd_client_net_read(struct qnetd_instance *instance, struct qnetd_client *clie
orig_skipping_msg = client->skipping_msg;
res = msgio_read(client->socket, &client->receive_buffer, &client->msg_already_received_bytes,
&client->skipping_msg);
res = msgio_read(client->socket, &client->receive_buffer,
&client->msg_already_received_bytes, &client->skipping_msg);
if (!orig_skipping_msg && client->skipping_msg) {
qnetd_log(LOG_DEBUG, "msgio_read set skipping_msg");
@ -770,7 +887,8 @@ qnetd_client_net_read(struct qnetd_instance *instance, struct qnetd_client *clie
ret_val = -1;
break;
case -2:
qnetd_log_nss(LOG_ERR, "Unhandled error when reading from client. Disconnecting client");
qnetd_log_nss(LOG_ERR, "Unhandled error when reading from client. "
"Disconnecting client");
ret_val = -1;
break;
case -3:
@ -826,7 +944,8 @@ qnetd_client_accept(struct qnetd_instance *instance)
PRFileDesc *client_socket;
struct qnetd_client *client;
if ((client_socket = PR_Accept(instance->server.socket, &client_addr, PR_INTERVAL_NO_TIMEOUT)) == NULL) {
if ((client_socket = PR_Accept(instance->server.socket, &client_addr,
PR_INTERVAL_NO_TIMEOUT)) == NULL) {
qnetd_log_nss(LOG_ERR, "Can't accept connection");
return (-1);
}
@ -836,8 +955,9 @@ qnetd_client_accept(struct qnetd_instance *instance)
return (-1);
}
client = qnetd_clients_list_add(&instance->clients, client_socket, &client_addr,
instance->max_client_receive_size, instance->max_client_send_size);
client = qnetd_client_list_add(&instance->clients, client_socket, &client_addr,
instance->max_client_receive_size, instance->max_client_send_buffers,
instance->max_client_send_size);
if (client == NULL) {
qnetd_log(LOG_ERR, "Can't add client to list");
return (-2);
@ -851,7 +971,7 @@ qnetd_client_disconnect(struct qnetd_instance *instance, struct qnetd_client *cl
{
PR_Close(client->socket);
qnetd_clients_list_del(&instance->clients, client);
qnetd_client_list_del(&instance->clients, client);
}
static int
@ -867,7 +987,7 @@ qnetd_poll(struct qnetd_instance *instance)
client = NULL;
client_disconnect = 0;
pfds = qnetd_poll_array_create_from_clients_list(&instance->poll_array,
pfds = qnetd_poll_array_create_from_client_list(&instance->poll_array,
&instance->clients, instance->server.socket, PR_POLL_READ);
if (pfds == NULL) {
@ -921,14 +1041,16 @@ qnetd_poll(struct qnetd_instance *instance)
}
if (!client_disconnect &&
pfds[i].out_flags & (PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) {
pfds[i].out_flags &
(PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) {
if (i == 0) {
if (pfds[i].out_flags != PR_POLL_NVAL) {
/*
* Poll ERR on listening socket is fatal error. POLL_NVAL is
* used as a signal to quit poll loop.
* Poll ERR on listening socket is fatal error.
* POLL_NVAL is used as a signal to quit poll loop.
*/
qnetd_log(LOG_CRIT, "POLL_ERR (%u) on listening socket", pfds[i].out_flags);
qnetd_log(LOG_CRIT, "POLL_ERR (%u) on listening "
"socket", pfds[i].out_flags);
} else {
qnetd_log(LOG_DEBUG, "Listening socket is closed");
}
@ -936,8 +1058,8 @@ qnetd_poll(struct qnetd_instance *instance)
return (-1);
} else {
qnetd_log(LOG_DEBUG, "POLL_ERR (%u) on client socket. Disconnecting.",
pfds[i].out_flags);
qnetd_log(LOG_DEBUG, "POLL_ERR (%u) on client socket. "
"Disconnecting.", pfds[i].out_flags);
client_disconnect = 1;
}
@ -974,15 +1096,17 @@ qnetd_instance_init_certs(struct qnetd_instance *instance)
static int
qnetd_instance_init(struct qnetd_instance *instance, size_t max_client_receive_size,
size_t max_client_send_size, enum tlv_tls_supported tls_supported, int tls_client_cert_required)
size_t max_client_send_buffers, size_t max_client_send_size,
enum tlv_tls_supported tls_supported, int tls_client_cert_required)
{
memset(instance, 0, sizeof(*instance));
qnetd_poll_array_init(&instance->poll_array);
qnetd_clients_list_init(&instance->clients);
qnetd_client_list_init(&instance->clients);
instance->max_client_receive_size = max_client_receive_size;
instance->max_client_send_buffers = max_client_send_buffers;
instance->max_client_send_size = max_client_send_size;
instance->tls_supported = tls_supported;
@ -1007,7 +1131,7 @@ qnetd_instance_destroy(struct qnetd_instance *instance)
}
qnetd_poll_array_destroy(&instance->poll_array);
qnetd_clients_list_free(&instance->clients);
qnetd_client_list_free(&instance->clients);
return (0);
}
@ -1090,7 +1214,8 @@ main(int argc, char *argv[])
cli_parse(argc, argv, &host_addr, &host_port);
if (qnetd_instance_init(&instance, QNETD_MAX_CLIENT_RECEIVE_SIZE, QNETD_MAX_CLIENT_SEND_SIZE,
if (qnetd_instance_init(&instance, QNETD_MAX_CLIENT_RECEIVE_SIZE,
QNETD_MAX_CLIENT_SEND_BUFFERS, QNETD_MAX_CLIENT_SEND_SIZE,
QNETD_TLS_SUPPORTED, QNETD_TLS_CLIENT_CERT_REQUIRED) == -1) {
errx(1, "Can't initialize qnetd");
}
@ -1101,7 +1226,8 @@ main(int argc, char *argv[])
qnetd_err_nss();
}
instance.server.socket = nss_sock_create_listen_socket(instance.host_addr, instance.host_port, PR_AF_INET6);
instance.server.socket = nss_sock_create_listen_socket(instance.host_addr,
instance.host_port, PR_AF_INET6);
if (instance.server.socket == NULL) {
qnetd_err_nss();
}

View File

@ -44,7 +44,7 @@
#define MSG_TYPE_LENGTH 2
#define MSG_LENGTH_LENGTH 4
#define MSG_STATIC_SUPPORTED_MESSAGES_SIZE 10
#define MSG_STATIC_SUPPORTED_MESSAGES_SIZE 12
enum msg_type msg_static_supported_messages[MSG_STATIC_SUPPORTED_MESSAGES_SIZE] = {
MSG_TYPE_PREINIT,
@ -57,6 +57,8 @@ enum msg_type msg_static_supported_messages[MSG_STATIC_SUPPORTED_MESSAGES_SIZE]
MSG_TYPE_SET_OPTION_REPLY,
MSG_TYPE_ECHO_REQUEST,
MSG_TYPE_ECHO_REPLY,
MSG_TYPE_NODE_LIST,
MSG_TYPE_NODE_LIST_REPLY,
};
size_t
@ -134,7 +136,8 @@ msg_get_len(const struct dynar *msg)
size_t
msg_create_preinit(struct dynar *msg, const char *cluster_name, int add_msg_seq_number, uint32_t msg_seq_number)
msg_create_preinit(struct dynar *msg, const char *cluster_name, int add_msg_seq_number,
uint32_t msg_seq_number)
{
dynar_clean(msg);
@ -321,7 +324,8 @@ msg_create_init_reply(struct dynar *msg, int add_msg_seq_number, uint32_t msg_se
const enum msg_type *supported_msgs, size_t no_supported_msgs,
const enum tlv_opt_type *supported_opts, size_t no_supported_opts,
size_t server_maximum_request_size, size_t server_maximum_reply_size,
const enum tlv_decision_algorithm_type *supported_decision_algorithms, size_t no_supported_decision_algorithms)
const enum tlv_decision_algorithm_type *supported_decision_algorithms,
size_t no_supported_decision_algorithms)
{
uint16_t *u16a;
int res;
@ -494,6 +498,86 @@ small_buf_err:
return (0);
}
size_t
msg_create_node_list(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number, enum tlv_node_list_type node_list_type,
int add_ring_id, const struct tlv_ring_id *ring_id,
int add_config_version, uint64_t config_version, const struct node_list *nodes)
{
struct node_list_entry *node_info;
struct tlv_node_info tlv_ni;
dynar_clean(msg);
msg_add_type(msg, MSG_TYPE_NODE_LIST);
msg_add_len(msg);
if (add_msg_seq_number) {
if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
goto small_buf_err;
}
}
if (tlv_add_node_list_type(msg, node_list_type) == -1) {
goto small_buf_err;
}
if (add_ring_id) {
if (tlv_add_ring_id(msg, ring_id) == -1) {
goto small_buf_err;
}
}
if (add_config_version) {
if (tlv_add_config_version(msg, config_version) == -1) {
goto small_buf_err;
}
}
TAILQ_FOREACH(node_info, nodes, entries) {
node_list_entry_to_tlv_node_info(node_info, &tlv_ni);
if (tlv_add_node_info(msg, &tlv_ni) == -1) {
goto small_buf_err;
}
}
msg_set_len(msg, dynar_size(msg) - (MSG_TYPE_LENGTH + MSG_LENGTH_LENGTH));
return (dynar_size(msg));
small_buf_err:
return (0);
}
size_t
msg_create_node_list_reply(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number,
enum tlv_vote vote)
{
dynar_clean(msg);
msg_add_type(msg, MSG_TYPE_NODE_LIST_REPLY);
msg_add_len(msg);
if (add_msg_seq_number) {
if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
goto small_buf_err;
}
}
if (tlv_add_vote(msg, vote) == -1) {
goto small_buf_err;
}
msg_set_len(msg, dynar_size(msg) - (MSG_TYPE_LENGTH + MSG_LENGTH_LENGTH));
return (dynar_size(msg));
small_buf_err:
return (0);
}
int
msg_is_valid_msg_type(const struct dynar *msg)
{
@ -516,6 +600,8 @@ msg_decoded_init(struct msg_decoded *decoded_msg)
{
memset(decoded_msg, 0, sizeof(*decoded_msg));
node_list_init(&decoded_msg->nodes);
}
void
@ -525,6 +611,7 @@ msg_decoded_destroy(struct msg_decoded *decoded_msg)
free(decoded_msg->cluster_name);
free(decoded_msg->supported_messages);
free(decoded_msg->supported_options);
node_list_free(&decoded_msg->nodes);
msg_decoded_init(decoded_msg);
}
@ -542,6 +629,9 @@ msg_decode(const struct dynar *msg, struct msg_decoded *decoded_msg)
struct tlv_iterator tlv_iter;
uint16_t *u16a;
uint32_t u32;
uint64_t u64;
struct tlv_ring_id ring_id;
struct tlv_node_info node_info;
size_t zi;
enum tlv_opt_type opt_type;
int iter_res;
@ -558,28 +648,31 @@ msg_decode(const struct dynar *msg, struct msg_decoded *decoded_msg)
switch (opt_type) {
case TLV_OPT_MSG_SEQ_NUMBER:
if (tlv_iter_decode_u32(&tlv_iter, &decoded_msg->seq_number) != 0) {
return (-1);
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->seq_number_set = 1;
decoded_msg->seq_number = u32;
break;
case TLV_OPT_CLUSTER_NAME:
if (tlv_iter_decode_str(&tlv_iter, &decoded_msg->cluster_name,
&decoded_msg->cluster_name_len) != 0) {
if ((res = tlv_iter_decode_str(&tlv_iter, &decoded_msg->cluster_name,
&decoded_msg->cluster_name_len)) != 0) {
return (-2);
}
break;
case TLV_OPT_TLS_SUPPORTED:
if ((res = tlv_iter_decode_tls_supported(&tlv_iter, &decoded_msg->tls_supported)) != 0) {
if ((res = tlv_iter_decode_tls_supported(&tlv_iter,
&decoded_msg->tls_supported)) != 0) {
return (res);
}
decoded_msg->tls_supported_set = 1;
break;
case TLV_OPT_TLS_CLIENT_CERT_REQUIRED:
if (tlv_iter_decode_client_cert_required(&tlv_iter, &decoded_msg->tls_client_cert_required) != 0) {
return (-1);
if ((res = tlv_iter_decode_client_cert_required(&tlv_iter,
&decoded_msg->tls_client_cert_required)) != 0) {
return (res);
}
decoded_msg->tls_client_cert_required_set = 1;
@ -592,7 +685,9 @@ msg_decode(const struct dynar *msg, struct msg_decoded *decoded_msg)
return (res);
}
decoded_msg->supported_messages = malloc(sizeof(enum msg_type) * decoded_msg->no_supported_messages);
decoded_msg->supported_messages =
malloc(sizeof(enum msg_type) * decoded_msg->no_supported_messages);
if (decoded_msg->supported_messages == NULL) {
free(u16a);
return (-2);
@ -607,37 +702,39 @@ msg_decode(const struct dynar *msg, struct msg_decoded *decoded_msg)
case TLV_OPT_SUPPORTED_OPTIONS:
free(decoded_msg->supported_options);
if ((res = tlv_iter_decode_supported_options(&tlv_iter, &decoded_msg->supported_options,
if ((res = tlv_iter_decode_supported_options(&tlv_iter,
&decoded_msg->supported_options,
&decoded_msg->no_supported_options)) != 0) {
return (res);
}
break;
case TLV_OPT_REPLY_ERROR_CODE:
if (tlv_iter_decode_reply_error_code(&tlv_iter, &decoded_msg->reply_error_code) != 0) {
return (-1);
if ((res = tlv_iter_decode_reply_error_code(&tlv_iter,
&decoded_msg->reply_error_code)) != 0) {
return (res);
}
decoded_msg->reply_error_code_set = 1;
break;
case TLV_OPT_SERVER_MAXIMUM_REQUEST_SIZE:
if (tlv_iter_decode_u32(&tlv_iter, &u32) != 0) {
return (-1);
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->server_maximum_request_size_set = 1;
decoded_msg->server_maximum_request_size = u32;
break;
case TLV_OPT_SERVER_MAXIMUM_REPLY_SIZE:
if (tlv_iter_decode_u32(&tlv_iter, &u32) != 0) {
return (-1);
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->server_maximum_reply_size_set = 1;
decoded_msg->server_maximum_reply_size = u32;
break;
case TLV_OPT_NODE_ID:
if (tlv_iter_decode_u32(&tlv_iter, &u32) != 0) {
return (-1);
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->node_id_set = 1;
@ -653,20 +750,74 @@ msg_decode(const struct dynar *msg, struct msg_decoded *decoded_msg)
}
break;
case TLV_OPT_DECISION_ALGORITHM:
if (tlv_iter_decode_decision_algorithm(&tlv_iter, &decoded_msg->decision_algorithm) != 0) {
return (-1);
if ((res = tlv_iter_decode_decision_algorithm(&tlv_iter,
&decoded_msg->decision_algorithm)) != 0) {
return (res);
}
decoded_msg->decision_algorithm_set = 1;
break;
case TLV_OPT_HEARTBEAT_INTERVAL:
if (tlv_iter_decode_u32(&tlv_iter, &u32) != 0) {
return (-1);
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->heartbeat_interval_set = 1;
decoded_msg->heartbeat_interval = u32;
break;
case TLV_OPT_RING_ID:
if ((res = tlv_iter_decode_ring_id(&tlv_iter, &ring_id)) != 0) {
return (res);
}
decoded_msg->ring_id_set = 1;
memcpy(&decoded_msg->ring_id, &ring_id, sizeof(ring_id));
break;
case TLV_OPT_CONFIG_VERSION:
if ((res = tlv_iter_decode_u64(&tlv_iter, &u64)) != 0) {
return (res);
}
decoded_msg->config_version_set = 1;
decoded_msg->config_version = u64;
break;
case TLV_OPT_DATA_CENTER_ID:
if ((res = tlv_iter_decode_u32(&tlv_iter, &u32)) != 0) {
return (res);
}
decoded_msg->data_center_id = u32;
break;
case TLV_OPT_NODE_STATE:
if ((res = tlv_iter_decode_node_state(&tlv_iter,
&decoded_msg->node_state)) != 0) {
return (res);
}
break;
case TLV_OPT_NODE_INFO:
if ((res = tlv_iter_decode_node_info(&tlv_iter, &node_info)) != 0) {
return (res);
}
if (node_list_add_from_node_info(&decoded_msg->nodes, &node_info) == NULL) {
return (-2);
}
break;
case TLV_OPT_NODE_LIST_TYPE:
if ((res = tlv_iter_decode_node_list_type(&tlv_iter,
&decoded_msg->node_list_type)) != 0) {
return (res);
}
decoded_msg->node_list_type_set = 1;
break;
case TLV_OPT_VOTE:
if ((res = tlv_iter_decode_vote(&tlv_iter, &decoded_msg->vote)) != 0) {
return (res);
}
decoded_msg->vote_set = 1;
break;
default:
/*
* Unknown option

View File

@ -40,6 +40,7 @@
#include "dynar.h"
#include "tlv.h"
#include "node-list.h"
#ifdef __cplusplus
extern "C" {
@ -56,18 +57,21 @@ enum msg_type {
MSG_TYPE_SET_OPTION_REPLY = 7,
MSG_TYPE_ECHO_REQUEST = 8,
MSG_TYPE_ECHO_REPLY = 9,
MSG_TYPE_NODE_LIST = 10,
MSG_TYPE_NODE_LIST_REPLY = 11,
};
struct msg_decoded {
enum msg_type type;
uint8_t seq_number_set;
uint32_t seq_number; // Only valid if seq_number_set != 0
uint32_t seq_number; // Only valid if seq_number_set != 0
size_t cluster_name_len;
char *cluster_name; // Valid only if != NULL. Trailing \0 is added but not counted in cluster_name_len
// Valid only if != NULL. Trailing \0 is added but not counted in cluster_name_len
char *cluster_name;
uint8_t tls_supported_set;
enum tlv_tls_supported tls_supported; // Valid only if tls_supported_set != 0.
uint8_t tls_client_cert_required_set;
uint8_t tls_client_cert_required; // Valid only if tls_client_cert_required_set != 0
uint8_t tls_client_cert_required; // Valid only if tls_client_cert_required_set != 0
size_t no_supported_messages;
enum msg_type *supported_messages; // Valid only if != NULL
size_t no_supported_options;
@ -75,17 +79,31 @@ struct msg_decoded {
uint8_t reply_error_code_set;
enum tlv_reply_error_code reply_error_code; // Valid only if reply_error_code_set != 0
uint8_t server_maximum_request_size_set;
size_t server_maximum_request_size; // Valid only if server_maximum_request_size_set != 0
// Valid only if server_maximum_request_size_set != 0
size_t server_maximum_request_size;
uint8_t server_maximum_reply_size_set;
size_t server_maximum_reply_size; // Valid only if server_maximum_reply_size_set != 0
size_t server_maximum_reply_size; // Valid only if server_maximum_reply_size_set != 0
uint8_t node_id_set;
uint32_t node_id;
size_t no_supported_decision_algorithms;
enum tlv_decision_algorithm_type *supported_decision_algorithms; // Valid only if != NULL
// Valid only if != NULL
enum tlv_decision_algorithm_type *supported_decision_algorithms;
uint8_t decision_algorithm_set;
enum tlv_decision_algorithm_type decision_algorithm; // Valid only if decision_algorithm_set != 0
// Valid only if decision_algorithm_set != 0
enum tlv_decision_algorithm_type decision_algorithm;
uint8_t heartbeat_interval_set;
uint32_t heartbeat_interval; // Valid only if heartbeat_interval_set != 0
uint32_t heartbeat_interval; // Valid only if heartbeat_interval_set != 0
uint8_t ring_id_set;
struct tlv_ring_id ring_id; // Valid only if ring_id_set != 0
uint8_t config_version_set;
uint64_t config_version; // Valid only if config_version_set != 0
uint32_t data_center_id; // Valid only if != 0
enum tlv_node_state node_state; // Valid only if != TLV_NODE_STATE_NOT_SET
struct node_list nodes; // Valid only if node_list_is_empty(nodes) != 0
int node_list_type_set;
enum tlv_node_list_type node_list_type; // Valid only if node_list_type_set != 0
int vote_set;
enum tlv_vote vote; // Valid only if vote_set != 0
};
extern size_t msg_create_preinit(struct dynar *msg, const char *cluster_name,
@ -97,18 +115,19 @@ extern size_t msg_create_preinit_reply(struct dynar *msg, int add_msg_seq_numbe
extern size_t msg_create_starttls(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number);
extern size_t msg_create_init(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number,
const enum msg_type *supported_msgs, size_t no_supported_msgs,
extern size_t msg_create_init(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number, const enum msg_type *supported_msgs, size_t no_supported_msgs,
const enum tlv_opt_type *supported_opts, size_t no_supported_opts, uint32_t node_id);
extern size_t msg_create_server_error(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number,
enum tlv_reply_error_code reply_error_code);
extern size_t msg_create_server_error(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number, enum tlv_reply_error_code reply_error_code);
extern size_t msg_create_init_reply(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number,
const enum msg_type *supported_msgs, size_t no_supported_msgs,
extern size_t msg_create_init_reply(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number, const enum msg_type *supported_msgs, size_t no_supported_msgs,
const enum tlv_opt_type *supported_opts, size_t no_supported_opts,
size_t server_maximum_request_size, size_t server_maximum_reply_size,
const enum tlv_decision_algorithm_type *supported_decision_algorithms, size_t no_supported_decision_algorithms);
const enum tlv_decision_algorithm_type *supported_decision_algorithms,
size_t no_supported_decision_algorithms);
extern size_t msg_create_set_option(struct dynar *msg,
int add_msg_seq_number, uint32_t msg_seq_number,
@ -119,9 +138,20 @@ extern size_t msg_create_set_option_reply(struct dynar *msg,
int add_msg_seq_number, uint32_t msg_seq_number,
enum tlv_decision_algorithm_type decision_algorithm, uint32_t heartbeat_interval);
extern size_t msg_create_echo_request(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number);
extern size_t msg_create_echo_request(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number);
extern size_t msg_create_echo_reply(struct dynar *msg, const struct dynar *echo_request_msg);
extern size_t msg_create_echo_reply(struct dynar *msg,
const struct dynar *echo_request_msg);
extern size_t msg_create_node_list(struct dynar *msg,
int add_msg_seq_number, uint32_t msg_seq_number, enum tlv_node_list_type node_list_type,
int add_ring_id, const struct tlv_ring_id *ring_id,
int add_config_version, uint64_t config_version,
const struct node_list *nodes);
extern size_t msg_create_node_list_reply(struct dynar *msg, int add_msg_seq_number,
uint32_t msg_seq_number, enum tlv_vote vote);
extern size_t msg_get_header_length(void);

View File

@ -87,7 +87,10 @@ msgio_send_blocking(PRFileDesc *sock, const char *msg, size_t msg_len)
}
/*
* -1 means send returned 0, -2 unhandled error. 0 = success but whole buffer is still not sent, 1 = all data was sent
* -1 = send returned 0,
* -2 = unhandled error.
* 0 = success but whole buffer is still not sent
* 1 = all data was sent
*/
int
msgio_write(PRFileDesc *sock, const struct dynar *msg, size_t *already_sent_bytes)
@ -100,7 +103,8 @@ msgio_write(PRFileDesc *sock, const struct dynar *msg, size_t *already_sent_byte
to_send = MSGIO_LOCAL_BUF_SIZE;
}
sent = PR_Send(sock, dynar_data(msg) + *already_sent_bytes, to_send, 0, PR_INTERVAL_NO_TIMEOUT);
sent = PR_Send(sock, dynar_data(msg) + *already_sent_bytes, to_send, 0,
PR_INTERVAL_NO_TIMEOUT);
if (sent > 0) {
*already_sent_bytes += sent;
@ -183,7 +187,8 @@ msgio_read(PRFileDesc *sock, struct dynar *msg, size_t *already_received_bytes,
if (!msg_is_valid_msg_type(msg)) {
*skipping_msg = 1;
ret = -5;
} else if (msg_get_header_length() + msg_get_len(msg) > dynar_max_size(msg)) {
} else if ((msg_get_header_length() + msg_get_len(msg)) >
dynar_max_size(msg)) {
*skipping_msg = 1;
ret = -6;
}

View File

@ -50,7 +50,8 @@ extern ssize_t msgio_send_blocking(PRFileDesc *sock, const char *msg, size_t msg
extern int msgio_write(PRFileDesc *sock, const struct dynar *msg, size_t *already_sent_bytes);
extern int msgio_read(PRFileDesc *sock, struct dynar *msg, size_t *already_received_bytes, int *skipping_msg);
extern int msgio_read(PRFileDesc *sock, struct dynar *msg, size_t *already_received_bytes,
int *skipping_msg);
#ifdef __cplusplus
}

121
qdevices/node-list.c Normal file
View File

@ -0,0 +1,121 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include "node-list.h"
void
node_list_init(struct node_list *list)
{
TAILQ_INIT(list);
}
struct node_list_entry *
node_list_add(struct node_list *list, uint32_t node_id, uint32_t data_center_id,
enum tlv_node_state node_state)
{
struct node_list_entry *node;
node = (struct node_list_entry *)malloc(sizeof(*node));
if (node == NULL) {
return (NULL);
}
memset(node, 0, sizeof(*node));
node->node_id = node_id;
node->data_center_id = data_center_id;
node->node_state = node_state;
TAILQ_INSERT_TAIL(list, node, entries);
return (node);
}
struct node_list_entry *
node_list_add_from_node_info(struct node_list *list, const struct tlv_node_info *node_info)
{
return (node_list_add(list, node_info->node_id, node_info->data_center_id,
node_info->node_state));
}
void
node_list_entry_to_tlv_node_info(const struct node_list_entry *node,
struct tlv_node_info *node_info)
{
node_info->node_id = node->node_id;
node_info->data_center_id = node->data_center_id;
node_info->node_state = node->node_state;
}
void
node_list_free(struct node_list *list)
{
struct node_list_entry *node;
struct node_list_entry *node_next;
node = TAILQ_FIRST(list);
while (node != NULL) {
node_next = TAILQ_NEXT(node, entries);
free(node);
node = node_next;
}
TAILQ_INIT(list);
}
void
node_list_del(struct node_list *list, struct node_list_entry *node)
{
TAILQ_REMOVE(list, node, entries);
free(node);
}
int
node_list_is_empty(struct node_list *list)
{
return (TAILQ_EMPTY(list));
}

80
qdevices/node-list.h Normal file
View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _NODE_LIST_H_
#define _NODE_LIST_H_
#include <sys/types.h>
#include <sys/queue.h>
#include <inttypes.h>
#include "tlv.h"
#ifdef __cplusplus
extern "C" {
#endif
struct node_list_entry {
uint32_t node_id;
uint32_t data_center_id;
enum tlv_node_state node_state;
TAILQ_ENTRY(node_list_entry) entries;
};
TAILQ_HEAD(node_list, node_list_entry);
extern void node_list_init(struct node_list *list);
extern struct node_list_entry *node_list_add(struct node_list *list,
uint32_t node_id, uint32_t data_center_id, enum tlv_node_state node_state);
extern struct node_list_entry *node_list_add_from_node_info(
struct node_list *list, const struct tlv_node_info *node_info);
extern void node_list_free(struct node_list *list);
extern void node_list_del(struct node_list *list,
struct node_list_entry *node);
extern int node_list_is_empty(struct node_list *list);
extern void node_list_entry_to_tlv_node_info(
const struct node_list_entry *node, struct tlv_node_info *node_info);
#ifdef __cplusplus
}
#endif
#endif /* _NODE_LIST_H_ */

View File

@ -146,7 +146,8 @@ nss_sock_create_listen_socket(const char *hostname, uint16_t port, PRIntn af)
addr_iter = NULL;
while ((addr_iter = PR_EnumerateAddrInfo(addr_iter, addr_info, port, &addr)) != NULL) {
while ((addr_iter = PR_EnumerateAddrInfo(addr_iter, addr_info, port,
&addr)) != NULL) {
if (addr.raw.family == af) {
sock = nss_sock_create_socket(af, 1);
if (sock == NULL) {
@ -188,7 +189,8 @@ nss_sock_create_listen_socket(const char *hostname, uint16_t port, PRIntn af)
* PR_AF_INET.
*/
PRFileDesc *
nss_sock_create_client_socket(const char *hostname, uint16_t port, PRIntn af, PRIntervalTime timeout)
nss_sock_create_client_socket(const char *hostname, uint16_t port, PRIntn af,
PRIntervalTime timeout)
{
PRNetAddr addr;
PRFileDesc *sock;
@ -241,8 +243,9 @@ nss_sock_create_client_socket(const char *hostname, uint16_t port, PRIntn af, PR
* verification fails.
*/
PRFileDesc *
nss_sock_start_ssl_as_client(PRFileDesc *input_sock, const char *ssl_url, SSLBadCertHandler bad_cert_hook,
SSLGetClientAuthData client_auth_hook, void *client_auth_hook_arg, int force_handshake, int *reset_would_block)
nss_sock_start_ssl_as_client(PRFileDesc *input_sock, const char *ssl_url,
SSLBadCertHandler bad_cert_hook, SSLGetClientAuthData client_auth_hook,
void *client_auth_hook_arg, int force_handshake, int *reset_would_block)
{
PRFileDesc *ssl_sock;
@ -269,7 +272,8 @@ nss_sock_start_ssl_as_client(PRFileDesc *input_sock, const char *ssl_url, SSLBad
}
if (client_auth_hook != NULL &&
(SSL_GetClientAuthDataHook(ssl_sock, client_auth_hook, client_auth_hook_arg) != SECSuccess)) {
(SSL_GetClientAuthDataHook(ssl_sock, client_auth_hook,
client_auth_hook_arg) != SECSuccess)) {
return (NULL);
}
@ -292,8 +296,9 @@ nss_sock_start_ssl_as_client(PRFileDesc *input_sock, const char *ssl_url, SSLBad
}
PRFileDesc *
nss_sock_start_ssl_as_server(PRFileDesc *input_sock, CERTCertificate *server_cert, SECKEYPrivateKey *server_key,
int require_client_cert, int force_handshake, int *reset_would_block)
nss_sock_start_ssl_as_server(PRFileDesc *input_sock, CERTCertificate *server_cert,
SECKEYPrivateKey *server_key, int require_client_cert, int force_handshake,
int *reset_would_block)
{
PRFileDesc *ssl_sock;
@ -306,7 +311,8 @@ nss_sock_start_ssl_as_server(PRFileDesc *input_sock, CERTCertificate *server_cer
return (NULL);
}
if (SSL_ConfigSecureServer(ssl_sock, server_cert, server_key, NSS_FindCertKEAType(server_cert)) != SECSuccess) {
if (SSL_ConfigSecureServer(ssl_sock, server_cert, server_key,
NSS_FindCertKEAType(server_cert)) != SECSuccess) {
return (NULL);
}

View File

@ -43,16 +43,22 @@ extern "C" {
#endif
extern int nss_sock_init_nss(char *config_dir);
extern PRFileDesc *nss_sock_create_listen_socket(const char *hostname, uint16_t port, PRIntn af);
extern PRFileDesc *nss_sock_create_listen_socket(const char *hostname, uint16_t port,
PRIntn af);
extern int nss_sock_set_nonblocking(PRFileDesc *sock);
extern PRFileDesc *nss_sock_create_client_socket(const char *hostname, uint16_t port, PRIntn af, PRIntervalTime timeout);
extern PRFileDesc *nss_sock_create_client_socket(const char *hostname, uint16_t port,
PRIntn af, PRIntervalTime timeout);
extern PRFileDesc *nss_sock_start_ssl_as_client(PRFileDesc *input_sock, const char *ssl_url,
SSLBadCertHandler bad_cert_hook, SSLGetClientAuthData client_auth_hook, void *client_auth_hook_arg,
int force_handshake, int *reset_would_block);
SSLBadCertHandler bad_cert_hook, SSLGetClientAuthData client_auth_hook,
void *client_auth_hook_arg, int force_handshake, int *reset_would_block);
extern PRFileDesc *nss_sock_start_ssl_as_server(PRFileDesc *input_sock, CERTCertificate *server_cert,
SECKEYPrivateKey *server_key, int require_client_cert, int force_handshake, int *reset_would_block);
extern PRFileDesc *nss_sock_start_ssl_as_server(PRFileDesc *input_sock,
CERTCertificate *server_cert, SECKEYPrivateKey *server_key, int require_client_cert,
int force_handshake, int *reset_would_block);
#ifdef __cplusplus
}

View File

@ -39,18 +39,18 @@
#include <stdlib.h>
#include <string.h>
#include "qnetd-clients-list.h"
#include "qnetd-client-list.h"
void
qnetd_clients_list_init(struct qnetd_clients_list *clients_list)
qnetd_client_list_init(struct qnetd_client_list *client_list)
{
TAILQ_INIT(clients_list);
TAILQ_INIT(client_list);
}
struct qnetd_client *
qnetd_clients_list_add(struct qnetd_clients_list *clients_list, PRFileDesc *sock, PRNetAddr *addr,
size_t max_receive_size, size_t max_send_size)
qnetd_client_list_add(struct qnetd_client_list *client_list, PRFileDesc *sock, PRNetAddr *addr,
size_t max_receive_size, size_t max_send_buffers, size_t max_send_size)
{
struct qnetd_client *client;
@ -59,20 +59,20 @@ qnetd_clients_list_add(struct qnetd_clients_list *clients_list, PRFileDesc *sock
return (NULL);
}
qnetd_client_init(client, sock, addr, max_receive_size, max_send_size);
qnetd_client_init(client, sock, addr, max_receive_size, max_send_buffers, max_send_size);
TAILQ_INSERT_TAIL(clients_list, client, entries);
TAILQ_INSERT_TAIL(client_list, client, entries);
return (client);
}
void
qnetd_clients_list_free(struct qnetd_clients_list *clients_list)
qnetd_client_list_free(struct qnetd_client_list *client_list)
{
struct qnetd_client *client;
struct qnetd_client *client_next;
client = TAILQ_FIRST(clients_list);
client = TAILQ_FIRST(client_list);
while (client != NULL) {
client_next = TAILQ_NEXT(client, entries);
@ -82,14 +82,14 @@ qnetd_clients_list_free(struct qnetd_clients_list *clients_list)
client = client_next;
}
TAILQ_INIT(clients_list);
TAILQ_INIT(client_list);
}
void
qnetd_clients_list_del(struct qnetd_clients_list *clients_list, struct qnetd_client *client)
qnetd_client_list_del(struct qnetd_client_list *client_list, struct qnetd_client *client)
{
TAILQ_REMOVE(clients_list, client, entries);
TAILQ_REMOVE(client_list, client, entries);
qnetd_client_destroy(client);
free(client);
}

View File

@ -32,8 +32,8 @@
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _QNETD_CLIENTS_LIST_H_
#define _QNETD_CLIENTS_LIST_H_
#ifndef _QNETD_CLIENT_LIST_H_
#define _QNETD_CLIENT_LIST_H_
#include <sys/types.h>
#include <inttypes.h>
@ -44,20 +44,21 @@
extern "C" {
#endif
TAILQ_HEAD(qnetd_clients_list, qnetd_client);
TAILQ_HEAD(qnetd_client_list, qnetd_client);
extern void qnetd_clients_list_init(struct qnetd_clients_list *clients_list);
extern void qnetd_client_list_init(struct qnetd_client_list *client_list);
extern struct qnetd_client *qnetd_clients_list_add(struct qnetd_clients_list *clients_list,
PRFileDesc *sock, PRNetAddr *addr, size_t max_receive_size, size_t max_send_size);
extern struct qnetd_client *qnetd_client_list_add(struct qnetd_client_list *client_list,
PRFileDesc *sock, PRNetAddr *addr, size_t max_receive_size, size_t max_send_buffers,
size_t max_send_size);
extern void qnetd_clients_list_free(struct qnetd_clients_list *clients_list);
extern void qnetd_client_list_free(struct qnetd_client_list *client_list);
extern void qnetd_clients_list_del(struct qnetd_clients_list *clients_list,
extern void qnetd_client_list_del(struct qnetd_client_list *client_list,
struct qnetd_client *client);
#ifdef __cplusplus
}
#endif
#endif /* _QNETD_CLIENTS_LIST_H_ */
#endif /* _QNETD_CLIENT_LIST_H_ */

View File

@ -40,14 +40,14 @@
void
qnetd_client_init(struct qnetd_client *client, PRFileDesc *sock, PRNetAddr *addr,
size_t max_receive_size, size_t max_send_size)
size_t max_receive_size, size_t max_send_buffers, size_t max_send_size)
{
memset(client, 0, sizeof(*client));
client->socket = sock;
memcpy(&client->addr, addr, sizeof(*addr));
dynar_init(&client->receive_buffer, max_receive_size);
dynar_init(&client->send_buffer, max_send_size);
send_buffer_list_init(&client->send_buffer_list, max_send_buffers, max_send_size);
}
void
@ -55,5 +55,5 @@ qnetd_client_destroy(struct qnetd_client *client)
{
dynar_destroy(&client->receive_buffer);
dynar_destroy(&client->send_buffer);
send_buffer_list_free(&client->send_buffer_list);
}

View File

@ -43,6 +43,7 @@
#include <nspr.h>
#include "dynar.h"
#include "tlv.h"
#include "send-buffer-list.h"
#ifdef __cplusplus
extern "C" {
@ -52,10 +53,8 @@ struct qnetd_client {
PRFileDesc *socket;
PRNetAddr addr;
struct dynar receive_buffer;
struct dynar send_buffer;
struct send_buffer_list send_buffer_list;
size_t msg_already_received_bytes;
size_t msg_already_sent_bytes;
int sending_msg; // Have message to sent
int skipping_msg; // When incorrect message was received skip it
int tls_started; // Set after TLS started
int tls_peer_certificate_verified; // Certificate is verified only once
@ -71,8 +70,8 @@ struct qnetd_client {
TAILQ_ENTRY(qnetd_client) entries;
};
extern void qnetd_client_init(struct qnetd_client *client, PRFileDesc *sock, PRNetAddr *addr,
size_t max_receive_size, size_t max_send_size);
extern void qnetd_client_init(struct qnetd_client *client, PRFileDesc *sock,
PRNetAddr *addr, size_t max_receive_size, size_t max_send_buffers, size_t max_send_size);
extern void qnetd_client_destroy(struct qnetd_client *client);

View File

@ -125,8 +125,8 @@ qnetd_poll_array_get(const struct qnetd_poll_array *poll_array, unsigned int pos
}
PRPollDesc *
qnetd_poll_array_create_from_clients_list(struct qnetd_poll_array *poll_array,
const struct qnetd_clients_list *clients_list,
qnetd_poll_array_create_from_client_list(struct qnetd_poll_array *poll_array,
const struct qnetd_client_list *client_list,
PRFileDesc *extra_fd, PRInt16 extra_fd_in_flags)
{
struct qnetd_client *client;
@ -145,14 +145,14 @@ qnetd_poll_array_create_from_clients_list(struct qnetd_poll_array *poll_array,
poll_desc->out_flags = 0;
}
TAILQ_FOREACH(client, clients_list, entries) {
TAILQ_FOREACH(client, client_list, entries) {
poll_desc = qnetd_poll_array_add(poll_array);
if (poll_desc == NULL) {
return (NULL);
}
poll_desc->fd = client->socket;
poll_desc->in_flags = PR_POLL_READ;
if (client->sending_msg) {
if (!send_buffer_list_empty(&client->send_buffer_list)) {
poll_desc->in_flags |= PR_POLL_WRITE;
}
poll_desc->out_flags = 0;

View File

@ -41,7 +41,7 @@
#include <nspr.h>
#include "qnetd-client.h"
#include "qnetd-clients-list.h"
#include "qnetd-client-list.h"
#ifdef __cplusplus
extern "C" {
@ -64,10 +64,12 @@ extern unsigned int qnetd_poll_array_size(struct qnetd_poll_array *poll_array);
extern PRPollDesc *qnetd_poll_array_add(struct qnetd_poll_array *poll_array);
extern PRPollDesc *qnetd_poll_array_get(const struct qnetd_poll_array *poll_array, unsigned int pos);
extern PRPollDesc *qnetd_poll_array_get(const struct qnetd_poll_array *poll_array,
unsigned int pos);
extern PRPollDesc *qnetd_poll_array_create_from_clients_list(struct qnetd_poll_array *poll_array,
const struct qnetd_clients_list *clients_list, PRFileDesc *extra_fd, PRInt16 extra_fd_in_flags);
extern PRPollDesc *qnetd_poll_array_create_from_client_list(
struct qnetd_poll_array *poll_array, const struct qnetd_client_list *client_list,
PRFileDesc *extra_fd, PRInt16 extra_fd_in_flags);
#ifdef __cplusplus
}

169
qdevices/send-buffer-list.c Normal file
View File

@ -0,0 +1,169 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include "send-buffer-list.h"
void
send_buffer_list_init(struct send_buffer_list *sblist, size_t max_list_entries,
size_t max_buffer_size)
{
memset(sblist, 0, sizeof(*sblist));
sblist->max_list_entries = max_list_entries;
sblist->allocated_list_entries = 0;
sblist->max_buffer_size = max_buffer_size;
TAILQ_INIT(&sblist->list);
TAILQ_INIT(&sblist->free_list);
}
struct send_buffer_list_entry *
send_buffer_list_get_new(struct send_buffer_list *sblist)
{
struct send_buffer_list_entry *entry;
if (!TAILQ_EMPTY(&sblist->free_list)) {
/*
* Use free list entry
*/
entry = TAILQ_FIRST(&sblist->free_list);
TAILQ_REMOVE(&sblist->free_list, entry, entries);
dynar_clean(&entry->buffer);
dynar_set_max_size(&entry->buffer, sblist->max_buffer_size);
} else {
if (sblist->allocated_list_entries + 1 > sblist->max_list_entries) {
return (NULL);
}
sblist->allocated_list_entries++;
/*
* Alloc new entry
*/
entry = malloc(sizeof(*entry));
if (entry == NULL) {
return (NULL);
}
dynar_init(&entry->buffer, sblist->max_buffer_size);
}
entry->msg_already_sent_bytes = 0;
return (entry);
}
void
send_buffer_list_put(struct send_buffer_list *sblist, struct send_buffer_list_entry *sblist_entry)
{
TAILQ_INSERT_TAIL(&sblist->list, sblist_entry, entries);
}
struct send_buffer_list_entry *
send_buffer_list_get_active(const struct send_buffer_list *sblist)
{
struct send_buffer_list_entry *entry;
entry = TAILQ_FIRST(&sblist->list);
return (entry);
}
void
send_buffer_list_delete(struct send_buffer_list *sblist,
struct send_buffer_list_entry *sblist_entry)
{
/*
* Move item to free list
*/
TAILQ_REMOVE(&sblist->list, sblist_entry, entries);
TAILQ_INSERT_HEAD(&sblist->free_list, sblist_entry, entries);
}
int
send_buffer_list_empty(const struct send_buffer_list *sblist)
{
return (TAILQ_EMPTY(&sblist->list));
}
void
send_buffer_list_free(struct send_buffer_list *sblist)
{
struct send_buffer_list_entry *entry;
struct send_buffer_list_entry *entry_next;
entry = TAILQ_FIRST(&sblist->list);
while (entry != NULL) {
entry_next = TAILQ_NEXT(entry, entries);
dynar_destroy(&entry->buffer);
free(entry);
entry = entry_next;
}
entry = TAILQ_FIRST(&sblist->free_list);
while (entry != NULL) {
entry_next = TAILQ_NEXT(entry, entries);
dynar_destroy(&entry->buffer);
free(entry);
entry = entry_next;
}
}
void
send_buffer_list_set_max_buffer_size(struct send_buffer_list *sblist, size_t max_buffer_size)
{
sblist->max_buffer_size = max_buffer_size;
}
void
send_buffer_list_set_max_list_entries(struct send_buffer_list *sblist, size_t max_list_entries)
{
sblist->max_list_entries = max_list_entries;
}

View File

@ -0,0 +1,92 @@
/*
* Copyright (c) 2015 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Jan Friesse (jfriesse@redhat.com)
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _SEND_BUFFER_LIST_H_
#define _SEND_BUFFER_LIST_H_
#include <sys/queue.h>
#include "dynar.h"
#ifdef __cplusplus
extern "C" {
#endif
struct send_buffer_list_entry {
struct dynar buffer;
size_t msg_already_sent_bytes;
TAILQ_ENTRY(send_buffer_list_entry) entries;
};
struct send_buffer_list {
size_t max_list_entries;
size_t allocated_list_entries;
size_t max_buffer_size;
TAILQ_HEAD(, send_buffer_list_entry) list;
TAILQ_HEAD(, send_buffer_list_entry) free_list;
};
extern void send_buffer_list_init(struct send_buffer_list *sblist,
size_t max_list_entries, size_t max_buffer_size);
extern struct send_buffer_list_entry *send_buffer_list_get_new(struct send_buffer_list *sblist);
extern void send_buffer_list_put(struct send_buffer_list *sblist,
struct send_buffer_list_entry *sblist_entry);
extern struct send_buffer_list_entry *send_buffer_list_get_active(
const struct send_buffer_list *sblist);
extern void send_buffer_list_delete(struct send_buffer_list *sblist,
struct send_buffer_list_entry *sblist_entry);
extern int send_buffer_list_empty(
const struct send_buffer_list *sblist);
extern void send_buffer_list_free(struct send_buffer_list *sblist);
extern void send_buffer_list_set_max_buffer_size(
struct send_buffer_list *sblist, size_t max_buffer_size);
extern void send_buffer_list_set_max_list_entries(
struct send_buffer_list *sblist, size_t max_list_entries);
#ifdef __cplusplus
}
#endif
#endif /* _SEND_BUFFER_LIST_H_ */

View File

@ -48,7 +48,8 @@ timer_list_init(struct timer_list *tlist)
}
struct timer_list_entry *
timer_list_add(struct timer_list *tlist, PRUint32 interval, timer_list_cb_fn func, void *data1, void *data2)
timer_list_add(struct timer_list *tlist, PRUint32 interval, timer_list_cb_fn func, void *data1,
void *data2)
{
struct timer_list_entry *entry;

View File

@ -62,8 +62,8 @@ struct timer_list {
extern void timer_list_init(struct timer_list *tlist);
extern struct timer_list_entry *timer_list_add(struct timer_list *tlist, PRUint32 interval,
timer_list_cb_fn func, void *data1, void *data2);
extern struct timer_list_entry *timer_list_add(struct timer_list *tlist,
PRUint32 interval, timer_list_cb_fn func, void *data1, void *data2);
extern void timer_list_delete(struct timer_list *tlist,
struct timer_list_entry *entry);

View File

@ -39,12 +39,24 @@
#include <stdlib.h>
#include <string.h>
/*
* 64-bit variant of ntoh is not exactly standard...
*/
#if defined(__linux__)
#include <endian.h>
#elif defined(__FreeBSD__) || defined(__NetBSD__)
#include <sys/endian.h>
#elif defined(__OpenBSD__)
#include <sys/types.h>
#define be64toh(x) betoh64(x)
#endif
#include "tlv.h"
#define TLV_TYPE_LENGTH 2
#define TLV_LENGTH_LENGTH 2
#define TLV_STATIC_SUPPORTED_OPTIONS_SIZE 13
#define TLV_STATIC_SUPPORTED_OPTIONS_SIZE 20
enum tlv_opt_type tlv_static_supported_options[TLV_STATIC_SUPPORTED_OPTIONS_SIZE] = {
TLV_OPT_MSG_SEQ_NUMBER,
@ -60,6 +72,13 @@ enum tlv_opt_type tlv_static_supported_options[TLV_STATIC_SUPPORTED_OPTIONS_SIZE
TLV_OPT_SUPPORTED_DECISION_ALGORITHMS,
TLV_OPT_DECISION_ALGORITHM,
TLV_OPT_HEARTBEAT_INTERVAL,
TLV_OPT_RING_ID,
TLV_OPT_CONFIG_VERSION,
TLV_OPT_DATA_CENTER_ID,
TLV_OPT_NODE_STATE,
TLV_OPT_NODE_INFO,
TLV_OPT_NODE_LIST_TYPE,
TLV_OPT_VOTE,
};
int
@ -109,6 +128,16 @@ tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t u16)
return (tlv_add(msg, opt_type, sizeof(nu16), &nu16));
}
int
tlv_add_u64(struct dynar *msg, enum tlv_opt_type opt_type, uint64_t u64)
{
uint64_t nu64;
nu64 = htobe64(u64);
return (tlv_add(msg, opt_type, sizeof(nu64), &nu64));
}
int
tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type, const char *str)
{
@ -145,7 +174,8 @@ tlv_add_tls_client_cert_required(struct dynar *msg, int tls_client_cert_required
}
int
tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type, const uint16_t *array, size_t array_size)
tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type, const uint16_t *array,
size_t array_size)
{
size_t i;
uint16_t *nu16a;
@ -195,8 +225,8 @@ tlv_add_supported_options(struct dynar *msg, const enum tlv_opt_type *supported_
}
int
tlv_add_supported_decision_algorithms(struct dynar *msg, const enum tlv_decision_algorithm_type *supported_algorithms,
size_t no_supported_algorithms)
tlv_add_supported_decision_algorithms(struct dynar *msg,
const enum tlv_decision_algorithm_type *supported_algorithms, size_t no_supported_algorithms)
{
uint16_t *u16a;
size_t i;
@ -211,7 +241,8 @@ tlv_add_supported_decision_algorithms(struct dynar *msg, const enum tlv_decision
u16a[i] = (uint16_t)supported_algorithms[i];
}
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, u16a, no_supported_algorithms));
res = (tlv_add_u16_array(msg, TLV_OPT_SUPPORTED_DECISION_ALGORITHMS, u16a,
no_supported_algorithms));
free(u16a);
@ -260,13 +291,113 @@ tlv_add_heartbeat_interval(struct dynar *msg, uint32_t heartbeat_interval)
return (tlv_add_u32(msg, TLV_OPT_HEARTBEAT_INTERVAL, heartbeat_interval));
}
int
tlv_add_ring_id(struct dynar *msg, const struct tlv_ring_id *ring_id)
{
uint64_t nu64;
uint32_t nu32;
char tmp_buf[12];
nu32 = htonl(ring_id->node_id);
nu64 = htobe64(ring_id->seq);
memcpy(tmp_buf, &nu32, sizeof(nu32));
memcpy(tmp_buf + sizeof(nu32), &nu64, sizeof(nu64));
return (tlv_add(msg, TLV_OPT_RING_ID, sizeof(tmp_buf), tmp_buf));
}
int
tlv_add_config_version(struct dynar *msg, uint64_t config_version)
{
return (tlv_add_u64(msg, TLV_OPT_CONFIG_VERSION, config_version));
}
int
tlv_add_data_center_id(struct dynar *msg, uint32_t data_center_id)
{
return (tlv_add_u32(msg, TLV_OPT_DATA_CENTER_ID, data_center_id));
}
int
tlv_add_node_state(struct dynar *msg, enum tlv_node_state node_state)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_STATE, node_state));
}
int
tlv_add_node_info(struct dynar *msg, const struct tlv_node_info *node_info)
{
struct dynar opt_value;
int res;
res = 0;
/*
* Create sub message,
*/
dynar_init(&opt_value, 1024);
if ((res = tlv_add_node_id(&opt_value, node_info->node_id)) != 0) {
goto exit_dynar_destroy;
}
if (node_info->data_center_id != 0) {
if ((res = tlv_add_data_center_id(&opt_value, node_info->data_center_id)) != 0) {
goto exit_dynar_destroy;
}
}
if (node_info->node_state != TLV_NODE_STATE_NOT_SET) {
if ((res = tlv_add_node_state(&opt_value, node_info->node_state)) != 0) {
goto exit_dynar_destroy;
}
}
res = tlv_add(msg, TLV_OPT_NODE_INFO, dynar_size(&opt_value), dynar_data(&opt_value));
if (res != 0) {
goto exit_dynar_destroy;
}
exit_dynar_destroy:
dynar_destroy(&opt_value);
return (res);
}
int
tlv_add_node_list_type(struct dynar *msg, enum tlv_node_list_type node_list_type)
{
return (tlv_add_u8(msg, TLV_OPT_NODE_LIST_TYPE, node_list_type));
}
int
tlv_add_vote(struct dynar *msg, enum tlv_vote vote)
{
return (tlv_add_u8(msg, TLV_OPT_VOTE, vote));
}
void
tlv_iter_init_str(const char *msg, size_t msg_len, size_t msg_header_len,
struct tlv_iterator *tlv_iter)
{
tlv_iter->msg = msg;
tlv_iter->msg_len = msg_len;
tlv_iter->current_pos = 0;
tlv_iter->msg_header_len = msg_header_len;
tlv_iter->iter_next_called = 0;
}
void
tlv_iter_init(const struct dynar *msg, size_t msg_header_len, struct tlv_iterator *tlv_iter)
{
tlv_iter->msg = msg;
tlv_iter->current_pos = 0;
tlv_iter->msg_header_len = msg_header_len;
tlv_iter_init_str(dynar_data(msg), dynar_size(msg), msg_header_len, tlv_iter);
}
enum tlv_opt_type
@ -275,7 +406,7 @@ tlv_iter_get_type(const struct tlv_iterator *tlv_iter)
uint16_t ntype;
uint16_t type;
memcpy(&ntype, dynar_data(tlv_iter->msg) + tlv_iter->current_pos, sizeof(ntype));
memcpy(&ntype, tlv_iter->msg + tlv_iter->current_pos, sizeof(ntype));
type = ntohs(ntype);
return (type);
@ -287,7 +418,7 @@ tlv_iter_get_len(const struct tlv_iterator *tlv_iter)
uint16_t nlen;
uint16_t len;
memcpy(&nlen, dynar_data(tlv_iter->msg) + tlv_iter->current_pos + TLV_TYPE_LENGTH, sizeof(nlen));
memcpy(&nlen, tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH, sizeof(nlen));
len = ntohs(nlen);
return (len);
@ -297,7 +428,7 @@ const char *
tlv_iter_get_data(const struct tlv_iterator *tlv_iter)
{
return (dynar_data(tlv_iter->msg) + tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH);
return (tlv_iter->msg + tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH);
}
int
@ -305,7 +436,8 @@ tlv_iter_next(struct tlv_iterator *tlv_iter)
{
uint16_t len;
if (tlv_iter->current_pos == 0) {
if (tlv_iter->iter_next_called == 0) {
tlv_iter->iter_next_called = 1;
tlv_iter->current_pos = tlv_iter->msg_header_len;
goto check_tlv_validity;
@ -313,7 +445,8 @@ tlv_iter_next(struct tlv_iterator *tlv_iter)
len = tlv_iter_get_len(tlv_iter);
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len >= dynar_size(tlv_iter->msg)) {
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len >=
tlv_iter->msg_len) {
return (0);
}
@ -325,7 +458,7 @@ check_tlv_validity:
*/
len = tlv_iter_get_len(tlv_iter);
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len > dynar_size(tlv_iter->msg)) {
if (tlv_iter->current_pos + TLV_TYPE_LENGTH + TLV_LENGTH_LENGTH + len > tlv_iter->msg_len) {
return (-1);
}
@ -433,8 +566,8 @@ tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter, uint16_t **u16a, size_t
}
int
tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter, enum tlv_opt_type **supported_options,
size_t *no_supported_options)
tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter,
enum tlv_opt_type **supported_options, size_t *no_supported_options)
{
uint16_t *u16a;
enum tlv_opt_type *tlv_opt_array;
@ -465,7 +598,8 @@ tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter, enum tlv_opt_ty
int
tlv_iter_decode_supported_decision_algorithms(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type **supported_decision_algorithms, size_t *no_supported_decision_algorithms)
enum tlv_decision_algorithm_type **supported_decision_algorithms,
size_t *no_supported_decision_algorithms)
{
uint16_t *u16a;
enum tlv_decision_algorithm_type *tlv_decision_algorithm_type_array;
@ -517,7 +651,28 @@ tlv_iter_decode_u16(struct tlv_iterator *tlv_iter, uint16_t *u16)
}
int
tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter, enum tlv_reply_error_code *reply_error_code)
tlv_iter_decode_u64(struct tlv_iterator *tlv_iter, uint64_t *u64)
{
const char *opt_data;
uint64_t opt_len;
uint64_t nu64;
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(nu64)) {
return (-1);
}
memcpy(&nu64, opt_data, sizeof(nu64));
*u64 = be64toh(nu64);
return (0);
}
int
tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter,
enum tlv_reply_error_code *reply_error_code)
{
return (tlv_iter_decode_u16(tlv_iter, (uint16_t *)reply_error_code));
@ -527,24 +682,28 @@ int
tlv_iter_decode_tls_supported(struct tlv_iterator *tlv_iter, enum tlv_tls_supported *tls_supported)
{
uint8_t u8;
enum tlv_tls_supported tmp_tls_supported;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
*tls_supported = u8;
tmp_tls_supported = u8;
if (*tls_supported != TLV_TLS_UNSUPPORTED &&
*tls_supported != TLV_TLS_SUPPORTED &&
*tls_supported != TLV_TLS_REQUIRED) {
if (tmp_tls_supported != TLV_TLS_UNSUPPORTED &&
tmp_tls_supported != TLV_TLS_SUPPORTED &&
tmp_tls_supported != TLV_TLS_REQUIRED) {
return (-4);
}
*tls_supported = tmp_tls_supported;
return (0);
}
int
tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter, enum tlv_decision_algorithm_type *decision_algorithm)
tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type *decision_algorithm)
{
uint16_t u16;
@ -557,6 +716,158 @@ tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter, enum tlv_decis
return (0);
}
int
tlv_iter_decode_ring_id(struct tlv_iterator *tlv_iter, struct tlv_ring_id *ring_id)
{
const char *opt_data;
uint16_t opt_len;
uint32_t nu32;
uint64_t nu64;
char tmp_buf[12];
opt_len = tlv_iter_get_len(tlv_iter);
opt_data = tlv_iter_get_data(tlv_iter);
if (opt_len != sizeof(tmp_buf)) {
return (-1);
}
memcpy(&nu32, opt_data, sizeof(nu32));
memcpy(&nu64, opt_data + sizeof(nu32), sizeof(nu64));
ring_id->node_id = ntohl(nu32);
ring_id->seq = be64toh(nu64);
return (0);
}
int
tlv_iter_decode_node_state(struct tlv_iterator *tlv_iter, enum tlv_node_state *node_state)
{
uint8_t u8;
enum tlv_node_state tmp_node_state;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_state = u8;
if (tmp_node_state != TLV_NODE_STATE_MEMBER &&
tmp_node_state != TLV_NODE_STATE_DEAD &&
tmp_node_state != TLV_NODE_STATE_LEAVING) {
return (-4);
}
*node_state = tmp_node_state;
return (0);
}
int
tlv_iter_decode_node_info(struct tlv_iterator *tlv_iter, struct tlv_node_info *node_info)
{
struct tlv_iterator data_tlv_iter;
int iter_res;
int res;
enum tlv_opt_type opt_type;
struct tlv_node_info tmp_node_info;
memset(&tmp_node_info, 0, sizeof(tmp_node_info));
tlv_iter_init_str(tlv_iter_get_data(tlv_iter), tlv_iter_get_len(tlv_iter), 0,
&data_tlv_iter);
while ((iter_res = tlv_iter_next(&data_tlv_iter)) > 0) {
opt_type = tlv_iter_get_type(&data_tlv_iter);
switch (opt_type) {
case TLV_OPT_NODE_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
&tmp_node_info.node_id)) != 0) {
return (res);
}
break;
case TLV_OPT_DATA_CENTER_ID:
if ((res = tlv_iter_decode_u32(&data_tlv_iter,
&tmp_node_info.data_center_id)) != 0) {
return (res);
}
break;
case TLV_OPT_NODE_STATE:
if ((res = tlv_iter_decode_node_state(&data_tlv_iter,
&tmp_node_info.node_state)) != 0) {
return (res);
}
break;
default:
/*
* Other options are not processed
*/
break;
}
}
if (iter_res != 0) {
return (-3);
}
if (tmp_node_info.node_id == 0) {
return (-4);
}
memcpy(node_info, &tmp_node_info, sizeof(tmp_node_info));
return (0);
}
int
tlv_iter_decode_node_list_type(struct tlv_iterator *tlv_iter,
enum tlv_node_list_type *node_list_type)
{
uint8_t u8;
enum tlv_node_list_type tmp_node_list_type;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_node_list_type = u8;
if (tmp_node_list_type != TLV_NODE_LIST_TYPE_INITIAL_CONFIG &&
tmp_node_list_type != TLV_NODE_LIST_TYPE_CHANGED_CONFIG &&
tmp_node_list_type != TLV_NODE_LIST_TYPE_MEMBERSHIP) {
return (-4);
}
*node_list_type = tmp_node_list_type;
return (0);
}
int
tlv_iter_decode_vote(struct tlv_iterator *tlv_iter, enum tlv_vote *vote)
{
uint8_t u8;
enum tlv_vote tmp_vote;
if (tlv_iter_decode_u8(tlv_iter, &u8) != 0) {
return (-1);
}
tmp_vote = u8;
if (tmp_vote != TLV_VOTE_UNDECIDED &&
tmp_vote != TLV_VOTE_ACK &&
tmp_vote != TLV_VOTE_NACK) {
return (-4);
}
*vote = tmp_vote;
return (0);
}
void
tlv_get_supported_options(enum tlv_opt_type **supported_options, size_t *no_supported_options)
{

View File

@ -58,6 +58,13 @@ enum tlv_opt_type {
TLV_OPT_SUPPORTED_DECISION_ALGORITHMS = 10,
TLV_OPT_DECISION_ALGORITHM = 11,
TLV_OPT_HEARTBEAT_INTERVAL = 12,
TLV_OPT_RING_ID = 13,
TLV_OPT_CONFIG_VERSION = 14,
TLV_OPT_DATA_CENTER_ID = 15,
TLV_OPT_NODE_STATE = 16,
TLV_OPT_NODE_INFO = 17,
TLV_OPT_NODE_LIST_TYPE = 18,
TLV_OPT_VOTE = 19,
};
enum tlv_tls_supported {
@ -87,22 +94,61 @@ enum tlv_decision_algorithm_type {
TLV_DECISION_ALGORITHM_TYPE_TEST = 0,
};
struct tlv_iterator {
const struct dynar *msg;
size_t current_pos;
size_t msg_header_len;
struct tlv_ring_id {
uint32_t node_id;
uint64_t seq;
};
extern int tlv_add(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t opt_len,
const void *value);
enum tlv_node_state {
TLV_NODE_STATE_NOT_SET = 0,
TLV_NODE_STATE_MEMBER = 1,
TLV_NODE_STATE_DEAD = 2,
TLV_NODE_STATE_LEAVING = 3,
};
extern int tlv_add_u32(struct dynar *msg, enum tlv_opt_type opt_type, uint32_t u32);
enum tlv_node_list_type {
TLV_NODE_LIST_TYPE_INITIAL_CONFIG = 0,
TLV_NODE_LIST_TYPE_CHANGED_CONFIG = 1,
TLV_NODE_LIST_TYPE_MEMBERSHIP = 2,
};
extern int tlv_add_u8(struct dynar *msg, enum tlv_opt_type opt_type, uint8_t u8);
enum tlv_vote {
TLV_VOTE_UNDECIDED = 0,
TLV_VOTE_ACK = 1,
TLV_VOTE_NACK = 2,
};
extern int tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type, uint16_t u16);
struct tlv_node_info {
uint32_t node_id;
uint32_t data_center_id; // 0 - data center id was not set
enum tlv_node_state node_state; // TLV_NODE_STATE_NOT_SET - state was not set
};
extern int tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type, const char *str);
struct tlv_iterator {
const char *msg;
size_t msg_len;
size_t current_pos;
size_t msg_header_len;
int iter_next_called;
};
extern int tlv_add(struct dynar *msg, enum tlv_opt_type opt_type,
uint16_t opt_len, const void *value);
extern int tlv_add_u32(struct dynar *msg, enum tlv_opt_type opt_type,
uint32_t u32);
extern int tlv_add_u8(struct dynar *msg, enum tlv_opt_type opt_type,
uint8_t u8);
extern int tlv_add_u16(struct dynar *msg, enum tlv_opt_type opt_type,
uint16_t u16);
extern int tlv_add_u64(struct dynar *msg, enum tlv_opt_type opt_type,
uint64_t u64);
extern int tlv_add_string(struct dynar *msg, enum tlv_opt_type opt_type,
const char *str);
extern int tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type,
const uint16_t *array, size_t array_size);
@ -110,15 +156,19 @@ extern int tlv_add_u16_array(struct dynar *msg, enum tlv_opt_type opt_type,
extern int tlv_add_supported_options(struct dynar *msg,
const enum tlv_opt_type *supported_options, size_t no_supported_options);
extern int tlv_add_msg_seq_number(struct dynar *msg, uint32_t msg_seq_number);
extern int tlv_add_msg_seq_number(struct dynar *msg,
uint32_t msg_seq_number);
extern int tlv_add_cluster_name(struct dynar *msg, const char *cluster_name);
extern int tlv_add_tls_supported(struct dynar *msg, enum tlv_tls_supported tls_supported);
extern int tlv_add_tls_supported(struct dynar *msg,
enum tlv_tls_supported tls_supported);
extern int tlv_add_tls_client_cert_required(struct dynar *msg, int tls_client_cert_required);
extern int tlv_add_tls_client_cert_required(struct dynar *msg,
int tls_client_cert_required);
extern int tlv_add_reply_error_code(struct dynar *msg, enum tlv_reply_error_code error_code);
extern int tlv_add_reply_error_code(struct dynar *msg,
enum tlv_reply_error_code error_code);
extern int tlv_add_node_id(struct dynar *msg, uint32_t node_id);
@ -134,7 +184,31 @@ extern int tlv_add_supported_decision_algorithms(struct dynar *msg,
extern int tlv_add_decision_algorithm(struct dynar *msg,
enum tlv_decision_algorithm_type decision_algorithm);
extern int tlv_add_heartbeat_interval(struct dynar *msg, uint32_t heartbeat_interval);
extern int tlv_add_heartbeat_interval(struct dynar *msg,
uint32_t heartbeat_interval);
extern int tlv_add_ring_id(struct dynar *msg,
const struct tlv_ring_id *ring_id);
extern int tlv_add_config_version(struct dynar *msg,
uint64_t config_version);
extern int tlv_add_data_center_id(struct dynar *msg,
uint32_t data_center_id);
extern int tlv_add_node_state(struct dynar *msg,
enum tlv_node_state node_state);
extern int tlv_add_node_info(struct dynar *msg,
const struct tlv_node_info *node_info);
extern int tlv_add_node_list_type(struct dynar *msg,
enum tlv_node_list_type node_list_type);
extern int tlv_add_vote(struct dynar *msg, enum tlv_vote vote);
extern void tlv_iter_init_str(const char *msg, size_t msg_len,
size_t msg_header_len, struct tlv_iterator *tlv_iter);
extern void tlv_iter_init(const struct dynar *msg, size_t msg_header_len,
struct tlv_iterator *tlv_iter);
@ -152,12 +226,14 @@ extern int tlv_iter_decode_u8(struct tlv_iterator *tlv_iter, uint8_t *res);
extern int tlv_iter_decode_tls_supported(struct tlv_iterator *tlv_iter,
enum tlv_tls_supported *tls_supported);
extern int tlv_iter_decode_u32(struct tlv_iterator *tlv_iter, uint32_t *res);
extern int tlv_iter_decode_u32(struct tlv_iterator *tlv_iter,
uint32_t *res);
extern int tlv_iter_decode_str(struct tlv_iterator *tlv_iter, char **str, size_t *str_len);
extern int tlv_iter_decode_str(struct tlv_iterator *tlv_iter, char **str,
size_t *str_len);
extern int tlv_iter_decode_client_cert_required(struct tlv_iterator *tlv_iter,
uint8_t *client_cert_required);
extern int tlv_iter_decode_client_cert_required(
struct tlv_iterator *tlv_iter, uint8_t *client_cert_required);
extern int tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter,
uint16_t **u16a, size_t *no_items);
@ -165,10 +241,16 @@ extern int tlv_iter_decode_u16_array(struct tlv_iterator *tlv_iter,
extern int tlv_iter_decode_supported_options(struct tlv_iterator *tlv_iter,
enum tlv_opt_type **supported_options, size_t *no_supported_options);
extern int tlv_iter_decode_supported_decision_algorithms(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type **supported_decision_algorithms, size_t *no_supported_decision_algorithms);
extern int tlv_iter_decode_supported_decision_algorithms(
struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type **supported_decision_algorithms,
size_t *no_supported_decision_algorithms);
extern int tlv_iter_decode_u16(struct tlv_iterator *tlv_iter, uint16_t *u16);
extern int tlv_iter_decode_u16(struct tlv_iterator *tlv_iter,
uint16_t *u16);
extern int tlv_iter_decode_u64(struct tlv_iterator *tlv_iter,
uint64_t *u64);
extern int tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter,
enum tlv_reply_error_code *reply_error_code);
@ -176,6 +258,21 @@ extern int tlv_iter_decode_reply_error_code(struct tlv_iterator *tlv_iter,
extern int tlv_iter_decode_decision_algorithm(struct tlv_iterator *tlv_iter,
enum tlv_decision_algorithm_type *decision_algorithm);
extern int tlv_iter_decode_ring_id(struct tlv_iterator *tlv_iter,
struct tlv_ring_id *ring_id);
extern int tlv_iter_decode_node_state(struct tlv_iterator *tlv_iter,
enum tlv_node_state *node_state);
extern int tlv_iter_decode_node_info(struct tlv_iterator *tlv_iter,
struct tlv_node_info *node_info);
extern int tlv_iter_decode_node_list_type(struct tlv_iterator *tlv_iter,
enum tlv_node_list_type *node_list_type);
extern int tlv_iter_decode_vote(struct tlv_iterator *tlv_iter,
enum tlv_vote *vote);
extern void tlv_get_supported_options(enum tlv_opt_type **supported_options,
size_t *no_supported_options);