diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index 06b3929e..bf8c53f2 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -1,15 +1,16 @@ -//use failure::*; -use tokio_threadpool; -use std::io::Read; -use futures::Async; +use std::io::{self, Read}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio_executor::threadpool::blocking; use futures::stream::Stream; -pub struct WrappedReaderStream { +pub struct WrappedReaderStream { reader: R, buffer: Vec, } -impl WrappedReaderStream { +impl WrappedReaderStream { pub fn new(reader: R) -> Self { let mut buffer = Vec::with_capacity(64*1024); @@ -18,29 +19,26 @@ impl WrappedReaderStream { } } -fn blocking_err() -> std::io::Error { - std::io::Error::new( - std::io::ErrorKind::Other, - "`blocking` annotated I/O must be called from the context of the Tokio runtime.") -} +impl Stream for WrappedReaderStream { + type Item = Result, io::Error>; -impl Stream for WrappedReaderStream { - - type Item = Vec; - type Error = std::io::Error; - - fn poll(&mut self) -> Result>>, std::io::Error> { - match tokio_threadpool::blocking(|| self.reader.read(&mut self.buffer)) { - Ok(Async::Ready(Ok(n))) => { - if n == 0 { // EOF - Ok(Async::Ready(None)) + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match blocking(|| this.reader.read(&mut this.buffer)) { + Poll::Ready(Ok(Ok(n))) => { + if n == 0 { + // EOF + Poll::Ready(None) } else { - Ok(Async::Ready(Some(self.buffer[..n].to_vec()))) + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) } - }, - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err(blocking_err()), + } + Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new( + io::ErrorKind::Other, + err.to_string(), + )))), + Poll::Pending => Poll::Pending, } } }