bgpd: implement buffered reads

* Move and modify all network input related code to bgp_io.c
* Add a real input buffer to `struct peer`
* Move connection initialization to its own thread.c task instead of
  piggybacking off of bgp_read()
* Tons of little fixups

Primary changes are in bgp_packet.[ch], bgp_io.[ch], bgp_fsm.[ch].
Changes made elsewhere are almost exclusively refactoring peer->ibuf to
peer->curr since peer->ibuf is now the true FIFO packet input buffer
while peer->curr represents the packet currently being processed by the
main pthread.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
This commit is contained in:
Quentin Young 2017-05-02 00:37:45 +00:00
parent 56257a44e4
commit 424ab01d0f
No known key found for this signature in database
GPG Key ID: DAF48E0F57E0834F
17 changed files with 871 additions and 570 deletions

View File

@ -1156,7 +1156,7 @@ static int bgp_attr_aspath(struct bgp_attr_parser_args *args)
* peer with AS4 => will get 4Byte ASnums
* otherwise, will get 16 Bit
*/
attr->aspath = aspath_parse(peer->ibuf, length,
attr->aspath = aspath_parse(peer->curr, length,
CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV));
/* In case of IBGP, length will be zero. */
@ -1230,7 +1230,7 @@ static int bgp_attr_as4_path(struct bgp_attr_parser_args *args,
struct attr *const attr = args->attr;
const bgp_size_t length = args->length;
*as4_path = aspath_parse(peer->ibuf, length, 1);
*as4_path = aspath_parse(peer->curr, length, 1);
/* In case of IBGP, length will be zero. */
if (!*as4_path) {
@ -1271,7 +1271,7 @@ static bgp_attr_parse_ret_t bgp_attr_nexthop(struct bgp_attr_parser_args *args)
logged locally (this is implemented somewhere else). The UPDATE
message
gets ignored in any of these cases. */
nexthop_n = stream_get_ipv4(peer->ibuf);
nexthop_n = stream_get_ipv4(peer->curr);
nexthop_h = ntohl(nexthop_n);
if ((IPV4_NET0(nexthop_h) || IPV4_NET127(nexthop_h)
|| IPV4_CLASS_DE(nexthop_h))
@ -1307,7 +1307,7 @@ static bgp_attr_parse_ret_t bgp_attr_med(struct bgp_attr_parser_args *args)
args->total);
}
attr->med = stream_getl(peer->ibuf);
attr->med = stream_getl(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_MULTI_EXIT_DISC);
@ -1333,11 +1333,11 @@ bgp_attr_local_pref(struct bgp_attr_parser_args *args)
external peer, then this attribute MUST be ignored by the
receiving speaker. */
if (peer->sort == BGP_PEER_EBGP) {
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
return BGP_ATTR_PARSE_PROCEED;
}
attr->local_pref = stream_getl(peer->ibuf);
attr->local_pref = stream_getl(peer->curr);
/* Set the local-pref flag. */
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_LOCAL_PREF);
@ -1386,10 +1386,10 @@ static int bgp_attr_aggregator(struct bgp_attr_parser_args *args)
}
if (CHECK_FLAG(peer->cap, PEER_CAP_AS4_RCV))
attr->aggregator_as = stream_getl(peer->ibuf);
attr->aggregator_as = stream_getl(peer->curr);
else
attr->aggregator_as = stream_getw(peer->ibuf);
attr->aggregator_addr.s_addr = stream_get_ipv4(peer->ibuf);
attr->aggregator_as = stream_getw(peer->curr);
attr->aggregator_addr.s_addr = stream_get_ipv4(peer->curr);
/* Set atomic aggregate flag. */
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AGGREGATOR);
@ -1413,8 +1413,8 @@ bgp_attr_as4_aggregator(struct bgp_attr_parser_args *args,
0);
}
*as4_aggregator_as = stream_getl(peer->ibuf);
as4_aggregator_addr->s_addr = stream_get_ipv4(peer->ibuf);
*as4_aggregator_as = stream_getl(peer->curr);
as4_aggregator_addr->s_addr = stream_get_ipv4(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_AS4_AGGREGATOR);
@ -1540,10 +1540,10 @@ bgp_attr_community(struct bgp_attr_parser_args *args)
}
attr->community =
community_parse((u_int32_t *)stream_pnt(peer->ibuf), length);
community_parse((u_int32_t *)stream_pnt(peer->curr), length);
/* XXX: fix community_parse to use stream API and remove this */
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
if (!attr->community)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@ -1570,7 +1570,7 @@ bgp_attr_originator_id(struct bgp_attr_parser_args *args)
args->total);
}
attr->originator_id.s_addr = stream_get_ipv4(peer->ibuf);
attr->originator_id.s_addr = stream_get_ipv4(peer->curr);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_ORIGINATOR_ID);
@ -1594,10 +1594,10 @@ bgp_attr_cluster_list(struct bgp_attr_parser_args *args)
}
attr->cluster =
cluster_parse((struct in_addr *)stream_pnt(peer->ibuf), length);
cluster_parse((struct in_addr *)stream_pnt(peer->curr), length);
/* XXX: Fix cluster_parse to use stream API and then remove this */
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_CLUSTER_LIST);
@ -1778,7 +1778,7 @@ int bgp_mp_unreach_parse(struct bgp_attr_parser_args *args,
struct attr *const attr = args->attr;
const bgp_size_t length = args->length;
s = peer->ibuf;
s = peer->curr;
#define BGP_MP_UNREACH_MIN_SIZE 3
if ((length > STREAM_READABLE(s)) || (length < BGP_MP_UNREACH_MIN_SIZE))
@ -1832,9 +1832,9 @@ bgp_attr_large_community(struct bgp_attr_parser_args *args)
}
attr->lcommunity =
lcommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
lcommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
/* XXX: fix ecommunity_parse to use stream API */
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
if (!attr->lcommunity)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@ -1861,9 +1861,9 @@ bgp_attr_ext_communities(struct bgp_attr_parser_args *args)
}
attr->ecommunity =
ecommunity_parse((u_int8_t *)stream_pnt(peer->ibuf), length);
ecommunity_parse((u_int8_t *)stream_pnt(peer->curr), length);
/* XXX: fix ecommunity_parse to use stream API */
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
if (!attr->ecommunity)
return bgp_attr_malformed(args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@ -1957,7 +1957,7 @@ static int bgp_attr_encap(uint8_t type, struct peer *peer, /* IN */
+ sublength);
tlv->type = subtype;
tlv->length = sublength;
stream_get(tlv->value, peer->ibuf, sublength);
stream_get(tlv->value, peer->curr, sublength);
length -= sublength;
/* attach tlv to encap chain */
@ -2025,8 +2025,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
attr->flag |= ATTR_FLAG_BIT(BGP_ATTR_PREFIX_SID);
type = stream_getc(peer->ibuf);
length = stream_getw(peer->ibuf);
type = stream_getc(peer->curr);
length = stream_getw(peer->curr);
if (type == BGP_PREFIX_SID_LABEL_INDEX) {
if (length != BGP_PREFIX_SID_LABEL_INDEX_LENGTH) {
@ -2039,11 +2039,11 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
}
/* Ignore flags and reserved */
stream_getc(peer->ibuf);
stream_getw(peer->ibuf);
stream_getc(peer->curr);
stream_getw(peer->curr);
/* Fetch the label index and see if it is valid. */
label_index = stream_getl(peer->ibuf);
label_index = stream_getl(peer->curr);
if (label_index == BGP_INVALID_LABEL_INDEX)
return bgp_attr_malformed(
args, BGP_NOTIFY_UPDATE_OPT_ATTR_ERR,
@ -2074,16 +2074,16 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
}
/* Ignore reserved */
stream_getc(peer->ibuf);
stream_getw(peer->ibuf);
stream_getc(peer->curr);
stream_getw(peer->curr);
stream_get(&ipv6_sid, peer->ibuf, 16);
stream_get(&ipv6_sid, peer->curr, 16);
}
/* Placeholder code for the Originator SRGB type */
else if (type == BGP_PREFIX_SID_ORIGINATOR_SRGB) {
/* Ignore flags */
stream_getw(peer->ibuf);
stream_getw(peer->curr);
length -= 2;
@ -2099,8 +2099,8 @@ bgp_attr_prefix_sid(struct bgp_attr_parser_args *args,
srgb_count = length / BGP_PREFIX_SID_ORIGINATOR_SRGB_LENGTH;
for (int i = 0; i < srgb_count; i++) {
stream_get(&srgb_base, peer->ibuf, 3);
stream_get(&srgb_range, peer->ibuf, 3);
stream_get(&srgb_base, peer->curr, 3);
stream_get(&srgb_range, peer->curr, 3);
}
}
@ -2125,7 +2125,7 @@ static bgp_attr_parse_ret_t bgp_attr_unknown(struct bgp_attr_parser_args *args)
peer->host, type, length);
/* Forward read pointer of input stream. */
stream_forward_getp(peer->ibuf, length);
stream_forward_getp(peer->curr, length);
/* If any of the mandatory well-known attributes are not recognized,
then the Error Subcode is set to Unrecognized Well-known

View File

@ -126,35 +126,61 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
from_peer->host, from_peer, from_peer->fd, peer,
peer->fd);
peer_writes_off(peer);
BGP_READ_OFF(peer->t_read);
peer_writes_off(from_peer);
BGP_READ_OFF(from_peer->t_read);
bgp_writes_off(peer);
bgp_reads_off(peer);
bgp_writes_off(from_peer);
bgp_reads_off(from_peer);
BGP_TIMER_OFF(peer->t_routeadv);
BGP_TIMER_OFF(peer->t_connect);
BGP_TIMER_OFF(peer->t_connect_check);
BGP_TIMER_OFF(from_peer->t_routeadv);
fd = peer->fd;
peer->fd = from_peer->fd;
from_peer->fd = fd;
stream_reset(peer->ibuf);
BGP_TIMER_OFF(from_peer->t_connect);
BGP_TIMER_OFF(from_peer->t_connect_check);
// At this point in time, it is possible that there are packets pending
// on
// from_peer->obuf. These need to be transferred to the new peer struct.
pthread_mutex_lock(&peer->obuf_mtx);
pthread_mutex_lock(&from_peer->obuf_mtx);
// various buffers. Those need to be transferred or dropped, otherwise
// we'll
// get spurious failures during session establishment.
pthread_mutex_lock(&peer->io_mtx);
pthread_mutex_lock(&from_peer->io_mtx);
{
// wipe new peer's packet queue
stream_fifo_clean(peer->obuf);
fd = peer->fd;
peer->fd = from_peer->fd;
from_peer->fd = fd;
// copy each packet from old peer's queue to new peer's queue
stream_fifo_clean(peer->ibuf);
stream_fifo_clean(peer->obuf);
stream_reset(peer->ibuf_work);
// this should never happen, since bgp_process_packet() is the
// only task
// that sets and unsets the current packet and it runs in our
// pthread.
if (peer->curr) {
zlog_err(
"[%s] Dropping pending packet on connection transfer:",
peer->host);
u_int16_t type = stream_getc_from(peer->curr,
BGP_MARKER_SIZE + 2);
bgp_dump_packet(peer, type, peer->curr);
stream_free(peer->curr);
peer->curr = NULL;
}
// copy each packet from old peer's output queue to new peer
while (from_peer->obuf->head)
stream_fifo_push(peer->obuf,
stream_fifo_pop(from_peer->obuf));
// copy each packet from old peer's input queue to new peer
while (from_peer->ibuf->head)
stream_fifo_push(peer->ibuf,
stream_fifo_pop(from_peer->ibuf));
}
pthread_mutex_unlock(&from_peer->obuf_mtx);
pthread_mutex_unlock(&peer->obuf_mtx);
pthread_mutex_unlock(&from_peer->io_mtx);
pthread_mutex_unlock(&peer->io_mtx);
peer->as = from_peer->as;
peer->v_holdtime = from_peer->v_holdtime;
@ -232,8 +258,8 @@ static struct peer *peer_xfer_conn(struct peer *from_peer)
}
}
BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
peer_writes_on(peer);
bgp_reads_on(peer);
bgp_writes_on(peer);
if (from_peer)
peer_xfer_stats(peer, from_peer);
@ -381,6 +407,10 @@ static int bgp_connect_timer(struct thread *thread)
int ret;
peer = THREAD_ARG(thread);
assert(!peer->t_write);
assert(!peer->t_read);
peer->t_connect = NULL;
if (bgp_debug_neighbor_events(peer))
@ -429,6 +459,9 @@ int bgp_routeadv_timer(struct thread *thread)
peer->synctime = bgp_clock();
thread_add_background(bm->master, bgp_generate_updgrp_packets, peer, 0,
&peer->t_generate_updgrp_packets);
/* MRAI timer will be started again when FIFO is built, no need to
* do it here.
*/
@ -634,6 +667,9 @@ void bgp_adjust_routeadv(struct peer *peer)
BGP_TIMER_OFF(peer->t_routeadv);
peer->synctime = bgp_clock();
thread_add_background(bm->master, bgp_generate_updgrp_packets,
peer, 0,
&peer->t_generate_updgrp_packets);
return;
}
@ -1028,33 +1064,40 @@ int bgp_stop(struct peer *peer)
bgp_bfd_deregister_peer(peer);
}
/* Stop read and write threads when exists. */
BGP_READ_OFF(peer->t_read);
peer_writes_off(peer);
/* stop keepalives */
peer_keepalives_off(peer);
/* Stop read and write threads. */
bgp_writes_off(peer);
bgp_reads_off(peer);
THREAD_OFF(peer->t_connect_check);
/* Stop all timers. */
BGP_TIMER_OFF(peer->t_start);
BGP_TIMER_OFF(peer->t_connect);
BGP_TIMER_OFF(peer->t_holdtime);
peer_keepalives_off(peer);
BGP_TIMER_OFF(peer->t_routeadv);
BGP_TIMER_OFF(peer->t_generate_updgrp_packets);
/* Stream reset. */
peer->packet_size = 0;
/* Clear input and output buffer. */
if (peer->ibuf)
stream_reset(peer->ibuf);
if (peer->work)
stream_reset(peer->work);
pthread_mutex_lock(&peer->obuf_mtx);
pthread_mutex_lock(&peer->io_mtx);
{
if (peer->ibuf)
stream_fifo_clean(peer->ibuf);
if (peer->obuf)
stream_fifo_clean(peer->obuf);
if (peer->ibuf_work)
stream_reset(peer->ibuf_work);
if (peer->obuf_work)
stream_reset(peer->obuf_work);
if (peer->curr) {
stream_free(peer->curr);
peer->curr = NULL;
}
}
pthread_mutex_unlock(&peer->obuf_mtx);
pthread_mutex_unlock(&peer->io_mtx);
/* Close of file descriptor. */
if (peer->fd >= 0) {
@ -1177,10 +1220,12 @@ static int bgp_connect_check(struct thread *thread)
struct peer *peer;
peer = THREAD_ARG(thread);
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
assert(!peer->t_read);
assert(!peer->t_write);
/* This value needs to be unset in order for bgp_read() to be scheduled
*/
BGP_READ_OFF(peer->t_read);
peer->t_connect_check = NULL;
/* Check file descriptor. */
slen = sizeof(status);
@ -1218,17 +1263,16 @@ static int bgp_connect_success(struct peer *peer)
return -1;
}
peer_writes_on(peer);
if (bgp_getsockname(peer) < 0) {
zlog_err("%s: bgp_getsockname(): failed for peer %s, fd %d",
__FUNCTION__, peer->host, peer->fd);
bgp_notify_send(peer, BGP_NOTIFY_FSM_ERR,
0); /* internal error */
bgp_writes_on(peer);
return -1;
}
BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
bgp_reads_on(peer);
if (bgp_debug_neighbor_events(peer)) {
char buf1[SU_ADDRSTRLEN];
@ -1332,6 +1376,10 @@ int bgp_start(struct peer *peer)
#endif
}
assert(!peer->t_write);
assert(!peer->t_read);
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
status = bgp_connect(peer);
switch (status) {
@ -1362,7 +1410,8 @@ int bgp_start(struct peer *peer)
// when the socket becomes ready (or fails to connect),
// bgp_connect_check
// will be called.
BGP_READ_ON(peer->t_read, bgp_connect_check, peer->fd);
thread_add_read(bm->master, bgp_connect_check, peer, peer->fd,
&peer->t_connect_check);
break;
}
return 0;

