diff --git a/build-aux/knet_valgrind_memcheck.supp b/build-aux/knet_valgrind_memcheck.supp index 06a3ec0..e22fceb 100644 --- a/build-aux/knet_valgrind_memcheck.supp +++ b/build-aux/knet_valgrind_memcheck.supp @@ -373,3 +373,208 @@ fun:test fun:main } +{ + nss internal leak (3.55) non recurring (spotted on f34) + Memcheck:Leak + match-leak-kinds: definite + fun:malloc + fun:realpath@@GLIBC_2.3 + obj:* + obj:* + obj:* + obj:/usr/lib64/libnss3.so + fun:SECMOD_LoadModule + fun:SECMOD_LoadModule + obj:/usr/lib64/libnss3.so + fun:NSS_NoDB_Init + fun:init_nss + fun:nsscrypto_init + fun:crypto_init + fun:_knet_handle_crypto_set_config +} +{ + openssl uncoditional jump (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_instantiate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_get0_public + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:encrypt_openssl.isra.0 + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread +} +{ + openssl uncoditional jump (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_instantiate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_get0_public + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:encrypt_openssl.isra.0 + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread +} +{ + openssl uncoditional jump (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + fun:RAND_DRBG_bytes + fun:encrypt_openssl.isra.0 + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread + fun:clone +} +{ + openssl uncoditional jump (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + fun:RAND_DRBG_bytes + fun:encrypt_openssl.isra.0 + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread + fun:clone +} +{ + ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020) + Memcheck:Param + socketcall.sendto(msg) + fun:sendto + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread + fun:clone +} +{ + ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020) + Memcheck:Param + socketcall.sendto(msg) + fun:sendto + fun:send_pong + fun:process_ping + fun:_parse_recv_from_links + fun:_handle_recv_from_links + fun:_handle_recv_from_links_thread + fun:start_thread + fun:clone +} +{ + ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020) + Memcheck:Param + socketcall.sendto(msg) + fun:sendto + fun:send_pmtud_reply + fun:process_pmtud + fun:_parse_recv_from_links + fun:_handle_recv_from_links + fun:_handle_recv_from_links_thread + fun:start_thread + fun:clone +} +{ + ubuntu-devel new toolchain is not stable yet (spotted on Ubuntu devel 10092020) + Memcheck:Param + sendmsg(msg.msg_iov[0]) + fun:__libc_sendmsg + fun:sendmsg + fun:_sendmmsg + fun:_dispatch_to_links + fun:_prep_and_send_msgs + fun:_parse_recv_from_sock + fun:_handle_send_to_links + fun:_handle_send_to_links_thread + fun:start_thread + fun:clone +} +{ + openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_instantiate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_get0_public + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:encrypt_openssl + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread +} +{ + openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_instantiate + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_get0_public + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:encrypt_openssl + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread +} +{ + openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + fun:RAND_DRBG_bytes + fun:encrypt_openssl + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread + fun:clone +} +{ + openssl uncoditional jump (clang) (spotted on Ubuntu devel 10092020) + Memcheck:Cond + obj:/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1 + fun:RAND_DRBG_generate + fun:RAND_DRBG_bytes + fun:encrypt_openssl + fun:opensslcrypto_encrypt_and_signv + fun:opensslcrypto_encrypt_and_sign + fun:send_ping + fun:_send_pings + fun:_handle_heartbt_thread + fun:start_thread + fun:clone +} diff --git a/libknet/handle.c b/libknet/handle.c index 68641d6..d4bd86c 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -167,13 +167,6 @@ static int _init_socks(knet_handle_t knet_h) { int savederrno = 0; - if (_init_socketpair(knet_h, knet_h->hostsockfd)) { - savederrno = errno; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s", - strerror(savederrno)); - goto exit_fail; - } - if (_init_socketpair(knet_h, knet_h->dstsockfd)) { savederrno = errno; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s", @@ -191,7 +184,6 @@ exit_fail: static void _close_socks(knet_handle_t knet_h) { _close_socketpair(knet_h, knet_h->dstsockfd); - _close_socketpair(knet_h, knet_h->hostsockfd); } static int _init_buffers(knet_handle_t knet_h) @@ -406,18 +398,6 @@ static int _init_epolls(knet_handle_t knet_h) goto exit_fail; } - memset(&ev, 0, sizeof(struct epoll_event)); - ev.events = EPOLLIN; - ev.data.fd = knet_h->hostsockfd[0]; - - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) { - savederrno = errno; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s", - strerror(savederrno)); - goto exit_fail; - } - memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = knet_h->dstsockfd[0]; @@ -453,7 +433,6 @@ static void _close_epolls(knet_handle_t knet_h) } } - epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev); epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev); close(knet_h->send_to_links_epollfd); close(knet_h->recv_from_links_epollfd); diff --git a/libknet/host.c b/libknet/host.c index cf35179..e9e86eb 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -527,27 +527,6 @@ int knet_host_enable_status_change_notify(knet_handle_t knet_h, return 0; } -int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen) -{ - ssize_t ret = 0; - - if (knet_h->fini_in_progress) { - return 0; - } - - ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0); - if (ret < 0) { - log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno)); - return -1; - } - if ((size_t)ret != datalen) { - log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret); - return -1; - } - - return 0; -} - static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) { int i; diff --git a/libknet/host.h b/libknet/host.h index bd2e8a7..3312c8b 100644 --- a/libknet/host.h +++ b/libknet/host.h @@ -15,7 +15,6 @@ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf); void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf); -int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen); int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host); int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host); diff --git a/libknet/internals.h b/libknet/internals.h index cda58a6..22f654b 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -37,8 +37,6 @@ #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1 -#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX - /* * Size of threads stack. Value is choosen by experimenting, how much is needed * to sucesfully finish test suite, and at the time of writing patch it was @@ -172,7 +170,6 @@ struct knet_handle { struct knet_sock sockfd[KNET_DATAFD_MAX + 1]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; - int hostsockfd[2]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; diff --git a/libknet/links_acl_ip.c b/libknet/links_acl_ip.c index 0f269ef..0f9c0f7 100644 --- a/libknet/links_acl_ip.c +++ b/libknet/links_acl_ip.c @@ -16,6 +16,7 @@ #include #include "internals.h" +#include "netutils.h" #include "logging.h" #include "transports.h" #include "links_acl.h" @@ -29,15 +30,6 @@ struct ip_acl_match_entry { struct ip_acl_match_entry *next; }; -/* - * s6_addr32 is not defined in BSD userland, only kernel. - * definition is the same as linux and it works fine for - * what we need. - */ -#ifndef s6_addr32 -#define s6_addr32 __u6_addr.__u6_addr32 -#endif - /* * IPv4 See if the address we have matches the current match entry */ diff --git a/libknet/netutils.c b/libknet/netutils.c index 754e198..25bba33 100644 --- a/libknet/netutils.c +++ b/libknet/netutils.c @@ -19,65 +19,22 @@ #include "internals.h" #include "netutils.h" -static int is_v4_mapped(const struct sockaddr_storage *ss, socklen_t salen) +int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2) { - char map[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff }; - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *) ss; - return memcmp(&addr6->sin6_addr, map, 12); -} - -int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1, - const struct sockaddr_storage *ss2, socklen_t sslen2) -{ - int ss1_offset = 0, ss2_offset = 0; struct sockaddr_in6 *ss1_addr6 = (struct sockaddr_in6 *)ss1; struct sockaddr_in6 *ss2_addr6 = (struct sockaddr_in6 *)ss2; struct sockaddr_in *ss1_addr = (struct sockaddr_in *)ss1; struct sockaddr_in *ss2_addr = (struct sockaddr_in *)ss2; - char *addr1, *addr2; - if (ss1->ss_family == ss2->ss_family) { - return memcmp(ss1, ss2, sslen1); + if (ss1->ss_family != ss2->ss_family) { + return -1; } if (ss1->ss_family == AF_INET6) { - if (is_v4_mapped(ss1, sslen1)) { - return 1; - } - addr1 = (char *)&ss1_addr6->sin6_addr; - ss1_offset = 12; - } else { - addr1 = (char *)&ss1_addr->sin_addr; + return memcmp(&ss1_addr6->sin6_addr.s6_addr32, &ss2_addr6->sin6_addr.s6_addr32, sizeof(struct in6_addr)); } - if (ss2->ss_family == AF_INET6) { - if (is_v4_mapped(ss2, sslen2)) { - return 1; - } - addr2 = (char *)&ss2_addr6->sin6_addr; - ss2_offset = 12; - } else { - addr2 = (char *)&ss2_addr->sin_addr; - } - - return memcmp(addr1+ss1_offset, addr2+ss2_offset, 4); -} - -int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src) -{ - struct sockaddr_in6 *dst_addr6 = (struct sockaddr_in6 *)dst; - struct sockaddr_in6 *src_addr6 = (struct sockaddr_in6 *)src; - - memset(dst, 0, sizeof(struct sockaddr_storage)); - - if (src->ss_family == AF_INET6) { - dst->ss_family = src->ss_family; - memmove(&dst_addr6->sin6_port, &src_addr6->sin6_port, sizeof(in_port_t)); - memmove(&dst_addr6->sin6_addr, &src_addr6->sin6_addr, sizeof(struct in6_addr)); - } else { - memmove(dst, src, sizeof(struct sockaddr_in)); - } - return 0; + return memcmp(&ss1_addr->sin_addr.s_addr, &ss2_addr->sin_addr.s_addr, sizeof(struct in_addr)); } socklen_t sockaddr_len(const struct sockaddr_storage *ss) diff --git a/libknet/netutils.h b/libknet/netutils.h index ee10b2b..6395398 100644 --- a/libknet/netutils.h +++ b/libknet/netutils.h @@ -11,9 +11,19 @@ #define __KNET_NETUTILS_H__ #include +#include -int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1, const struct sockaddr_storage *ss2, socklen_t sslen2); -int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src); +/* + * s6_addr32 is not defined in BSD userland, only kernel. + * definition is the same as linux and it works fine for + * what we need. + */ + +#ifndef s6_addr32 +#define s6_addr32 __u6_addr.__u6_addr32 +#endif + +int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2); socklen_t sockaddr_len(const struct sockaddr_storage *ss); #endif diff --git a/libknet/onwire.h b/libknet/onwire.h index e00ad91..1040ea0 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -19,78 +19,6 @@ #include "libknet.h" -#if 0 - -/* - * for future protocol extension (re-switching table calculation) - */ - -struct knet_hinfo_link { - uint8_t khl_link_id; - uint8_t khl_link_dynamic; - uint8_t khl_link_priority; - uint64_t khl_link_latency; - char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN]; - char khl_link_dst_port[KNET_MAX_PORT_LEN]; -} __attribute__((packed)); - -struct knet_hinfo_link_table { - knet_node_id_t khlt_node_id; - uint8_t khlt_local; /* we have this node connected locally */ - struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */ -} __attribute__((packed)); - -struct link_table { - knet_node_id_t khdt_host_entries; - uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */ -} __attribute__((packed)); -#endif - -#define KNET_HOSTINFO_LINK_STATUS_DOWN 0 -#define KNET_HOSTINFO_LINK_STATUS_UP 1 - -struct knet_hostinfo_payload_link_status { - uint8_t khip_link_status_link_id; /* link id */ - uint8_t khip_link_status_status; /* up/down status */ -} __attribute__((packed)); - -/* - * union to reference possible individual payloads - */ - -union knet_hostinfo_payload { - struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status; -} __attribute__((packed)); - -/* - * due to the nature of knet_hostinfo, we are currently - * sending those data as part of knet_header_payload_data.khp_data_userdata - * and avoid a union that increses knet_header_payload_data size - * unnecessarely. - * This might change later on depending on how we implement - * host info exchange - */ - -#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED -#define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED - -#define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */ -#define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */ - -struct knet_hostinfo { - uint8_t khi_type; /* type of hostinfo we are sending */ - uint8_t khi_bcast; /* hostinfo destination bcast/ucast */ - knet_node_id_t khi_dst_node_id;/* used only if in ucast mode */ - union knet_hostinfo_payload khi_payload; -} __attribute__((packed)); - -#define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo) -#define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload)) -#define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status)) - -#define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status -#define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id - /* * typedef uint64_t seq_num_t; * #define SEQ_MAX UINT64_MAX @@ -152,7 +80,6 @@ union knet_header_payload { #define KNET_HEADER_VERSION 0x01 /* we currently support only one version */ #define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */ -#define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */ #define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */ #define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */ diff --git a/libknet/tests/pckt_test.c b/libknet/tests/pckt_test.c index 9522c18..30798f3 100644 --- a/libknet/tests/pckt_test.c +++ b/libknet/tests/pckt_test.c @@ -18,10 +18,6 @@ int main(void) printf("KNET_HEADER_PING_SIZE: %zu (%zu)\n", KNET_HEADER_PING_SIZE, sizeof(struct knet_header_payload_ping)); printf("KNET_HEADER_PMTUD_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_SIZE, sizeof(struct knet_header_payload_pmtud)); printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data)); - printf("\n"); - printf("KNET_HOSTINFO_ALL_SIZE: %zu\n", KNET_HOSTINFO_ALL_SIZE); - printf("KNET_HOSTINFO_SIZE: %zu\n", KNET_HOSTINFO_SIZE); - printf("KNET_HOSTINFO_LINK_STATUS_SIZE: %zu (%zu)\n", KNET_HOSTINFO_LINK_STATUS_SIZE, sizeof(struct knet_hostinfo_payload_link_status)); return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 85fc996..ac2f46f 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -241,13 +241,11 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; - struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[1]; int8_t channel; - struct sockaddr_storage pckt_src; seq_num_t recv_seq_num; int wipe_bufs = 0; - int try_decrypt = 0, decrypted = 0, i; + int try_decrypt = 0, decrypted = 0, i, found_link = 0; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { @@ -303,22 +301,16 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc return; } - src_link = src_host->link + - (inbuf->khp_ping_link % KNET_MAX_LINK); if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { + /* be aware this works only for PING / PONG and PMTUd packets! */ + src_link = src_host->link + + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { - /* - * cpyaddrport will only copy address and port of the incoming - * packet and strip extra bits such as flow and scopeid - */ - cpyaddrport(&pckt_src, msg->msg_hdr.msg_name); - - if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), - &pckt_src, sockaddr_len(&pckt_src)) != 0) { + if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); - memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage)); - if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name), + memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); + if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); @@ -337,6 +329,18 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc */ transport_link_dyn_connect(knet_h, sockfd, src_link); } + } else { /* data packet */ + for (i = 0; i < KNET_MAX_LINK; i++) { + src_link = &src_host->link[i]; + if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { + found_link = 1; + break; + } + } + if (!found_link) { + log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); + return; + } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); @@ -347,20 +351,44 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc } switch (inbuf->kh_type) { - case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: + + /* data stats at the top for consistency with TX */ + src_link->status.stats.rx_data_packets++; + src_link->status.stats.rx_data_bytes += len; + + if (decrypted) { + stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); + if (stats_err < 0) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); + return; + } + /* Only update the crypto overhead for data packets. Mainly to be + consistent with TX */ + if (decrypt_time < knet_h->stats.rx_crypt_time_min) { + knet_h->stats.rx_crypt_time_min = decrypt_time; + } + if (decrypt_time > knet_h->stats.rx_crypt_time_max) { + knet_h->stats.rx_crypt_time_max = decrypt_time; + } + knet_h->stats.rx_crypt_time_ave = + (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); + knet_h->stats.rx_crypt_packets++; + pthread_mutex_unlock(&knet_h->handle_stats_mutex); + } + if (!src_host->status.reachable) { pthread_mutex_unlock(&src_link->link_stats_mutex); log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } + inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; src_host->got_data = 1; - src_link->status.stats.rx_data_packets++; - src_link->status.stats.rx_data_bytes += len; - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { pthread_mutex_unlock(&src_link->link_stats_mutex); if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { @@ -435,136 +463,91 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc pthread_mutex_unlock(&knet_h->handle_stats_mutex); } - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (decrypted) { - stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); - if (stats_err < 0) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); - return; - } - /* Only update the crypto overhead for data packets. Mainly to be - consistent with TX */ - if (decrypt_time < knet_h->stats.rx_crypt_time_min) { - knet_h->stats.rx_crypt_time_min = decrypt_time; - } - if (decrypt_time > knet_h->stats.rx_crypt_time_max) { - knet_h->stats.rx_crypt_time_max = decrypt_time; - } - knet_h->stats.rx_crypt_time_ave = - (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + - decrypt_time) / (knet_h->stats.rx_crypt_packets+1); - knet_h->stats.rx_crypt_packets++; - pthread_mutex_unlock(&knet_h->handle_stats_mutex); + if (knet_h->enabled != 1) /* data forward is disabled */ + break; + + if (knet_h->dst_host_filter_fn) { + size_t host_idx; + int found = 0; + + bcast = knet_h->dst_host_filter_fn( + knet_h->dst_host_filter_fn_private_data, + (const unsigned char *)inbuf->khp_data_userdata, + len - KNET_HEADER_DATA_SIZE, + KNET_NOTIFY_RX, + knet_h->host_id, + inbuf->kh_node, + &channel, + dst_host_ids, + &dst_host_ids_entries); + if (bcast < 0) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); + return; } - if (knet_h->enabled != 1) /* data forward is disabled */ - break; + if ((!bcast) && (!dst_host_ids_entries)) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); + return; + } - if (knet_h->dst_host_filter_fn) { - size_t host_idx; - int found = 0; - - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, - KNET_NOTIFY_RX, - knet_h->host_id, - inbuf->kh_node, - &channel, - dst_host_ids, - &dst_host_ids_entries); - if (bcast < 0) { + /* check if we are dst for this packet */ + if (!bcast) { + if (dst_host_ids_entries > KNET_MAX_HOST) { pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); + log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return; } - - if ((!bcast) && (!dst_host_ids_entries)) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); - return; + for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { + if (dst_host_ids[host_idx] == knet_h->host_id) { + found = 1; + break; + } } - - /* check if we are dst for this packet */ - if (!bcast) { - if (dst_host_ids_entries > KNET_MAX_HOST) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); - return; - } - for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { - if (dst_host_ids[host_idx] == knet_h->host_id) { - found = 1; - break; - } - } - if (!found) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); - return; - } + if (!found) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); + return; } } } - if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { - if (!knet_h->sockfd[channel].in_use) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - log_debug(knet_h, KNET_SUB_RX, - "received packet for channel %d but there is no local sock connected", - channel); - return; - } + if (!knet_h->sockfd[channel].in_use) { + pthread_mutex_unlock(&src_link->link_stats_mutex); + log_debug(knet_h, KNET_SUB_RX, + "received packet for channel %d but there is no local sock connected", + channel); + return; + } - outlen = 0; - memset(iov_out, 0, sizeof(iov_out)); + outlen = 0; + memset(iov_out, 0, sizeof(iov_out)); retry: - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; - iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); + iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; + iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); - outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); - if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { - log_debug(knet_h, KNET_SUB_RX, - "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", - iov_out[0].iov_len, outlen); - goto retry; - } + outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); + if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { + log_debug(knet_h, KNET_SUB_RX, + "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", + iov_out[0].iov_len, outlen); + goto retry; + } - if (outlen <= 0) { - knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, - knet_h->sockfd[channel].sockfd[0], - channel, - KNET_NOTIFY_RX, - outlen, - errno); - pthread_mutex_unlock(&src_link->link_stats_mutex); - return; - } - if ((size_t)outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - } - } else { /* HOSTINFO */ - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); - } - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - pthread_mutex_unlock(&src_link->link_stats_mutex); - return; - } + if (outlen <= 0) { + knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, + knet_h->sockfd[channel].sockfd[0], + channel, + KNET_NOTIFY_RX, + outlen, + errno); + pthread_mutex_unlock(&src_link->link_stats_mutex); + return; + } + if ((size_t)outlen == iov_out[0].iov_len) { _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); - switch(knet_hostinfo->khi_type) { - case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: - break; - case KNET_HOSTINFO_TYPE_LINK_TABLE: - break; - default: - log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); - break; - } } break; case KNET_HEADER_TYPE_PING: diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 615c426..2f69991 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -150,7 +150,6 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; - struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX][2]; int iovcnt_out = 2; uint8_t frag_idx; @@ -172,8 +171,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan inbuf = knet_h->recv_from_sock_buf; - if ((knet_h->enabled != 1) && - (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ + if (knet_h->enabled != 1) { log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; @@ -259,15 +257,6 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan } } break; - case KNET_HEADER_TYPE_HOST_INFO: - knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; - if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { - bcast = 0; - dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; - dst_host_ids_entries_temp = 1; - knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); - } - break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; @@ -624,25 +613,19 @@ static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); - if (channel != KNET_INTERNAL_DATA_CHANNEL) { - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { - log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); - } else { - knet_h->sockfd[channel].has_error = 1; - } + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + } else { + knet_h->sockfd[channel].has_error = 1; } - /* - * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL - * once we add support for internal knet communication - */ } else { knet_h->recv_from_sock_buf->kh_type = type; _parse_recv_from_sock(knet_h, inlen, channel, 0); } - if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) { + if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, @@ -735,22 +718,17 @@ void *_handle_send_to_links_thread(void *data) } for (i = 0; i < nev; i++) { - if (events[i].data.fd == knet_h->hostsockfd[0]) { - type = KNET_HEADER_TYPE_HOST_INFO; - channel = KNET_INTERNAL_DATA_CHANNEL; - } else { - type = KNET_HEADER_TYPE_DATA; - for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { - if ((knet_h->sockfd[channel].in_use) && - (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { - break; - } - } - if (channel >= KNET_DATAFD_MAX) { - log_debug(knet_h, KNET_SUB_TX, "No available channels"); - continue; /* channel not found */ + type = KNET_HEADER_TYPE_DATA; + for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { + if ((knet_h->sockfd[channel].in_use) && + (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { + break; } } + if (channel >= KNET_DATAFD_MAX) { + log_debug(knet_h, KNET_SUB_TX, "No available channels"); + continue; /* channel not found */ + } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index 705b6ca..ecc40c2 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -871,7 +871,13 @@ exit_error: if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) { info->accepted_socks[i] = -1; } - _set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); + /* + * check the error to make coverity scan happy. + * _set_fd_tracker cannot fail at this stage + */ + if (_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0){ + log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", new_fd); + } free(accept_info); if (new_fd >= 0) { close(new_fd); @@ -939,7 +945,13 @@ static void _handle_listen_sctp_errors(knet_handle_t knet_h) for (i=0; iaccepted_socks[i]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd); - _set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); + /* + * check the error to make coverity scan happy. + * _set_fd_tracker cannot fail at this stage + */ + if (_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { + log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", sockfd); + } info->accepted_socks[i] = -1; free(accept_info); close(sockfd);