Merge pull request #319 from kronosnet/stable1-proposed

stable1-proposed
This commit is contained in:
Fabio M. Di Nitto 2020-10-19 07:10:35 +02:00 committed by GitHub
commit b8d18c8360
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 369 additions and 355 deletions

View File

@ -373,3 +373,208 @@
fun:test
fun:main
}
{
nss internal leak (3.55) non recurring (spotted on f34)
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
fun:realpath@@GLIBC_2.3
obj:*
obj:*
obj:*
obj:/usr/lib64/libnss3.so
fun:SECMOD_LoadModule
fun:SECMOD_LoadModule
obj:/usr/lib64/libnss3.so
fun:NSS_NoDB_Init
fun:init_nss
fun:nsscrypto_init
fun:crypto_init
fun:_knet_handle_crypto_set_config
}
{
openssl uncoditional jump (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_instantiate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_get0_public
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:encrypt_openssl.isra.0
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
}
{
openssl uncoditional jump (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_instantiate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_get0_public
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:encrypt_openssl.isra.0
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
}
{
openssl uncoditional jump (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
fun:RAND_DRBG_bytes
fun:encrypt_openssl.isra.0
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
fun:clone
}
{
openssl uncoditional jump (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
fun:RAND_DRBG_bytes
fun:encrypt_openssl.isra.0
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
fun:clone
}
{
ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020)
Memcheck:Param
socketcall.sendto(msg)
fun:sendto
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
fun:clone
}
{
ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020)
Memcheck:Param
socketcall.sendto(msg)
fun:sendto
fun:send_pong
fun:process_ping
fun:_parse_recv_from_links
fun:_handle_recv_from_links
fun:_handle_recv_from_links_thread
fun:start_thread
fun:clone
}
{
ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020)
Memcheck:Param
socketcall.sendto(msg)
fun:sendto
fun:send_pmtud_reply
fun:process_pmtud
fun:_parse_recv_from_links
fun:_handle_recv_from_links
fun:_handle_recv_from_links_thread
fun:start_thread
fun:clone
}
{
ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020)
Memcheck:Param
sendmsg(msg.msg_iov[0])
fun:__libc_sendmsg
fun:sendmsg
fun:_sendmmsg
fun:_dispatch_to_links
fun:_prep_and_send_msgs
fun:_parse_recv_from_sock
fun:_handle_send_to_links
fun:_handle_send_to_links_thread
fun:start_thread
fun:clone
}
{
openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_instantiate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_get0_public
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:encrypt_openssl
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
}
{
openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_instantiate
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_get0_public
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:encrypt_openssl
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
}
{
openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
fun:RAND_DRBG_bytes
fun:encrypt_openssl
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
fun:clone
}
{
openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020)
Memcheck:Cond
obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1
fun:RAND_DRBG_generate
fun:RAND_DRBG_bytes
fun:encrypt_openssl
fun:opensslcrypto_encrypt_and_signv
fun:opensslcrypto_encrypt_and_sign
fun:send_ping
fun:_send_pings
fun:_handle_heartbt_thread
fun:start_thread
fun:clone
}

View File

@ -167,13 +167,6 @@ static int _init_socks(knet_handle_t knet_h)
{
int savederrno = 0;
if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
@ -191,7 +184,6 @@ exit_fail:
static void _close_socks(knet_handle_t knet_h)
{
_close_socketpair(knet_h, knet_h->dstsockfd);
_close_socketpair(knet_h, knet_h->hostsockfd);
}
static int _init_buffers(knet_handle_t knet_h)
@ -406,18 +398,6 @@ static int _init_epolls(knet_handle_t knet_h)
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->hostsockfd[0];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->dstsockfd[0];
@ -453,7 +433,6 @@ static void _close_epolls(knet_handle_t knet_h)
}
}
epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);
close(knet_h->send_to_links_epollfd);
close(knet_h->recv_from_links_epollfd);

View File

@ -527,27 +527,6 @@ int knet_host_enable_status_change_notify(knet_handle_t knet_h,
return 0;
}
int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen)
{
ssize_t ret = 0;
if (knet_h->fini_in_progress) {
return 0;
}
ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
if (ret < 0) {
log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno));
return -1;
}
if ((size_t)ret != datalen) {
log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret);
return -1;
}
return 0;
}
static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num)
{
int i;

View File

@ -15,7 +15,6 @@
int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf);
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf);
int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen);
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host);
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host);

View File

