diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 84158a2c..070a10c1 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -228,3 +228,120 @@ where } } } + +#[cfg(test)] +mod test { + use futures::stream::StreamExt; + + use super::*; + + struct DummyInput { + data: Vec, + } + + impl DummyInput { + fn new(data: Vec) -> Self { + Self { data } + } + } + + impl Stream for DummyInput { + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match this.data.len() { + 0 => Poll::Ready(None), + size if size > 10 => Poll::Ready(Some(Ok(this.data.split_off(10)))), + _ => Poll::Ready(Some(Ok(std::mem::take(&mut this.data)))), + } + } + } + + #[test] + fn test_chunk_stream_forced_boundaries() { + let mut data = Vec::new(); + for i in 0..(256 * 1024) { + for j in 0..4 { + let byte = ((i >> (j << 3)) & 0xff) as u8; + data.push(byte); + } + } + + let mut input = DummyInput::new(data); + let input = Pin::new(&mut input); + + let (injections_tx, injections_rx) = mpsc::channel(); + let (boundaries_tx, boundaries_rx) = mpsc::channel(); + let (suggested_tx, suggested_rx) = mpsc::channel(); + let injection_data = InjectionData::new(boundaries_rx, injections_tx); + + let mut chunk_stream = ChunkStream::new( + input, + Some(64 * 1024), + Some(injection_data), + Some(suggested_rx), + ); + let chunks = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let chunks_clone = chunks.clone(); + + // Suggested boundary matching forced boundary + suggested_tx.send(32 * 1024).unwrap(); + // Suggested boundary not matching forced boundary + suggested_tx.send(64 * 1024).unwrap(); + // Force chunk boundary at suggested boundary + boundaries_tx + .send(InjectChunks { + boundary: 32 * 1024, + chunks: Vec::new(), + size: 1024, + }) + .unwrap(); + // Force chunk boundary within regular chunk + boundaries_tx + .send(InjectChunks { + boundary: 128 * 1024, + chunks: Vec::new(), + size: 2048, + }) + .unwrap(); + // Force chunk boundary aligned with regular boundary + boundaries_tx + .send(InjectChunks { + boundary: 657408, + chunks: Vec::new(), + size: 512, + }) + .unwrap(); + // Force chunk boundary within regular chunk, without injecting data + boundaries_tx + .send(InjectChunks { + boundary: 657408 + 1024, + chunks: Vec::new(), + size: 0, + }) + .unwrap(); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + while let Some(chunk) = chunk_stream.next().await { + let chunk = chunk.unwrap(); + let mut chunks = chunks.lock().unwrap(); + chunks.push(chunk); + } + }); + + let mut total = 0; + let chunks = chunks_clone.lock().unwrap(); + let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584]; + for (chunk, expected) in chunks.as_slice().iter().zip(expected.iter()) { + assert_eq!(chunk.len(), *expected); + total += chunk.len(); + } + while let Ok(injection) = injections_rx.recv() { + total += injection.size; + } + + assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512); + } +}