View File

@ -23,24 +23,6 @@
#define _QUAGGA_BGP_FSM_H
/* Macro for BGP read, write and timer thread. */
#define BGP_READ_ON(T, F, V) \
do { \
if ((peer->status != Deleted)) \
thread_add_read(bm->master, (F), peer, (V), &(T)); \
} while (0)
#define BGP_READ_OFF(T) \
do { \
if (T) \
THREAD_READ_OFF(T); \
} while (0)
#define BGP_WRITE_OFF(T) \
do { \
if (T) \
THREAD_WRITE_OFF(T); \
} while (0)
#define BGP_TIMER_ON(T, F, V) \
do { \
if ((peer->status != Deleted)) \

View File

@ -1,8 +1,10 @@
/*
BGP I/O.
Implements a consumer thread to flush packets destined for remote peers.
Implements packet I/O in a consumer pthread.
--------------------------------------------
Copyright (C) 2017 Cumulus Networks
Quentin Young
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -31,7 +33,7 @@
#include "log.h"
#include "monotime.h"
#include "network.h"
#include "frr_pthread.h"
#include "pqueue.h"
#include "bgpd/bgpd.h"
#include "bgpd/bgp_io.h"
@ -39,172 +41,278 @@
#include "bgpd/bgp_packet.h"
#include "bgpd/bgp_fsm.h"
static int bgp_write(struct peer *);
static void peer_process_writes(struct hash_backet *, void *);
/* forward declarations */
static uint16_t bgp_write(struct peer *);
static uint16_t bgp_read(struct peer *);
static int bgp_process_writes(struct thread *);
static int bgp_process_reads(struct thread *);
static bool validate_header(struct peer *);
bool bgp_packet_writes_thread_run = false;
/* generic i/o status codes */
#define BGP_IO_TRANS_ERR (1 << 1) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 2) // some kind of fatal TCP error
/* Hash table of peers to operate on, associated synchronization primitives and
* hash table callbacks.
/* bgp_read() status codes */
#define BGP_IO_READ_HEADER (1 << 3) // when read a full packet header
#define BGP_IO_READ_FULLPACKET (1 << 4) // read a full packet
/* Start and stop routines for I/O pthread + control variables
* ------------------------------------------------------------------------ */
static struct hash *peerhash;
/* Mutex to protect hash table */
static pthread_mutex_t *peerhash_mtx;
/* Condition variable used to notify the write thread that there is work to do
*/
static pthread_cond_t *write_cond;
bool bgp_packet_write_thread_run = false;
pthread_mutex_t *work_mtx;
static unsigned int peer_hash_key_make(void *p)
static struct list *read_cancel;
static struct list *write_cancel;
void bgp_io_init()
{
struct peer *peer = p;
return sockunion_hash(&peer->su);
work_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
pthread_mutex_init(work_mtx, NULL);
read_cancel = list_new();
write_cancel = list_new();
}
static int peer_hash_cmp(const void *p1, const void *p2)
void *bgp_io_start(void *arg)
{
const struct peer *peer1 = p1;
const struct peer *peer2 = p2;
return (sockunion_same(&peer1->su, &peer2->su)
&& CHECK_FLAG(peer1->flags, PEER_FLAG_CONFIG_NODE)
== CHECK_FLAG(peer2->flags, PEER_FLAG_CONFIG_NODE));
}
/* ------------------------------------------------------------------------ */
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
void peer_writes_init(void)
{
peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
write_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
// we definitely don't want to handle signals
fpt->master->handle_signals = false;
// initialize mutex
pthread_mutex_init(peerhash_mtx, NULL);
bgp_packet_write_thread_run = true;
struct thread task;
// use monotonic clock with condition variable
pthread_condattr_t attrs;
pthread_condattr_init(&attrs);
pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
pthread_cond_init(write_cond, &attrs);
pthread_condattr_destroy(&attrs);
while (bgp_packet_write_thread_run) {
if (thread_fetch(fpt->master, &task)) {
pthread_mutex_lock(work_mtx);
{
bool cancel = false;
struct peer *peer = THREAD_ARG(&task);
if ((task.func == bgp_process_reads
&& listnode_lookup(read_cancel, peer))
|| (task.func == bgp_process_writes
&& listnode_lookup(write_cancel, peer)))
cancel = true;
// initialize peerhash
peerhash = hash_create_size(2048, peer_hash_key_make, peer_hash_cmp);
}
list_delete_all_node(write_cancel);
list_delete_all_node(read_cancel);
static void peer_writes_finish(void *arg)
{
bgp_packet_writes_thread_run = false;
if (peerhash)
hash_free(peerhash);
peerhash = NULL;
pthread_mutex_unlock(peerhash_mtx);
pthread_mutex_destroy(peerhash_mtx);
pthread_cond_destroy(write_cond);
XFREE(MTYPE_PTHREAD, peerhash_mtx);
XFREE(MTYPE_PTHREAD, write_cond);
}
void *peer_writes_start(void *arg)
{
struct timeval currtime = {0, 0};
struct timeval sleeptime = {0, 500};
struct timespec next_update = {0, 0};
pthread_mutex_lock(peerhash_mtx);
// register cleanup handler
pthread_cleanup_push(&peer_writes_finish, NULL);
bgp_packet_writes_thread_run = true;
while (bgp_packet_writes_thread_run) {
// wait around until next update time
if (peerhash->count > 0)
pthread_cond_timedwait(write_cond, peerhash_mtx,
&next_update);
else // wait around until we have some peers
while (peerhash->count == 0
&& bgp_packet_writes_thread_run)
pthread_cond_wait(write_cond, peerhash_mtx);
hash_iterate(peerhash, peer_process_writes, NULL);
monotime(&currtime);
timeradd(&currtime, &sleeptime, &currtime);
TIMEVAL_TO_TIMESPEC(&currtime, &next_update);
if (!cancel)
thread_call(&task);
}
pthread_mutex_unlock(work_mtx);
}
}
// clean up
pthread_cleanup_pop(1);
return NULL;
}
int peer_writes_stop(void **result)
int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_WRITE);
bgp_packet_writes_thread_run = false;
peer_writes_wake();
fpt->master->spin = false;
bgp_packet_write_thread_run = false;
pthread_kill(fpt->thread, SIGINT);
pthread_join(fpt->thread, result);
pthread_mutex_unlock(work_mtx);
pthread_mutex_destroy(work_mtx);
list_delete(read_cancel);
list_delete(write_cancel);
XFREE(MTYPE_TMP, work_mtx);
return 0;
}
/* ------------------------------------------------------------------------ */
void peer_writes_on(struct peer *peer)
void bgp_writes_on(struct peer *peer)
{
if (peer->status == Deleted)
return;
assert(peer->status != Deleted);
assert(peer->obuf);
assert(peer->ibuf);
assert(peer->ibuf_work);
assert(!peer->t_connect_check);
assert(peer->fd);
pthread_mutex_lock(peerhash_mtx);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
pthread_mutex_lock(work_mtx);
{
if (!hash_lookup(peerhash, peer)) {
hash_get(peerhash, peer, hash_alloc_intern);
peer_lock(peer);
}
listnode_delete(write_cancel, peer);
thread_add_write(fpt->master, bgp_process_writes, peer,
peer->fd, &peer->t_write);
SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}
pthread_mutex_unlock(peerhash_mtx);
peer_writes_wake();
pthread_mutex_unlock(work_mtx);
}
void peer_writes_off(struct peer *peer)
void bgp_writes_off(struct peer *peer)
{
pthread_mutex_lock(peerhash_mtx);
pthread_mutex_lock(work_mtx);
{
if (hash_release(peerhash, peer)) {
peer_unlock(peer);
fprintf(stderr, "Releasing %p\n", peer);
}
THREAD_OFF(peer->t_write);
THREAD_OFF(peer->t_generate_updgrp_packets);
listnode_add(write_cancel, peer);
// peer access by us after this point will result in pain
UNSET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}
pthread_mutex_unlock(peerhash_mtx);
pthread_mutex_unlock(work_mtx);
/* upon return, i/o thread must not access the peer */
}
void peer_writes_wake()
void bgp_reads_on(struct peer *peer)
{
pthread_cond_signal(write_cond);
assert(peer->status != Deleted);
assert(peer->ibuf);
assert(peer->fd);
assert(peer->ibuf_work);
assert(stream_get_endp(peer->ibuf_work) == 0);
assert(peer->obuf);
assert(!peer->t_connect_check);
assert(peer->fd);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
pthread_mutex_lock(work_mtx);
{
listnode_delete(read_cancel, peer);
thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);
SET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}
pthread_mutex_unlock(work_mtx);
}
void bgp_reads_off(struct peer *peer)
{
pthread_mutex_lock(work_mtx);
{
THREAD_OFF(peer->t_read);
THREAD_OFF(peer->t_process_packet);
listnode_add(read_cancel, peer);
// peer access by us after this point will result in pain
UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}
pthread_mutex_unlock(work_mtx);
}
/**
* Callback for hash_iterate. Takes a hash bucket, unwraps it into a peer and
* synchronously calls bgp_write() on the peer.
* Called from PTHREAD_IO when select() or poll() determines that the file
* descriptor is ready to be written to.
*/
static void peer_process_writes(struct hash_backet *hb, void *arg)
static int bgp_process_writes(struct thread *thread)
{
static struct peer *peer;
peer = hb->data;
pthread_mutex_lock(&peer->obuf_mtx);
{
bgp_write(peer);
}
pthread_mutex_unlock(&peer->obuf_mtx);
peer = THREAD_ARG(thread);
uint16_t status;
// dispatch job on main thread
BGP_TIMER_ON(peer->t_generate_updgrp_packets,
bgp_generate_updgrp_packets, 100);
if (peer->fd < 0)
return -1;
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
bool reschedule;
pthread_mutex_lock(&peer->io_mtx);
{
status = bgp_write(peer);
reschedule = (stream_fifo_head(peer->obuf) != NULL);
}
pthread_mutex_unlock(&peer->io_mtx);
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
}
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
reschedule = 0; // problem
if (reschedule) {
thread_add_write(fpt->master, bgp_process_writes, peer,
peer->fd, &peer->t_write);
thread_add_background(bm->master, bgp_generate_updgrp_packets,
peer, 0,
&peer->t_generate_updgrp_packets);
}
return 0;
}
/**
 * Called from PTHREAD_IO when select() or poll() determines that the file
 * descriptor is ready to be read from.
 *
 * Performs one locked read via bgp_read(), validates the header once enough
 * bytes have arrived, and hands each completed packet to the main pthread by
 * pushing a copy onto peer->ibuf and scheduling bgp_process_packet().
 *
 * Fix: `peer` was declared `static`, keeping stale cross-invocation state in
 * a task callback for no benefit; it is now an ordinary local.
 */
