diff --git a/exec/totempg.c b/exec/totempg.c index 2f33354b..dde5390b 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -99,9 +99,21 @@ struct totempg_mcast_header { short type; }; +/* + * totempg_mcast structure + * + * header: Identify the mcast. + * fragmented: Set if this message continues into next message + * continuation: Set if this message is a continuation from last message + * msg_count Indicates how many packed messages are contained + * in the mcast. + * Also, the size of each packed message and the messages themselves are + * appended to the end of this structure when sent. + */ struct totempg_mcast { struct totempg_mcast_header header; - short fragmented; /* This message continues into next message */ + unsigned char fragmented; + unsigned char continuation; short msg_count; /* * short msg_len[msg_count]; @@ -114,7 +126,8 @@ struct totempg_mcast { /* * Maximum packet size for totem pg messages */ -#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - sizeof (struct totempg_mcast)) +#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - \ + sizeof (struct totempg_mcast)) /* * Local variables used for packing small messages @@ -147,9 +160,19 @@ struct assembly { struct assembly *assembly_list[16]; // MAX PROCESSORS TODO int assembly_list_entries = 0; +/* + * Staging buffer for packed messages. Messages are staged in this buffer + * before sending. Multiple messages may fit which cuts down on the + * number of mcasts sent. If a message doesn't completely fit, then + * the mcast header has a fragment bit set that says that there are more + * data to follow. fragment_size is an index into the buffer. It indicates + * the size of message data and where to place new message data. + * fragment_contuation indicates whether the first packed message in + * the buffer is a continuation of a previously packed fragment. + */ static unsigned char fragmentation_data[TOTEMPG_PACKET_SIZE]; - int fragment_size = 0; +int fragment_continuation = 0; static struct iovec iov_delv; @@ -233,16 +256,20 @@ static void totempg_deliver_fn ( int h_index; int a_i = 0; int msg_count; + int continuation; assembly = find_assembly (source_addr); assert (assembly); /* - * Assemble the header into one block of data - * Assemble the packet contents into one block of data to simplify delivery + * Assemble the header into one block of data and + * assemble the packet contents into one block of data to simplify delivery */ if (iov_len == 1) { - /* message originated from external processor - 1 iovec for full msg */ + /* + * This message originated from external processor + * because there is only one iovec for the full msg. + */ char *data; int datasize; @@ -263,7 +290,10 @@ static void totempg_deliver_fn ( memcpy (&assembly->data[assembly->index], &data[datasize], iovec[0].iov_len - datasize); } else { - /* message originated from local processor - <1 iovec for full msg */ + /* + * The message originated from local processor + * becasue there is greater than one iovec for then full msg. + */ h_index = 0; for (i = 0; i < 2; i++) { memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len); @@ -284,7 +314,6 @@ static void totempg_deliver_fn ( } if (endian_conversion_required) { - mcast->fragmented = swab16 (mcast->fragmented); mcast->msg_count = swab16 (mcast->msg_count); for (i = 0; i < mcast->msg_count; i++) { msg_lens[i] = swab16 (msg_lens[i]); @@ -299,48 +328,47 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count) */ /* - * Deliver all full messages in packed message + * If the last message in the buffer is a fragment, then we + * can't deliver it. We'll first deliver the full messages + * then adjust the assembly buffer so we can add the rest of the + * fragment when it arrives. */ + msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count; + continuation = mcast->continuation; + iov_delv.iov_base = &assembly->data[0]; + iov_delv.iov_len = assembly->index + msg_lens[0]; - /* - * this message's last packed message is not a fragment - */ - if (mcast->fragmented == 0) { - iov_delv.iov_base = &assembly->data[0]; - iov_delv.iov_len = assembly->index + msg_lens[0]; - for (i = 0; i < mcast->msg_count; i++) { - assembly->index += msg_lens[i]; -//printf ("app deliver\n"); - app_deliver_fn (source_addr, &iov_delv, 1, + for (i = 0; i < msg_count; i++) { + /* + * If the first packed message is a continuation + * of a previous message, but the assembly buffer + * is empty, then we need to discard it since we can't + * assemble a complete message. + */ + if (continuation && (assembly->index == 0)) { + continuation = 0; + } else { + app_deliver_fn(source_addr, &iov_delv, 1, endian_conversion_required); - iov_delv.iov_base = &assembly->data[assembly->index]; + } + assembly->index += msg_lens[i]; + iov_delv.iov_base = &assembly->data[assembly->index]; + if (i < (msg_count - 1)) { iov_delv.iov_len = msg_lens[i + 1]; } - assembly->index = 0; - } else + } - /* - * This message's last packed message is a fragment - */ - if (mcast->fragmented == 1) { - iov_delv.iov_base = &assembly->data[0]; - iov_delv.iov_len = assembly->index + msg_lens[0]; - for (i = 0; i < mcast->msg_count - 1; i++) { - assembly->index += msg_lens[i]; -//printf ("app deliver\n"); - app_deliver_fn (source_addr, &iov_delv, 1, - endian_conversion_required); - iov_delv.iov_base = &assembly->data[assembly->index]; - iov_delv.iov_len = msg_lens[i + 1]; - } + if (mcast->fragmented) { if (mcast->msg_count > 1) { memmove (&assembly->data[0], &assembly->data[assembly->index], - msg_lens[mcast->msg_count - 1]); + msg_lens[msg_count]); assembly->index = 0; } - assembly->index += msg_lens[mcast->msg_count - 1]; + assembly->index += msg_lens[msg_count]; + } else { + assembly->index = 0; } } @@ -381,6 +409,14 @@ int callback_token_received_fn (enum totemsrp_callback_token_type type, return (0); } mcast.fragmented = 0; + + /* + * Was the first message in this buffer a continuation of a + * fragmented message? + */ + mcast.continuation = fragment_continuation; + fragment_continuation = 0; + mcast.msg_count = mcast_packed_msg_count; iovecs[0].iov_base = &mcast; @@ -468,6 +504,7 @@ int totempg_mcast ( for (i = 0; i < iov_len; ) { mcast.fragmented = 0; + mcast.continuation = fragment_continuation; copy_len = iovec[i].iov_len - copy_base; /* @@ -498,11 +535,15 @@ int totempg_mcast ( /* * if we're not on the last iovec or the iovec is too large to - * fit, then indicate a fragment. + * fit, then indicate a fragment. This also means that the next + * message will have the continuation of this one. */ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { mcast.fragmented = 1; + fragment_continuation = 1; + } else { + fragment_continuation = 0; } /*