diff --git a/src/client/pipe_to_stream.rs b/src/client/pipe_to_stream.rs new file mode 100644 index 00000000..14c34be7 --- /dev/null +++ b/src/client/pipe_to_stream.rs @@ -0,0 +1,65 @@ +// Implement simple flow control for h2 client +// +// See also: hyper/src/proto/h2/mod.rs + +use failure::*; + +use futures::{try_ready, Async, Future, Poll}; +use h2::{SendStream}; +use bytes::Bytes; + +pub struct PipeToSendStream { + body_tx: SendStream, + data: Option, +} + +impl PipeToSendStream { + pub fn new(data: Bytes, tx: SendStream) -> PipeToSendStream { + PipeToSendStream { + body_tx: tx, + data: Some(data), + } + } +} + +impl Future for PipeToSendStream { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + loop { + if self.data != None { + // we don't have the next chunk of data yet, so just reserve 1 byte to make + // sure there's some capacity available. h2 will handle the capacity management + // for the actual body chunk. + self.body_tx.reserve_capacity(1); + + if self.body_tx.capacity() == 0 { + loop { + match try_ready!(self.body_tx.poll_capacity().map_err(Error::from)) { + Some(0) => {} + Some(_) => break, + None => return Err(format_err!("protocol canceled")), + } + } + } else { + if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { + return Err(format_err!("stream received RST_STREAM: {:?}", reason)); + } + } + + self.body_tx + .send_data(self.data.take().unwrap(), true) + .map_err(Error::from)?; + + return Ok(Async::Ready(())); + + } else { + if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { + return Err(format_err!("stream received RST_STREAM: {:?}", reason)); + } + return Ok(Async::Ready(())); + } + } + } +}