static int bgp_process_reads(struct thread *thread)
{
	struct peer *peer = THREAD_ARG(thread);
	uint16_t status;

	/* session already torn down; nothing to read */
	if (peer->fd < 0)
		return -1;

	struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

	bool reschedule = true;

	/* execute read; the peer's I/O buffers are serialized by io_mtx */
	pthread_mutex_lock(&peer->io_mtx);
	{
		status = bgp_read(peer);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	/* check results of read */
	bool header_valid = true;

	if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
	}

	if (CHECK_FLAG(status, BGP_IO_FATAL_ERR))
		reschedule = false; /* problem */

	if (CHECK_FLAG(status, BGP_IO_READ_HEADER)) {
		header_valid = validate_header(peer);
		if (!header_valid) {
			/* stash the offending bytes for later display,
			 * capped at one packet's worth */
			bgp_size_t packetsize =
				MIN((int)stream_get_endp(peer->ibuf_work),
				    BGP_MAX_PACKET_SIZE);
			memcpy(peer->last_reset_cause, peer->ibuf_work->data,
			       packetsize);
			peer->last_reset_cause_size = packetsize;
			/* We're tearing the session down, no point in
			 * rescheduling. Additionally, bgp_read() will use the
			 * header's length field if it's present to determine
			 * how much to read; if this is corrupt, we'll crash
			 * the program. */
			reschedule = false;
		}
	}

	/* if we read a full packet, push it onto peer->ibuf, reset our WiP
	 * buffer and schedule a job to process it on the main thread */
	if (header_valid && CHECK_FLAG(status, BGP_IO_READ_FULLPACKET)) {
		pthread_mutex_lock(&peer->io_mtx);
		{
			stream_fifo_push(peer->ibuf,
					 stream_dup(peer->ibuf_work));
		}
		pthread_mutex_unlock(&peer->io_mtx);
		stream_reset(peer->ibuf_work);
		assert(stream_get_endp(peer->ibuf_work) == 0);

		thread_add_background(bm->master, bgp_process_packet, peer, 0,
				      &peer->t_process_packet);
	}

	if (reschedule)
		thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
				&peer->t_read);

	return 0;
}
/**
@ -212,14 +320,14 @@ static void peer_process_writes(struct hash_backet *hb, void *arg)
*
* This function pops packets off of peer->obuf and writes them to peer->fd.
* The amount of packets written is equal to the minimum of peer->wpkt_quanta
* and the number of packets on the output buffer.
* and the number of packets on the output buffer, unless an error occurs.
*
* If write() returns an error, the appropriate FSM event is generated.
*
* The return value is equal to the number of packets written
* (which may be zero).
*/
static int bgp_write(struct peer *peer)
static uint16_t bgp_write(struct peer *peer)
{
u_char type;
struct stream *s;
@ -227,10 +335,8 @@ static int bgp_write(struct peer *peer)
int update_last_write = 0;
unsigned int count = 0;
unsigned int oc = 0;
uint16_t status = 0;
/* Write packets. The number of packets written is the value of
* bgp->wpkt_quanta or the size of the output buffer, whichever is
* smaller.*/
while (count < peer->bgp->wpkt_quanta
&& (s = stream_fifo_head(peer->obuf))) {
int writenum;
@ -239,8 +345,12 @@ static int bgp_write(struct peer *peer)
num = write(peer->fd, STREAM_PNT(s), writenum);
if (num < 0) {
if (!ERRNO_IO_RETRY(errno))
if (!ERRNO_IO_RETRY(errno)) {
BGP_EVENT_ADD(peer, TCP_fatal_error);
SET_FLAG(status, BGP_IO_FATAL_ERR);
} else {
SET_FLAG(status, BGP_IO_TRANS_ERR);
}
goto done;
} else if (num != writenum) // incomplete write
@ -288,7 +398,7 @@ static int bgp_write(struct peer *peer)
}
count++;
/* OK we send packet so delete it. */
stream_free(stream_fifo_pop(peer->obuf));
update_last_write = 1;
}
@ -303,5 +413,170 @@ done : {
peer->last_write = bgp_clock();
}
return count;
return status;
}
/**
 * Reads <= 1 packet worth of data from peer->fd into peer->ibuf_work.
 *
 * Partial reads accumulate across calls: bytes collect in ibuf_work until a
 * full 19-byte header is present, and after that until the number of bytes
 * named by the header's length field has arrived.
 *
 * @return bitfield of BGP_IO_* flags: BGP_IO_TRANS_ERR for a retryable
 *         error, BGP_IO_FATAL_ERR for a hard error or remote close,
 *         BGP_IO_READ_HEADER once a complete header has been read, and
 *         BGP_IO_READ_FULLPACKET once a complete packet has been read.
 *         READ_FULLPACKET only means the byte count matched the header's
 *         length field; the caller must still validate the header.
 */