@ -37,8 +37,6 @@
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX
/*
* Size of threads stack. Value is choosen by experimenting, how much is needed
* to sucesfully finish test suite, and at the time of writing patch it was
@ -172,7 +170,6 @@ struct knet_handle {
struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int hostsockfd[2];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;

View File

@ -16,6 +16,7 @@
#include <stdlib.h>
#include "internals.h"
#include "netutils.h"
#include "logging.h"
#include "transports.h"
#include "links_acl.h"
@ -29,15 +30,6 @@ struct ip_acl_match_entry {
struct ip_acl_match_entry *next;
};
/*
* s6_addr32 is not defined in BSD userland, only kernel.
* definition is the same as linux and it works fine for
* what we need.
*/
#ifndef s6_addr32
#define s6_addr32 __u6_addr.__u6_addr32
#endif
/*
* IPv4 See if the address we have matches the current match entry
*/

View File

@ -19,65 +19,22 @@
#include "internals.h"
#include "netutils.h"
static int is_v4_mapped(const struct sockaddr_storage *ss, socklen_t salen)
int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2)
{
char map[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *) ss;
return memcmp(&addr6->sin6_addr, map, 12);
}
int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1,
const struct sockaddr_storage *ss2, socklen_t sslen2)
{
int ss1_offset = 0, ss2_offset = 0;
struct sockaddr_in6 *ss1_addr6 = (struct sockaddr_in6 *)ss1;
struct sockaddr_in6 *ss2_addr6 = (struct sockaddr_in6 *)ss2;
struct sockaddr_in *ss1_addr = (struct sockaddr_in *)ss1;
struct sockaddr_in *ss2_addr = (struct sockaddr_in *)ss2;
char *addr1, *addr2;
if (ss1->ss_family == ss2->ss_family) {
return memcmp(ss1, ss2, sslen1);
if (ss1->ss_family != ss2->ss_family) {
return -1;
}
if (ss1->ss_family == AF_INET6) {
if (is_v4_mapped(ss1, sslen1)) {
return 1;
}
addr1 = (char *)&ss1_addr6->sin6_addr;
ss1_offset = 12;
} else {
addr1 = (char *)&ss1_addr->sin_addr;
return memcmp(&ss1_addr6->sin6_addr.s6_addr32, &ss2_addr6->sin6_addr.s6_addr32, sizeof(struct in6_addr));
}
if (ss2->ss_family == AF_INET6) {
if (is_v4_mapped(ss2, sslen2)) {
return 1;
}
addr2 = (char *)&ss2_addr6->sin6_addr;
ss2_offset = 12;
} else {
addr2 = (char *)&ss2_addr->sin_addr;
}
return memcmp(addr1+ss1_offset, addr2+ss2_offset, 4);
}
int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src)
{
struct sockaddr_in6 *dst_addr6 = (struct sockaddr_in6 *)dst;
struct sockaddr_in6 *src_addr6 = (struct sockaddr_in6 *)src;
memset(dst, 0, sizeof(struct sockaddr_storage));
if (src->ss_family == AF_INET6) {
dst->ss_family = src->ss_family;
memmove(&dst_addr6->sin6_port, &src_addr6->sin6_port, sizeof(in_port_t));
memmove(&dst_addr6->sin6_addr, &src_addr6->sin6_addr, sizeof(struct in6_addr));
} else {
memmove(dst, src, sizeof(struct sockaddr_in));
}
return 0;
return memcmp(&ss1_addr->sin_addr.s_addr, &ss2_addr->sin_addr.s_addr, sizeof(struct in_addr));
}
socklen_t sockaddr_len(const struct sockaddr_storage *ss)

View File

@ -11,9 +11,19 @@
#define __KNET_NETUTILS_H__
#include <sys/socket.h>
#include <netinet/in.h>
int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1, const struct sockaddr_storage *ss2, socklen_t sslen2);
int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src);
/*
* s6_addr32 is not defined in BSD userland, only kernel.
* definition is the same as linux and it works fine for
* what we need.
*/
#ifndef s6_addr32
#define s6_addr32 __u6_addr.__u6_addr32
#endif
int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2);
socklen_t sockaddr_len(const struct sockaddr_storage *ss);
#endif

View File

