diff --git a/exec/gmi.c b/exec/gmi.c index 3ea71349..d41bfea8 100644 --- a/exec/gmi.c +++ b/exec/gmi.c @@ -92,7 +92,7 @@ #define QUEUE_ASSEMBLY_SIZE_MAX ((MESSAGE_SIZE_MAX / 1472) + 1) #define QUEUE_RTR_ITEMS_SIZE_MAX 256 #define QUEUE_PEND_TRANS_SIZE_MAX ((MESSAGE_SIZE_MAX / 1472) + 1) -#define MAXIOVS 8 +#define MAXIOVS 4 #define RTR_TOKEN_SIZE_MAX 32 #define MISSING_MCAST_WINDOW 64 #define TIMEOUT_STATE_GATHER 100 @@ -142,12 +142,17 @@ enum message_type { */ struct queue queues_pend_trans[PRIORITY_MAX]; +struct reftwo { + int refcount; +}; + /* * In-order pending delivery queue */ struct assembly_queue_item { struct iovec iovec[MAXIOVS]; int iov_len; + struct reftwo *reftwo; }; struct assembly_queue { @@ -156,10 +161,18 @@ struct assembly_queue { struct queue queue; }; +struct pend_queue_msg_item { + struct reftwo *reftwo; + char *iov_base[MAXIOVS]; + int iovec_entries; +}; + struct pend_queue_item { int seqid; - struct iovec iovec[256]; + struct iovec iovec[QUEUE_PEND_SIZE_MAX * MAXIOVS]; int iov_len; + struct pend_queue_msg_item pend_queue_msg_items[QUEUE_PEND_SIZE_MAX]; + int pend_queue_msg_item_count; }; struct queue_frag { @@ -199,10 +212,6 @@ int gmi_arut = 0; /* * Delivered up to and including */ -int gmi_adut = 0; - -int gmi_adut_old = 0; - int gmi_original_arut = 0; int gmi_highest_seq = 0; @@ -279,6 +288,15 @@ struct mcast { struct gmi_groupname groupname; }; +/* + * MTU - multicast message header - IP header - UDP header + * + * On lossy switches, making use of the DF UDP flag can lead to loss of + * forward progress. So the packets must be fragmented by the algorithm + * and reassembled at the receiver. + */ +#define FRAGMENT_SIZE (PACKET_SIZE_MAX - sizeof (struct mcast) - 20 - 8) + struct rtr_item { struct memb_conf_id conf_id; int seqid; @@ -329,14 +347,15 @@ struct memb_join { struct gmi_pend_trans_item { struct mcast *mcast; - struct iovec iovec[MAXIOVS]; int iov_len; + struct reftwo *reftwo; }; struct gmi_rtr_item { struct iovec iovec[MAXIOVS+2]; /* +2 is for mcast msg + group name TODO is this right */ int iov_len; + struct reftwo *reftwo; }; enum memb_state { @@ -611,6 +630,12 @@ static int gmi_pend_trans_item_store ( goto error_mcast; } + gmi_pend_trans_item.reftwo = malloc (sizeof (struct reftwo)); + if (gmi_pend_trans_item.reftwo == 0) { + goto error_reftwo; + } + gmi_pend_trans_item.reftwo->refcount = 2; + /* * Set mcast header */ @@ -650,10 +675,46 @@ error_iovec: free (gmi_pend_trans_item.iovec[j].iov_base); } return (-1); +error_reftwo: + free (gmi_pend_trans_item.mcast); error_mcast: return (0); } +static void release_reftwo_char (struct reftwo *reftwo, + char **iovec, + int iovec_entries) +{ + int i; + + assert (reftwo > 0); + if ((--reftwo->refcount) == 0) { + for (i = 0; i < iovec_entries; i++) { + free (iovec[i]); + iovec[i] = (char *)0xdeadbeef; + } + reftwo->refcount = -1; + free (reftwo); + } +} + +static void release_reftwo_iovec (struct reftwo *reftwo, + struct iovec *iovec, + int iovec_entries) +{ + int i; + + assert (reftwo > 0); + if ((--reftwo->refcount) == 0) { + for (i = 0; i < iovec_entries; i++) { + free (iovec[i].iov_base); + iovec[i].iov_base = (char *)0xdeadbeef; + } + reftwo->refcount = -1; + free (reftwo); + } +} + static void encrypt_and_sign (struct iovec *iovec, int iov_len) { char *addr = iov_encrypted.iov_base + sizeof (struct security_header); @@ -822,15 +883,6 @@ print_digest ("calculated digest", digest_comparison); return (0); } -/* - * MTU - multicast message header - IP header - UDP header - * - * On lossy switches, making use of the DF UDP flag can lead to loss of - * forward progress. So the packets must be fragmented by the algorithm - * and reassembled at the receiver. - */ -#define FRAGMENT_SIZE (PACKET_SIZE_MAX - sizeof (struct mcast) - 20 - 8) - static void timer_function_single_member (void *data); /* @@ -872,9 +924,9 @@ static void timer_function_single_member (void *data) } int gmi_mcast ( - struct gmi_groupname *groupname, - struct iovec *iovec, - int iov_len, + struct gmi_groupname *groupname, + struct iovec *iovec, + int iov_len, int priority) { int res; @@ -1214,7 +1266,7 @@ int gmi_brake; static int messages_free (int group_arut) { struct gmi_rtr_item *gmi_rtr_item_p; - int i, j; + int i; int res; int lesser; @@ -1227,13 +1279,7 @@ static int messages_free (int group_arut) gmi_brake = last_group_arut; } - /* - * Determine low water mark for messages to be freed - */ lesser = gmi_brake; - if (lesser > gmi_adut) { - lesser = gmi_adut; - } //printf ("Freeing lesser %d %d %d\n", lesser, group_arut, last_group_arut); //printf ("lesser %d gropu arut %d last group arut %d\n", lesser, group_arut, last_group_arut); @@ -1253,16 +1299,14 @@ static int messages_free (int group_arut) for (i = last_released; i <= lesser; i++) { res = sq_item_get (&queue_rtr_items, i, (void **)&gmi_rtr_item_p); if (res == 0) { - for (j = 0; j < gmi_rtr_item_p->iov_len; j++) { - free (gmi_rtr_item_p->iovec[j].iov_base); - gmi_rtr_item_p->iovec[j].iov_base = (void *)0xdeadbeef; - gmi_rtr_item_p->iovec[j].iov_len = i; - } + release_reftwo_iovec (gmi_rtr_item_p->reftwo, + gmi_rtr_item_p->iovec, + gmi_rtr_item_p->iov_len); } + sq_items_release (&queue_rtr_items, i); last_released = i + 1; } - sq_items_release (&queue_rtr_items, lesser); gmi_log_printf (gmi_log_level_debug, "releasing messages up to and including %d\n", lesser); return (0); } @@ -1340,6 +1384,7 @@ static int orf_token_mcast ( * Build IO vector */ memset (&gmi_rtr_item, 0, sizeof (struct gmi_rtr_item)); + gmi_rtr_item.reftwo = gmi_pend_trans_item->reftwo; gmi_rtr_item.iovec[0].iov_base = gmi_pend_trans_item->mcast; gmi_rtr_item.iovec[0].iov_len = sizeof (struct mcast); @@ -1558,6 +1603,23 @@ static void queues_queue_frag_memb_new (void) memset (queues_frag_new, 0, sizeof (struct queue_frag) * MAX_MEMBERS); + /* + * Free queues that are no longer part of the configuration + */ + for (i = 0; i < MAX_MEMBERS; i++) { + found = 0; + for (j = 0; j < memb_list_entries_confchg; j++) { + if (memb_list[j].sin_addr.s_addr == queues_frag[i].source_addr.s_addr) { + found = 1; + break; + } + } + if (found == 0) { + queue_free (&queues_frag[i].assembly.queue); + queue_free (&queues_frag[i].pend_queue); + } + } + /* * Build new pending list */ @@ -1662,19 +1724,18 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou // TODO if (memb_state == MEMB_STATE_EVS && gmi_arut == gmi_barrier_seq && orf_token->group_arut == gmi_barrier_seq) { gmi_log_printf (gmi_log_level_notice, "EVS recovery of messages complete, transitioning to operational.\n"); + messages_free (gmi_barrier_seq - 1); /* * EVS recovery complete, reset local variables */ gmi_arut = 0; - gmi_adut_old = gmi_adut; - gmi_adut = 0; - // gmi_token_seqid = 0; gmi_highest_seq_old = gmi_highest_seq; gmi_highest_seq = 0; last_group_arut = 0; + last_released = 0; sq_reinit (&queue_rtr_items, 0); memb_failed_list_entries = 0; @@ -2820,6 +2881,7 @@ printf ("Queue empty[%d] %d queues seqid %d lowest so far %d\n", i, static void app_deliver (void) { struct queue_frag *queue_frag; struct pend_queue_item *pend_queue_item; + int i; do { queue_frag = queue_frag_delivery_find (); @@ -2849,9 +2911,14 @@ assert (queue_frag); pend_queue_item->iov_len); /* - * Release messages that can be freed - */ - gmi_adut = queue_frag->seqid; + * Reduce ref count on these delivered messages and free them if their + * reference count is zero + */ + for (i = 0; i < pend_queue_item->pend_queue_msg_item_count; i++) { + release_reftwo_char (pend_queue_item->pend_queue_msg_items[i].reftwo, + pend_queue_item->pend_queue_msg_items[i].iov_base, + pend_queue_item->pend_queue_msg_items[i].iovec_entries); + } /* * Reset lowest seqid for this pending queue from next assembled message @@ -2876,6 +2943,8 @@ static void assembly_deliver (struct queue_frag *queue_frag) struct iovec iovec_delv[256]; int iov_len_delv = 0; struct mcast *mcast = 0; + int pend_queue_msg_item_count; + int i; memset (iovec_delv, 0, sizeof (iovec_delv)); @@ -2887,9 +2956,21 @@ static void assembly_deliver (struct queue_frag *queue_frag) /* * Assemble all of the message iovectors into one iovector for delivery */ + pend_queue_msg_item_count = 0; do { assembly_queue_item = queue_item_iterator_get (&queue_frag->assembly.queue); + /* + * Assemble the refcounting structure to free the messages if appropriate + */ + for (i = 0; i < assembly_queue_item->iov_len; i++) { + pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].iov_base[i] = + assembly_queue_item->iovec[i].iov_base; + } + pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].iovec_entries = i; + pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].reftwo = assembly_queue_item->reftwo; + pend_queue_msg_item_count++; + /* * Assemble io vector */ @@ -2924,6 +3005,7 @@ static void assembly_deliver (struct queue_frag *queue_frag) res = queue_item_iterator_next (&queue_frag->assembly.queue); } while (res == 0); + pend_queue_item.pend_queue_msg_item_count = pend_queue_msg_item_count; /* * assert that this really is the end of the packet @@ -3002,6 +3084,7 @@ static void pending_queues_deliver (void) /* * Create pending delivery item */ + assembly_queue_item.reftwo = gmi_rtr_item_p->reftwo; assembly_queue_item.iov_len = gmi_rtr_item_p->iov_len; memcpy (&assembly_queue_item.iovec, gmi_rtr_item_p->iovec, sizeof (struct iovec) * gmi_rtr_item_p->iov_len); @@ -3080,6 +3163,13 @@ static int message_handler_mcast ( if (gmi_rtr_item.iovec[0].iov_base == 0) { return (-1); /* error here is corrected by the algorithm */ } + gmi_rtr_item.reftwo = malloc (sizeof (struct reftwo)); + if (gmi_rtr_item.reftwo == 0) { + free (gmi_rtr_item.iovec[0].iov_base); + return (-1); + } + gmi_rtr_item.reftwo->refcount = 2; + memcpy (gmi_rtr_item.iovec[0].iov_base, mcast, bytes_received); gmi_rtr_item.iovec[0].iov_len = bytes_received; assert (gmi_rtr_item.iovec[0].iov_len > 0);