static uint16_t bgp_read(struct peer *peer)
{
	int readsize; // how many bytes we want to read
	int nbytes;   // how many bytes we actually read
	bool have_header = false;
	uint16_t status = 0;

	// Either finish reading the fixed-size header, or read the remainder
	// of the packet as dictated by the header's length field.
	if (stream_get_endp(peer->ibuf_work) < BGP_HEADER_SIZE)
		readsize = BGP_HEADER_SIZE - stream_get_endp(peer->ibuf_work);
	else {
		// retrieve packet length from tlv and compute # bytes we still
		// need
		// NOTE(review): the length field is trusted here before
		// validate_header() has run; the caller must stop scheduling
		// reads when it finds the header invalid.
		u_int16_t mlen =
			stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
		readsize = mlen - stream_get_endp(peer->ibuf_work);
		have_header = true;
	}

	nbytes = stream_read_try(peer->ibuf_work, peer->fd, readsize);

	if (nbytes <= 0) // handle errors
	{
		switch (nbytes) {
		case -1: // fatal error; tear down the session
			zlog_err("%s [Error] bgp_read_packet error: %s",
				 peer->host, safe_strerror(errno));

			// record why the session dropped; enter NSF wait if
			// graceful-restart mode is active
			if (peer->status == Established) {
				if (CHECK_FLAG(peer->sflags,
					       PEER_STATUS_NSF_MODE)) {
					peer->last_reset =
						PEER_DOWN_NSF_CLOSE_SESSION;
					SET_FLAG(peer->sflags,
						 PEER_STATUS_NSF_WAIT);
				} else
					peer->last_reset =
						PEER_DOWN_CLOSE_SESSION;
			}

			BGP_EVENT_ADD(peer, TCP_fatal_error);
			SET_FLAG(status, BGP_IO_FATAL_ERR);
			break;

		case 0: // TCP session closed
			if (bgp_debug_neighbor_events(peer))
				zlog_debug(
					"%s [Event] BGP connection closed fd %d",
					peer->host, peer->fd);

			// same NSF bookkeeping as the fatal-error path above
			if (peer->status == Established) {
				if (CHECK_FLAG(peer->sflags,
					       PEER_STATUS_NSF_MODE)) {
					peer->last_reset =
						PEER_DOWN_NSF_CLOSE_SESSION;
					SET_FLAG(peer->sflags,
						 PEER_STATUS_NSF_WAIT);
				} else
					peer->last_reset =
						PEER_DOWN_CLOSE_SESSION;
			}

			BGP_EVENT_ADD(peer, TCP_connection_closed);
			SET_FLAG(status, BGP_IO_FATAL_ERR);
			break;

		case -2: // temporary error; come back later
			SET_FLAG(status, BGP_IO_TRANS_ERR);
			break;
		default:
			break;
		}

		return status;
	}

	// If we didn't have the header before read(), and now we do, set the
	// appropriate flag. The caller must validate the header for us.
	if (!have_header
	    && stream_get_endp(peer->ibuf_work) >= BGP_HEADER_SIZE) {
		SET_FLAG(status, BGP_IO_READ_HEADER);
		have_header = true;
	}

	// If we read the # of bytes specified in the tlv, we have read a full
	// packet.
	//
	// Note that the header may not have been validated here. This flag
	// means ONLY that we read the # of bytes specified in the header; if
	// the header is not valid, the packet MUST NOT be processed further.
	if (have_header && (stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE)
			    == stream_get_endp(peer->ibuf_work)))
		SET_FLAG(status, BGP_IO_READ_FULLPACKET);

	return status;
}
/*
 * Sanity-check a freshly read BGP message header held in peer->ibuf_work.
 *
 * Verifies the 16-octet marker, the message type and the length field, per
 * the BGP-4 header layout. On any failure a NOTIFICATION is sent to the
 * peer and false is returned; true means the header is acceptable.
 */
static bool validate_header(struct peer *peer)
{
	u_int16_t size, type;

	/* Every marker octet must be 0xff. */
	for (int i = 0; i < BGP_MARKER_SIZE; i++) {
		if (peer->ibuf_work->data[i] != 0xff) {
			bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
					BGP_NOTIFY_HEADER_NOT_SYNC);
			return false;
		}
	}

	/* Pull the length and type fields out of the fixed header. */
	size = stream_getw_from(peer->ibuf_work, BGP_MARKER_SIZE);
	type = stream_getc_from(peer->ibuf_work, BGP_MARKER_SIZE + 2);

	/* Reject unknown message types; map each known type to the smallest
	 * length it may legally carry. */
	u_int16_t minsize;

	switch (type) {
	case BGP_MSG_OPEN:
		minsize = BGP_MSG_OPEN_MIN_SIZE;
		break;
	case BGP_MSG_UPDATE:
		minsize = BGP_MSG_UPDATE_MIN_SIZE;
		break;
	case BGP_MSG_NOTIFY:
		minsize = BGP_MSG_NOTIFY_MIN_SIZE;
		break;
	case BGP_MSG_KEEPALIVE:
		minsize = BGP_MSG_KEEPALIVE_MIN_SIZE;
		break;
	case BGP_MSG_ROUTE_REFRESH_NEW:
	case BGP_MSG_ROUTE_REFRESH_OLD:
		minsize = BGP_MSG_ROUTE_REFRESH_MIN_SIZE;
		break;
	case BGP_MSG_CAPABILITY:
		minsize = BGP_MSG_CAPABILITY_MIN_SIZE;
		break;
	default:
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s unknown message type 0x%02x", peer->host,
				   type);
		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESTYPE,
					  (u_char *)&type, 1);
		return false;
	}

	/* Length must fall within [minsize, BGP_MAX_PACKET_SIZE]; KEEPALIVEs
	 * are fixed-size, so for them only exactly the minimum is legal. */
	if (size < minsize || size > BGP_MAX_PACKET_SIZE
	    || (type == BGP_MSG_KEEPALIVE && size != minsize)) {
		if (bgp_debug_neighbor_events(peer))
			zlog_debug("%s bad message length - %d for %s",
				   peer->host, size,
				   type == 128 ? "ROUTE-REFRESH"
					       : bgp_type_str[(int)type]);
		bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
					  BGP_NOTIFY_HEADER_BAD_MESLEN,
					  (u_char *)&size, 2);
		return false;
	}

	return true;
}