@ -19,78 +19,6 @@
#include "libknet.h"
#if 0
/*
* for future protocol extension (re-switching table calculation)
*/
struct knet_hinfo_link {
uint8_t khl_link_id;
uint8_t khl_link_dynamic;
uint8_t khl_link_priority;
uint64_t khl_link_latency;
char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN];
char khl_link_dst_port[KNET_MAX_PORT_LEN];
} __attribute__((packed));
struct knet_hinfo_link_table {
knet_node_id_t khlt_node_id;
uint8_t khlt_local; /* we have this node connected locally */
struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */
} __attribute__((packed));
struct link_table {
knet_node_id_t khdt_host_entries;
uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */
} __attribute__((packed));
#endif
#define KNET_HOSTINFO_LINK_STATUS_DOWN 0
#define KNET_HOSTINFO_LINK_STATUS_UP 1
struct knet_hostinfo_payload_link_status {
uint8_t khip_link_status_link_id; /* link id */
uint8_t khip_link_status_status; /* up/down status */
} __attribute__((packed));
/*
* union to reference possible individual payloads
*/
union knet_hostinfo_payload {
struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status;
} __attribute__((packed));
/*
* due to the nature of knet_hostinfo, we are currently
* sending those data as part of knet_header_payload_data.khp_data_userdata
* and avoid a union that increses knet_header_payload_data size
* unnecessarely.
* This might change later on depending on how we implement
* host info exchange
*/
#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED
#define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED
#define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */
#define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */
struct knet_hostinfo {
uint8_t khi_type; /* type of hostinfo we are sending */
uint8_t khi_bcast; /* hostinfo destination bcast/ucast */
knet_node_id_t khi_dst_node_id;/* used only if in ucast mode */
union knet_hostinfo_payload khi_payload;
} __attribute__((packed));
#define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo)
#define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload))
#define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status))
#define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status
#define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id
/*
* typedef uint64_t seq_num_t;
* #define SEQ_MAX UINT64_MAX
@ -152,7 +80,6 @@ union knet_header_payload {
#define KNET_HEADER_VERSION 0x01 /* we currently support only one version */
#define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */
#define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */
#define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */
#define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */

View File

@ -18,10 +18,6 @@ int main(void)
printf("KNET_HEADER_PING_SIZE: %zu (%zu)\n", KNET_HEADER_PING_SIZE, sizeof(struct knet_header_payload_ping));
printf("KNET_HEADER_PMTUD_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_SIZE, sizeof(struct knet_header_payload_pmtud));
printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data));
printf("\n");
printf("KNET_HOSTINFO_ALL_SIZE: %zu\n", KNET_HOSTINFO_ALL_SIZE);
printf("KNET_HOSTINFO_SIZE: %zu\n", KNET_HOSTINFO_SIZE);
printf("KNET_HOSTINFO_LINK_STATUS_SIZE: %zu (%zu)\n", KNET_HOSTINFO_LINK_STATUS_SIZE, sizeof(struct knet_hostinfo_payload_link_status));
return 0;
}

View File

@ -241,13 +241,11 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
int try_decrypt = 0, decrypted = 0, i;
int try_decrypt = 0, decrypted = 0, i, found_link = 0;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
@ -303,22 +301,16 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
return;
}
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
/* be aware this works only for PING / PONG and PMTUd packets! */
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
@ -337,6 +329,18 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
} else { /* data packet */
for (i = 0; i < KNET_MAX_LINK; i++) {
src_link = &src_host->link[i];
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
found_link = 1;
break;
}
}
if (!found_link) {
log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
return;
}
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
@ -347,20 +351,44 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
/* data stats at the top for consistency with TX */
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
@ -435,136 +463,91 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:

View File

@ -150,7 +150,6 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
@ -172,8 +171,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
inbuf = knet_h->recv_from_sock_buf;
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
if (knet_h->enabled != 1) {
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
@ -259,15 +257,6 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
@ -624,25 +613,19 @@ static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int
docallback = 1;
memset(&ev, 0, sizeof(struct epoll_event));
if (channel != KNET_INTERNAL_DATA_CHANNEL) {
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
/*
* TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
* once we add support for internal knet communication
*/
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
}
if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
if (docallback) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
@ -735,22 +718,17 @@ void *_handle_send_to_links_thread(void *data)
}
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->hostsockfd[0]) {
type = KNET_HEADER_TYPE_HOST_INFO;
channel = KNET_INTERNAL_DATA_CHANNEL;
} else {
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
if (channel >= KNET_DATAFD_MAX) {
log_debug(knet_h, KNET_SUB_TX, "No available channels");
continue; /* channel not found */
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
if (channel >= KNET_DATAFD_MAX) {
log_debug(knet_h, KNET_SUB_TX, "No available channels");
continue; /* channel not found */
}
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;

View File

@ -871,7 +871,13 @@ exit_error:
if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) {
info->accepted_socks[i] = -1;
}
_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
/*
* check the error to make coverity scan happy.
* _set_fd_tracker cannot fail at this stage
*/
if (_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0){
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", new_fd);
}
free(accept_info);
if (new_fd >= 0) {
close(new_fd);
@ -939,7 +945,13 @@ static void _handle_listen_sctp_errors(knet_handle_t knet_h)
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (sockfd == info->accepted_socks[i]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
/*
* check the error to make coverity scan happy.
* _set_fd_tracker cannot fail at this stage
*/
if (_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", sockfd);
}
info->accepted_socks[i] = -1;
free(accept_info);
close(sockfd);