View File

@ -23,13 +23,13 @@
#ifndef _FRR_BGP_IO_H
#define _FRR_BGP_IO_H
#include "frr_pthread.h"
#include "bgpd/bgpd.h"
/**
* Control variable for write thread.
*
* Setting this variable to false and calling peer_writes_wake() will
* eventually result in thread termination.
* Setting this variable to false will eventually result in thread termination.
*/
extern bool bgp_packet_writes_thread_run;
@ -37,35 +37,32 @@ extern bool bgp_packet_writes_thread_run;
* Initializes data structures and flags for the write thread.
*
* This function should be called from the main thread before
* peer_writes_start() is invoked.
* bgp_writes_start() is invoked.
*/
extern void peer_writes_init(void);
extern void bgp_io_init(void);
/**
* Start function for write thread.
*
* This function should be passed to pthread_create() during BGP startup.
* @param arg - unused
*/
extern void *peer_writes_start(void *arg);
extern void *bgp_io_start(void *arg);
/**
* Stop function for write thread.
*
* Uninitializes all resources and stops the thread.
*
* @param result -- where to store data result, unused
* @param result - where to store data result, unused
*/
extern int peer_writes_stop(void **result);
extern int bgp_io_stop(void **result, struct frr_pthread *fpt);
/**
* Registers a peer with the write thread.
*
* This function adds the peer to an internal data structure, which must be
* locked for write access. This call will block until the structure can be
* locked.
* Turns on packet writing for a peer.
*
* After this function is called, any packets placed on peer->obuf will be
* written to peer->fd at regular intervals.
* written to peer->fd at regular intervals. Additionally it becomes unsafe to
* use peer->fd with select() or poll().
*
* This function increments the peer reference counter with peer_lock().
*
@ -73,17 +70,51 @@ extern int peer_writes_stop(void **result);
*
* @param peer - peer to register
*/
extern void peer_writes_on(struct peer *peer);
extern void bgp_writes_on(struct peer *peer);
/**
* Deregisters a peer with the write thread.
*
* This function removes the peer from an internal data structure, which must
* be locked for write access. This call will block until the structure can be
* locked.
* Turns off packet writing for a peer.
*
* After this function is called, any packets placed on peer->obuf will not be
* written to peer->fd.
* written to peer->fd. After this function returns it is safe to use peer->fd
* with select() or poll().
*
* NOTE(review): the text below describes a `flush` argument, but the
* prototype below takes only `peer` — confirm whether flushing is implicit
* or whether the parameter was dropped from the API.
*
* If flushing, a last-ditch effort will be made to flush any remaining
* packets to peer->fd. Upon encountering any error whatsoever, the attempt
* will abort. If the caller wishes to know whether the flush succeeded they
* may check peer->obuf->count against zero.
*
* If the peer is not registered, nothing happens.
*
* @param peer - peer to deregister
*/
extern void bgp_writes_off(struct peer *peer);
/**
* Turns on packet reading for a peer.
*
* After this function is called, any packets received on peer->fd will be read
* and copied into the FIFO queue peer->ibuf. Additionally it becomes unsafe to
* use peer->fd with select() or poll().
*
* When a full packet is read, bgp_process_packet() will be scheduled on the
* main thread.
*
* This function increments the peer reference counter with peer_lock().
*
* If the peer is already registered, nothing happens.
*
* @param peer - peer to register
*/
extern void bgp_reads_on(struct peer *peer);
/**
* Turns off packet reading for a peer.
*
* After this function is called, any packets received on peer->fd will not be
* read. After this function returns it is safe to use peer->fd with select()
* or poll().
*
* This function decrements the peer reference counter with peer_unlock().
*
@ -91,14 +122,6 @@ extern void peer_writes_on(struct peer *peer);
*
* @param peer - peer to deregister
*/
extern void peer_writes_off(struct peer *peer);
/**
* Notifies the write thread that there is work to be done.
*
* This function has the effect of waking the write thread if it is sleeping.
* If the thread is not sleeping, this signal will be ignored.
*/
extern void peer_writes_wake(void);
extern void bgp_reads_off(struct peer *peer);
#endif /* _FRR_BGP_IO_H */

View File

@ -1,26 +1,26 @@
/*
* BGP Keepalives.
*
* Implements a producer thread to generate BGP keepalives for peers.
* ----------------------------------------
* Copyright (C) 2017 Cumulus Networks, Inc.
* Quentin Young
*
* This file is part of FRRouting.
*
* FRRouting is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2, or (at your option) any later
* version.
*
* FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* FRRouting; see the file COPYING. If not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
BGP Keepalives.
Implements a producer thread to generate BGP keepalives for peers.
----------------------------------------
Copyright (C) 2017 Cumulus Networks, Inc.
Quentin Young
This file is part of FRRouting.
FRRouting is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Software Foundation; either version 2, or (at your option) any later
version.
FRRouting is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
FRRouting; see the file COPYING. If not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <zebra.h>
#include <signal.h>
@ -73,7 +73,8 @@ static void pkat_del(void *pkat)
/*
* Walks the list of peers, sending keepalives to those that are due for them.
* Callback for hash_iterate. Determines if a peer needs a keepalive and if so,
* generates and sends it.
*
* For any given peer, if the elapsed time since its last keepalive exceeds its
* configured keepalive timer, a keepalive is sent to the peer and its
@ -143,8 +144,8 @@ static unsigned int peer_hash_key(void *arg)
void peer_keepalives_init()
{
peerhash_mtx = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_mutex_t));
peerhash_cond = XCALLOC(MTYPE_PTHREAD, sizeof(pthread_cond_t));
peerhash_mtx = XCALLOC(MTYPE_TMP, sizeof(pthread_mutex_t));
peerhash_cond = XCALLOC(MTYPE_TMP, sizeof(pthread_cond_t));
// initialize mutex
pthread_mutex_init(peerhash_mtx, NULL);
@ -175,8 +176,8 @@ static void peer_keepalives_finish(void *arg)
pthread_mutex_destroy(peerhash_mtx);
pthread_cond_destroy(peerhash_cond);
XFREE(MTYPE_PTHREAD, peerhash_mtx);
XFREE(MTYPE_PTHREAD, peerhash_cond);
XFREE(MTYPE_TMP, peerhash_mtx);
XFREE(MTYPE_TMP, peerhash_cond);
}
/**
@ -275,9 +276,8 @@ void peer_keepalives_wake()
pthread_mutex_unlock(peerhash_mtx);
}
int peer_keepalives_stop(void **result)
int peer_keepalives_stop(void **result, struct frr_pthread *fpt)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_KEEPALIVES);
bgp_keepalives_thread_run = false;
peer_keepalives_wake();
pthread_join(fpt->thread, result);

View File

@ -24,6 +24,7 @@
#ifndef _BGP_KEEPALIVES_H_
#define _BGP_KEEPALIVES_H_
#include "frr_pthread.h"
#include "bgpd.h"
/* Thread control flag.
@ -88,6 +89,6 @@ extern void *peer_keepalives_start(void *arg);
extern void peer_keepalives_wake(void);
/* stop function */
int peer_keepalives_stop(void **result);
int peer_keepalives_stop(void **result, struct frr_pthread *fpt);
#endif /* _BGP_KEEPALIVES_H */

View File

@ -59,16 +59,6 @@
#include "bgpd/bgp_label.h"
#include "bgpd/bgp_io.h"
/* Linked list of active peers */
static pthread_mutex_t *plist_mtx;
static pthread_cond_t *write_cond;
static struct list *plist;
/* periodically scheduled thread to generate update-group updates */
static struct thread *t_generate_updgrp_packets;
bool bgp_packet_writes_thread_run = false;
/* Set up BGP packet marker and packet type. */
int bgp_packet_set_marker(struct stream *s, u_char type)
{
@ -107,11 +97,9 @@ int bgp_packet_set_size(struct stream *s)
*/
static void bgp_packet_add(struct peer *peer, struct stream *s)
{
pthread_mutex_lock(&peer->obuf_mtx);
pthread_mutex_lock(&peer->io_mtx);
stream_fifo_push(peer->obuf, s);
pthread_mutex_unlock(&peer->obuf_mtx);
peer_writes_wake();
pthread_mutex_unlock(&peer->io_mtx);
}
static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
@ -165,7 +153,6 @@ static struct stream *bgp_update_packet_eor(struct peer *peer, afi_t afi,
int bgp_generate_updgrp_packets(struct thread *thread)
{
struct peer *peer = THREAD_ARG(thread);
peer->t_generate_updgrp_packets = NULL;
struct stream *s;
struct peer_af *paf;
@ -237,10 +224,13 @@ int bgp_generate_updgrp_packets(struct thread *thread)
if ((s = bgp_update_packet_eor(
peer, afi,
safi)))
safi))) {
bgp_packet_add(
peer,
s);
bgp_writes_on(
peer);
}
}
}
continue;
@ -252,6 +242,7 @@ int bgp_generate_updgrp_packets(struct thread *thread)
* attributes from peer and advance peer */
s = bpacket_reformat_for_peer(next_pkt, paf);
bgp_packet_add(peer, s);
bgp_writes_on(peer);
bpacket_queue_advance_peer(paf);
}
} while (s);
@ -282,6 +273,8 @@ void bgp_keepalive_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
bgp_writes_on(peer);
}
/*
@ -335,6 +328,67 @@ void bgp_open_send(struct peer *peer)
/* Add packet to the peer. */
bgp_packet_add(peer, s);
bgp_writes_on(peer);
}
/*
 * Send a NOTIFICATION message to the neighbor.
 *
 * Pops the (single) queued NOTIFY off peer->obuf and writes it directly to
 * peer->fd, bypassing the I/O thread, so the notification goes out before the
 * session is torn down.
 *
 * Fixes relative to previous revision:
 * - the empty-FIFO early return previously happened while peer->io_mtx was
 *   still held, leaking the lock and deadlocking every later io_mtx user;
 * - the popped stream was never freed, leaking one stream per NOTIFY.
 *
 * @param peer - peer to notify; peer->fd must be a valid connected socket
 * @return 0 always (errors are reported via the FSM event queue)
 */
static int bgp_write_notify(struct peer *peer)
{
	int ret, val;
	u_char type;
	struct stream *s;

	/* There should be at least one packet; pop it under the I/O lock,
	 * but do all checking/returning only after the lock is released. */
	pthread_mutex_lock(&peer->io_mtx);
	{
		s = stream_fifo_pop(peer->obuf);
	}
	pthread_mutex_unlock(&peer->io_mtx);

	if (!s)
		return 0;

	assert(stream_get_endp(s) >= BGP_HEADER_SIZE);

	/* Stop collecting data within the socket */
	sockopt_cork(peer->fd, 0);

	/* socket is in nonblocking mode, if we can't deliver the NOTIFY, well,
	 * we only care about getting a clean shutdown at this point. */
	ret = write(peer->fd, STREAM_DATA(s), stream_get_endp(s));

	/* only connection reset/close gets counted as TCP_fatal_error, failure
	 * to write the entire NOTIFY doesn't get different FSM treatment */
	if (ret <= 0) {
		stream_free(s);
		BGP_EVENT_ADD(peer, TCP_fatal_error);
		return 0;
	}

	/* Disable Nagle, make NOTIFY packet go out right away */
	val = 1;
	(void)setsockopt(peer->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val,
			 sizeof(val));

	/* Retrieve BGP packet type; it must be the NOTIFY we just queued. */
	stream_set_getp(s, BGP_MARKER_SIZE + 2);
	type = stream_getc(s);
	assert(type == BGP_MSG_NOTIFY);

	peer->notify_out++;

	/* Double start timer. */
	peer->v_start *= 2;

	/* Overflow check. */
	if (peer->v_start >= (60 * 2))
		peer->v_start = (60 * 2);

	/* Handle Graceful Restart case where the state changes to
	   Connect instead of Idle */
	BGP_EVENT_ADD(peer, BGP_Stop);

	stream_free(s);

	return 0;
}
/*
@ -372,10 +426,12 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Set BGP packet length. */
length = bgp_packet_set_size(s);
/* Add packet to the peer. */
pthread_mutex_lock(&peer->obuf_mtx);
stream_fifo_clean(peer->obuf);
pthread_mutex_unlock(&peer->obuf_mtx);
/* wipe output buffer */
pthread_mutex_lock(&peer->io_mtx);
{
stream_fifo_clean(peer->obuf);
}
pthread_mutex_unlock(&peer->io_mtx);
/* For debug */
{
@ -428,8 +484,8 @@ void bgp_notify_send_with_data(struct peer *peer, u_char code, u_char sub_code,
/* Add packet to peer's output queue */
bgp_packet_add(peer, s);
/* Wake up the write thread to get the notify out ASAP */
peer_writes_wake();
bgp_write_notify(peer);
}
/*
@ -544,6 +600,8 @@ void bgp_route_refresh_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
bgp_writes_on(peer);
}
/*
@ -593,6 +651,8 @@ void bgp_capability_send(struct peer *peer, afi_t afi, safi_t safi,
/* Add packet to the peer. */
bgp_packet_add(peer, s);
bgp_writes_on(peer);
}
/* RFC1771 6.8 Connection collision detection. */
@ -696,13 +756,13 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
u_int16_t *holdtime_ptr;
/* Parse open packet. */
version = stream_getc(peer->ibuf);
memcpy(notify_data_remote_as, stream_pnt(peer->ibuf), 2);
remote_as = stream_getw(peer->ibuf);
holdtime_ptr = (u_int16_t *)stream_pnt(peer->ibuf);
holdtime = stream_getw(peer->ibuf);
memcpy(notify_data_remote_id, stream_pnt(peer->ibuf), 4);
remote_id.s_addr = stream_get_ipv4(peer->ibuf);
version = stream_getc(peer->curr);
memcpy(notify_data_remote_as, stream_pnt(peer->curr), 2);
remote_as = stream_getw(peer->curr);
holdtime_ptr = (u_int16_t *)stream_pnt(peer->curr);
holdtime = stream_getw(peer->curr);
memcpy(notify_data_remote_id, stream_pnt(peer->curr), 4);
remote_id.s_addr = stream_get_ipv4(peer->curr);
/* Receive OPEN message log */
if (bgp_debug_neighbor_events(peer))
@ -714,11 +774,11 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
/* BEGIN to read the capability here, but dont do it yet */
mp_capability = 0;
optlen = stream_getc(peer->ibuf);
optlen = stream_getc(peer->curr);
if (optlen != 0) {
/* If not enough bytes, it is an error. */
if (STREAM_READABLE(peer->ibuf) < optlen) {
if (STREAM_READABLE(peer->curr) < optlen) {
bgp_notify_send(peer, BGP_NOTIFY_OPEN_ERR,
BGP_NOTIFY_OPEN_MALFORMED_ATTR);
return -1;
@ -990,10 +1050,6 @@ static int bgp_open_receive(struct peer *peer, bgp_size_t size)
return (ret);
}
peer->packet_size = 0;
if (peer->ibuf)
stream_reset(peer->ibuf);
return 0;
}
@ -1177,7 +1233,7 @@ static int bgp_update_receive(struct peer *peer, bgp_size_t size)
memset(peer->rcvd_attr_str, 0, BUFSIZ);
peer->rcvd_attr_printed = 0;
s = peer->ibuf;
s = peer->curr;
end = stream_pnt(s) + size;
/* RFC1771 6.3 If the Unfeasible Routes Length or Total Attribute
@ -1424,8 +1480,8 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
peer->notify.length = 0;
}
bgp_notify.code = stream_getc(peer->ibuf);
bgp_notify.subcode = stream_getc(peer->ibuf);
bgp_notify.code = stream_getc(peer->curr);
bgp_notify.subcode = stream_getc(peer->curr);
bgp_notify.length = size - 2;
bgp_notify.data = NULL;
@ -1436,7 +1492,7 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
if (bgp_notify.length) {
peer->notify.length = size - 2;
peer->notify.data = XMALLOC(MTYPE_TMP, size - 2);
memcpy(peer->notify.data, stream_pnt(peer->ibuf), size - 2);
memcpy(peer->notify.data, stream_pnt(peer->curr), size - 2);
}
/* For debug */
@ -1451,12 +1507,12 @@ static void bgp_notify_receive(struct peer *peer, bgp_size_t size)
for (i = 0; i < bgp_notify.length; i++)
if (first) {
sprintf(c, " %02x",
stream_getc(peer->ibuf));
stream_getc(peer->curr));
strcat(bgp_notify.data, c);
} else {
first = 1;
sprintf(c, "%02x",
stream_getc(peer->ibuf));
stream_getc(peer->curr));
strcpy(bgp_notify.data, c);
}
bgp_notify.raw_data = (u_char *)peer->notify.data;
@ -1526,7 +1582,7 @@ static void bgp_route_refresh_receive(struct peer *peer, bgp_size_t size)
return;
}
s = peer->ibuf;
s = peer->curr;
/* Parse packet. */
pkt_afi = stream_getw(s);
@ -1874,7 +1930,7 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
u_char *pnt;
/* Fetch pointer. */
pnt = stream_pnt(peer->ibuf);
pnt = stream_pnt(peer->curr);
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s rcv CAPABILITY", peer->host);
@ -1902,188 +1958,50 @@ int bgp_capability_receive(struct peer *peer, bgp_size_t size)
return bgp_capability_msg_parse(peer, pnt, size);
}
/* BGP read utility function. */
static int bgp_read_packet(struct peer *peer)
/* Starting point of packet process function. */
int bgp_process_packet(struct thread *thread)
{
int nbytes;
int readsize;
/* Yes first of all get peer pointer. */
struct peer *peer;
peer = THREAD_ARG(thread);
readsize = peer->packet_size - stream_get_endp(peer->ibuf);
/* If size is zero then return. */
if (!readsize)
/* Guard against scheduled events that occur after peer deletion. */
if (peer->status == Deleted)
return 0;
/* Read packet from fd. */
nbytes = stream_read_try(peer->ibuf, peer->fd, readsize);
/* If read byte is smaller than zero then error occured. */
if (nbytes < 0) {
/* Transient error should retry */
if (nbytes == -2)
return -1;
zlog_err("%s [Error] bgp_read_packet error: %s", peer->host,
safe_strerror(errno));
if (peer->status == Established) {
if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
} else
peer->last_reset = PEER_DOWN_CLOSE_SESSION;
}
BGP_EVENT_ADD(peer, TCP_fatal_error);
return -1;
}
/* When read byte is zero : clear bgp peer and return */
if (nbytes == 0) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s [Event] BGP connection closed fd %d",
peer->host, peer->fd);
if (peer->status == Established) {
if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_MODE)) {
peer->last_reset = PEER_DOWN_NSF_CLOSE_SESSION;
SET_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT);
} else
peer->last_reset = PEER_DOWN_CLOSE_SESSION;
}
BGP_EVENT_ADD(peer, TCP_connection_closed);
return -1;
}
/* We read partial packet. */
if (stream_get_endp(peer->ibuf) != peer->packet_size)
return -1;
return 0;
}
/* Marker check. */
static int bgp_marker_all_one(struct stream *s, int length)
{
int i;
for (i = 0; i < length; i++)
if (s->data[i] != 0xff)
return 0;
return 1;
}
/* Starting point of packet process function. */
int bgp_read(struct thread *thread)
{
int ret;
u_char type = 0;
struct peer *peer;
bgp_size_t size;
char notify_data_length[2];
u_int32_t notify_out;
/* Yes first of all get peer pointer. */
peer = THREAD_ARG(thread);
peer->t_read = NULL;
/* Note notify_out so we can check later to see if we sent another one
*/
notify_out = peer->notify_out;
if (peer->fd < 0) {
zlog_err("bgp_read(): peer's fd is negative value %d",
peer->fd);
return -1;
pthread_mutex_lock(&peer->io_mtx);
{
peer->curr = stream_fifo_pop(peer->ibuf);
}
pthread_mutex_unlock(&peer->io_mtx);
BGP_READ_ON(peer->t_read, bgp_read, peer->fd);
if (peer->curr == NULL) // no packets to process, hmm...
return 0;
/* Read packet header to determine type of the packet */
if (peer->packet_size == 0)
peer->packet_size = BGP_HEADER_SIZE;
bgp_size_t actual_size = stream_get_endp(peer->curr);
if (stream_get_endp(peer->ibuf) < BGP_HEADER_SIZE) {
ret = bgp_read_packet(peer);
/* skip the marker and copy the packet length */
stream_forward_getp(peer->curr, BGP_MARKER_SIZE);
memcpy(notify_data_length, stream_pnt(peer->curr), 2);
/* Header read error or partial read packet. */
if (ret < 0)
goto done;
/* Get size and type. */
stream_forward_getp(peer->ibuf, BGP_MARKER_SIZE);
memcpy(notify_data_length, stream_pnt(peer->ibuf), 2);
size = stream_getw(peer->ibuf);
type = stream_getc(peer->ibuf);
/* Marker check */
if (((type == BGP_MSG_OPEN) || (type == BGP_MSG_KEEPALIVE))
&& !bgp_marker_all_one(peer->ibuf, BGP_MARKER_SIZE)) {
bgp_notify_send(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_NOT_SYNC);
goto done;
}
/* BGP type check. */
if (type != BGP_MSG_OPEN && type != BGP_MSG_UPDATE
&& type != BGP_MSG_NOTIFY && type != BGP_MSG_KEEPALIVE
&& type != BGP_MSG_ROUTE_REFRESH_NEW
&& type != BGP_MSG_ROUTE_REFRESH_OLD
&& type != BGP_MSG_CAPABILITY) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s unknown message type 0x%02x",
peer->host, type);
bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_BAD_MESTYPE,
&type, 1);
goto done;
}
/* Minimum packet length check. */
if ((size < BGP_HEADER_SIZE) || (size > BGP_MAX_PACKET_SIZE)
|| (type == BGP_MSG_OPEN && size < BGP_MSG_OPEN_MIN_SIZE)
|| (type == BGP_MSG_UPDATE
&& size < BGP_MSG_UPDATE_MIN_SIZE)
|| (type == BGP_MSG_NOTIFY
&& size < BGP_MSG_NOTIFY_MIN_SIZE)
|| (type == BGP_MSG_KEEPALIVE
&& size != BGP_MSG_KEEPALIVE_MIN_SIZE)
|| (type == BGP_MSG_ROUTE_REFRESH_NEW
&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
|| (type == BGP_MSG_ROUTE_REFRESH_OLD
&& size < BGP_MSG_ROUTE_REFRESH_MIN_SIZE)
|| (type == BGP_MSG_CAPABILITY
&& size < BGP_MSG_CAPABILITY_MIN_SIZE)) {
if (bgp_debug_neighbor_events(peer))
zlog_debug("%s bad message length - %d for %s",
peer->host, size,
type == 128
? "ROUTE-REFRESH"
: bgp_type_str[(int)type]);
bgp_notify_send_with_data(peer, BGP_NOTIFY_HEADER_ERR,
BGP_NOTIFY_HEADER_BAD_MESLEN,
(u_char *)notify_data_length,
2);
goto done;
}
/* Adjust size to message length. */
peer->packet_size = size;
}
ret = bgp_read_packet(peer);
if (ret < 0)
goto done;
/* Get size and type again. */
(void)stream_getw_from(peer->ibuf, BGP_MARKER_SIZE);
type = stream_getc_from(peer->ibuf, BGP_MARKER_SIZE + 2);
/* read in the packet length and type */
size = stream_getw(peer->curr);
type = stream_getc(peer->curr);
/* BGP packet dump function. */
bgp_dump_packet(peer, type, peer->ibuf);
bgp_dump_packet(peer, type, peer->curr);
size = (peer->packet_size - BGP_HEADER_SIZE);
/* adjust size to exclude the marker + length + type */
size -= BGP_HEADER_SIZE;
/* Read rest of the packet and call each sort of packet routine */
switch (type) {
@ -2118,26 +2036,14 @@ int bgp_read(struct thread *thread)
* of the packet for troubleshooting purposes
*/
if (notify_out < peer->notify_out) {
memcpy(peer->last_reset_cause, peer->ibuf->data,
peer->packet_size);
peer->last_reset_cause_size = peer->packet_size;
notify_out = peer->notify_out;
memcpy(peer->last_reset_cause, peer->curr->data, actual_size);
peer->last_reset_cause_size = actual_size;
}
/* Clear input buffer. */
peer->packet_size = 0;
if (peer->ibuf)
stream_reset(peer->ibuf);
done:
/* If reading this packet caused us to send a NOTIFICATION then store a
* copy
* of the packet for troubleshooting purposes
*/
if (notify_out < peer->notify_out) {
memcpy(peer->last_reset_cause, peer->ibuf->data,
peer->packet_size);
peer->last_reset_cause_size = peer->packet_size;
/* Delete packet and carry on. */
if (peer->curr) {
stream_free(peer->curr);
peer->curr = NULL;
}
return 0;

View File

@ -38,8 +38,6 @@
#define ORF_COMMON_PART_DENY 0x20
/* Packet send and receive function prototypes. */
extern int bgp_read(struct thread *);
extern void bgp_keepalive_send(struct peer *);
extern void bgp_open_send(struct peer *);
extern void bgp_notify_send(struct peer *, u_int8_t, u_int8_t);
@ -68,5 +66,6 @@ extern int bgp_packet_set_size(struct stream *s);
extern bool bgp_packet_writes_thread_run;
extern int bgp_generate_updgrp_packets(struct thread *);
extern int bgp_process_packet(struct thread *);
#endif /* _QUAGGA_BGP_PACKET_H */

View File

@ -179,7 +179,7 @@ struct update_subgroup {
struct stream *work;
/* We use a separate stream to encode MP_REACH_NLRI for efficient
* NLRI packing. peer->work stores all the other attributes. The
* NLRI packing. peer->obuf_work stores all the other attributes. The
* actual packet is then constructed by concatenating the two.
*/
struct stream *scratch;

View File

@ -7068,14 +7068,40 @@ static int bgp_show_summary(struct vty *vty, struct bgp *bgp, int afi, int safi,
vty_out(vty, "4 %10u %7u %7u %8" PRIu64 " %4d %4zd %8s",
peer->as,
peer->open_in + peer->update_in
+ peer->keepalive_in + peer->notify_in
+ peer->refresh_in
+ peer->dynamic_cap_in,
peer->open_out + peer->update_out
+ peer->keepalive_out + peer->notify_out
+ peer->refresh_out
+ peer->dynamic_cap_out,
atomic_load_explicit(&peer->open_in,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->update_in,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->keepalive_in,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->notify_in,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->refresh_in,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->dynamic_cap_in,
memory_order_relaxed),
atomic_load_explicit(&peer->open_out,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->update_out,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->keepalive_out,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->notify_out,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->refresh_out,
memory_order_relaxed)
+ atomic_load_explicit(
&peer->dynamic_cap_out,
memory_order_relaxed),
peer->version[afi][safi], 0, peer->obuf->count,
peer_uptime(peer->uptime, timebuf,
BGP_UPTIME_LEN, 0, NULL));

View File

@ -992,10 +992,14 @@ static void peer_free(struct peer *peer)
* but just to be sure..
*/
bgp_timer_set(peer);
BGP_READ_OFF(peer->t_read);
peer_writes_off(peer);
bgp_reads_off(peer);
bgp_writes_off(peer);
assert(!peer->t_write);
assert(!peer->t_read);
BGP_EVENT_FLUSH(peer);
pthread_mutex_destroy(&peer->io_mtx);
/* Free connected nexthop, if present */
if (CHECK_FLAG(peer->flags, PEER_FLAG_CONFIG_NODE)
&& !peer_dynamic_neighbor(peer))
@ -1138,11 +1142,11 @@ struct peer *peer_new(struct bgp *bgp)
SET_FLAG(peer->sflags, PEER_STATUS_CAPABILITY_OPEN);
/* Create buffers. */
peer->ibuf = stream_new(BGP_MAX_PACKET_SIZE);
peer->ibuf = stream_fifo_new();
peer->obuf = stream_fifo_new();
pthread_mutex_init(&peer->obuf_mtx, NULL);
pthread_mutex_init(&peer->io_mtx, NULL);
/* We use a larger buffer for peer->work in the event that:
/* We use a larger buffer for peer->obuf_work in the event that:
* - We RX a BGP_UPDATE where the attributes alone are just
* under BGP_MAX_PACKET_SIZE
* - The user configures an outbound route-map that does many as-path
@ -1156,8 +1160,9 @@ struct peer *peer_new(struct bgp *bgp)
* bounds
* checking for every single attribute as we construct an UPDATE.
*/
peer->work =
peer->obuf_work =
stream_new(BGP_MAX_PACKET_SIZE + BGP_MAX_PACKET_SIZE_OVERFLOW);
peer->ibuf_work = stream_new(BGP_MAX_PACKET_SIZE);
peer->scratch = stream_new(BGP_MAX_PACKET_SIZE);
@ -2086,6 +2091,11 @@ int peer_delete(struct peer *peer)
bgp = peer->bgp;
accept_peer = CHECK_FLAG(peer->sflags, PEER_STATUS_ACCEPT_PEER);
bgp_reads_off(peer);
bgp_writes_off(peer);
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON));
assert(!CHECK_FLAG(peer->thread_flags, PEER_THREAD_READS_ON));
if (CHECK_FLAG(peer->sflags, PEER_STATUS_NSF_WAIT))
peer_nsf_stop(peer);
@ -2147,7 +2157,7 @@ int peer_delete(struct peer *peer)
/* Buffers. */
if (peer->ibuf) {
stream_free(peer->ibuf);
stream_fifo_free(peer->ibuf);
peer->ibuf = NULL;
}
@ -2156,9 +2166,14 @@ int peer_delete(struct peer *peer)
peer->obuf = NULL;
}
if (peer->work) {
stream_free(peer->work);
peer->work = NULL;
if (peer->ibuf_work) {
stream_free(peer->ibuf_work);
peer->ibuf_work = NULL;
}
if (peer->obuf_work) {
stream_free(peer->obuf_work);
peer->obuf_work = NULL;
}
if (peer->scratch) {
@ -7389,20 +7404,24 @@ void bgp_pthreads_init()
{
frr_pthread_init();
frr_pthread_new("BGP write thread", PTHREAD_WRITE, peer_writes_start,
peer_writes_stop);
frr_pthread_new("BGP i/o thread", PTHREAD_IO, bgp_io_start,
bgp_io_stop);
frr_pthread_new("BGP keepalives thread", PTHREAD_KEEPALIVES,
peer_keepalives_start, peer_keepalives_stop);
/* pre-run initialization */
peer_keepalives_init();
peer_writes_init();
bgp_io_init();
}
void bgp_pthreads_run()
{
frr_pthread_run(PTHREAD_WRITE, NULL, NULL);
frr_pthread_run(PTHREAD_KEEPALIVES, NULL, NULL);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
frr_pthread_run(PTHREAD_IO, &attr, NULL);
frr_pthread_run(PTHREAD_KEEPALIVES, &attr, NULL);
}
void bgp_pthreads_finish()

View File

@ -101,7 +101,7 @@ struct bgp_master {
struct thread_master *master;
/* BGP pthreads. */
#define PTHREAD_WRITE (1 << 1)
#define PTHREAD_IO (1 << 1)
#define PTHREAD_KEEPALIVES (1 << 2)
/* work queues */
@ -589,13 +589,17 @@ struct peer {
struct in_addr local_id;
/* Packet receive and send buffer. */
struct stream *ibuf;
pthread_mutex_t obuf_mtx;
struct stream_fifo *obuf;
struct stream *work;
pthread_mutex_t io_mtx; // guards ibuf, obuf
struct stream_fifo *ibuf; // packets waiting to be processed
struct stream_fifo *obuf; // packets waiting to be written
struct stream *ibuf_work; // WiP buffer used by bgp_read() only
struct stream *obuf_work; // WiP buffer used to construct packets
struct stream *curr; // the current packet being parsed
/* We use a separate stream to encode MP_REACH_NLRI for efficient
* NLRI packing. peer->work stores all the other attributes. The
* NLRI packing. peer->obuf_work stores all the other attributes. The
* actual packet is then constructed by concatenating the two.
*/
struct stream *scratch;
@ -799,7 +803,9 @@ struct peer {
/* Threads. */
struct thread *t_read;
struct thread *t_write;
struct thread *t_start;
struct thread *t_connect_check;
struct thread *t_connect;
struct thread *t_holdtime;
struct thread *t_routeadv;
@ -807,11 +813,13 @@ struct peer {
struct thread *t_gr_restart;
struct thread *t_gr_stale;
struct thread *t_generate_updgrp_packets;
struct thread *t_process_packet;
/* Thread flags. */
u_int16_t thread_flags;
#define PEER_THREAD_WRITES_ON (1 << 0)
#define PEER_THREAD_KEEPALIVES_ON (1 << 1)
#define PEER_THREAD_WRITES_ON (1 << 1)
#define PEER_THREAD_READS_ON (1 << 2)
#define PEER_THREAD_KEEPALIVES_ON (1 << 3)
/* workqueues */
struct work_queue *clear_node_queue;
@ -853,9 +861,6 @@ struct peer {
/* Notify data. */
struct bgp_notify notify;
/* Whole packet size to be read. */
unsigned long packet_size;
/* Filter structure. */
struct bgp_filter filter[AFI_MAX][SAFI_MAX];
@ -1149,7 +1154,7 @@ enum bgp_clear_type {
};
/* Macros. */
#define BGP_INPUT(P) ((P)->ibuf)
#define BGP_INPUT(P) ((P)->curr)
#define BGP_INPUT_PNT(P) (STREAM_PNT(BGP_INPUT(P)))
#define BGP_IS_VALID_STATE_FOR_NOTIF(S) \
(((S) == OpenSent) || ((S) == OpenConfirm) || ((S) == Established))

View File

@ -1304,18 +1304,29 @@ static int rfapi_open_inner(struct rfapi_descriptor *rfd, struct bgp *bgp,
rfd->peer = peer_new(bgp);
rfd->peer->status = Established; /* keep bgp core happy */
bgp_sync_delete(rfd->peer); /* don't need these */
if (rfd->peer->ibuf) {
stream_free(rfd->peer->ibuf); /* don't need it */
// since this peer is not on the I/O thread, this lock is not strictly
// necessary, but serves as a reminder to those who may meddle...
pthread_mutex_lock(&rfd->peer->io_mtx);
{
// we don't need any I/O related facilities
if (rfd->peer->ibuf)
stream_fifo_free(rfd->peer->ibuf);
if (rfd->peer->obuf)
stream_fifo_free(rfd->peer->obuf);
if (rfd->peer->ibuf_work)
stream_free(rfd->peer->ibuf_work);
if (rfd->peer->obuf_work)
stream_free(rfd->peer->obuf_work);
rfd->peer->ibuf = NULL;
}
if (rfd->peer->obuf) {
stream_fifo_free(rfd->peer->obuf); /* don't need it */
rfd->peer->obuf = NULL;
rfd->peer->obuf_work = NULL;
rfd->peer->ibuf_work = NULL;
}
if (rfd->peer->work) {
stream_free(rfd->peer->work); /* don't need it */
rfd->peer->work = NULL;
}
pthread_mutex_unlock(&rfd->peer->io_mtx);
{ /* base code assumes have valid host pointer */
char buf[BUFSIZ];
buf[0] = 0;

View File

@ -183,22 +183,31 @@ static void vnc_redistribute_add(struct prefix *p, u_int32_t metric,
vncHD1VR.peer->status =
Established; /* keep bgp core happy */
bgp_sync_delete(vncHD1VR.peer); /* don't need these */
if (vncHD1VR.peer->ibuf) {
stream_free(vncHD1VR.peer
->ibuf); /* don't need it */
// since this peer is not on the I/O thread, this lock
// is not strictly
// necessary, but serves as a reminder to those who may
// meddle...
pthread_mutex_lock(&vncHD1VR.peer->io_mtx);
{
// we don't need any I/O related facilities
if (vncHD1VR.peer->ibuf)
stream_fifo_free(vncHD1VR.peer->ibuf);
if (vncHD1VR.peer->obuf)
stream_fifo_free(vncHD1VR.peer->obuf);
if (vncHD1VR.peer->ibuf_work)
stream_free(vncHD1VR.peer->ibuf_work);
if (vncHD1VR.peer->obuf_work)
stream_free(vncHD1VR.peer->obuf_work);
vncHD1VR.peer->ibuf = NULL;
}
if (vncHD1VR.peer->obuf) {
stream_fifo_free(
vncHD1VR.peer
->obuf); /* don't need it */
vncHD1VR.peer->obuf = NULL;
vncHD1VR.peer->obuf_work = NULL;
vncHD1VR.peer->ibuf_work = NULL;
}
if (vncHD1VR.peer->work) {
stream_free(vncHD1VR.peer
->work); /* don't need it */
vncHD1VR.peer->work = NULL;
}
pthread_mutex_unlock(&vncHD1VR.peer->io_mtx);
/* base code assumes have valid host pointer */
vncHD1VR.peer->host =
XSTRDUP(MTYPE_BGP_PEER_HOST, ".zebra.");

View File

@ -36,7 +36,6 @@
DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread")
DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master")
DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
DEFINE_MTYPE(LIB, PTHREAD, "POSIX Thread")
#if defined(__APPLE__)
#include <mach/mach.h>

View File

@ -26,9 +26,6 @@
#include <poll.h>
#include "monotime.h"
#include "memory.h"
DECLARE_MTYPE(PTHREAD)
struct rusage_t {
struct rusage cpu;
struct timeval real;