From 0cdb8d9c5bddb7e6abe9dabce7268434835a38c0 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 23 Aug 2019 13:30:27 +0200 Subject: [PATCH] src/tools/wrapped_reader_stream.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/tools/wrapped_reader_stream.rs | 50 ++++++++++++++---------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index 06b3929e..bf8c53f2 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -1,15 +1,16 @@ -//use failure::*; -use tokio_threadpool; -use std::io::Read; -use futures::Async; +use std::io::{self, Read}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio_executor::threadpool::blocking; use futures::stream::Stream; -pub struct WrappedReaderStream { +pub struct WrappedReaderStream { reader: R, buffer: Vec, } -impl WrappedReaderStream { +impl WrappedReaderStream { pub fn new(reader: R) -> Self { let mut buffer = Vec::with_capacity(64*1024); @@ -18,29 +19,26 @@ impl WrappedReaderStream { } } -fn blocking_err() -> std::io::Error { - std::io::Error::new( - std::io::ErrorKind::Other, - "`blocking` annotated I/O must be called from the context of the Tokio runtime.") -} +impl Stream for WrappedReaderStream { + type Item = Result, io::Error>; -impl Stream for WrappedReaderStream { - - type Item = Vec; - type Error = std::io::Error; - - fn poll(&mut self) -> Result>>, std::io::Error> { - match tokio_threadpool::blocking(|| self.reader.read(&mut self.buffer)) { - Ok(Async::Ready(Ok(n))) => { - if n == 0 { // EOF - Ok(Async::Ready(None)) + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = self.get_mut(); + match blocking(|| this.reader.read(&mut this.buffer)) { + Poll::Ready(Ok(Ok(n))) => { + if n == 0 { + // EOF + Poll::Ready(None) } else { - Ok(Async::Ready(Some(self.buffer[..n].to_vec()))) + Poll::Ready(Some(Ok(this.buffer[..n].to_vec()))) } - }, - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err(blocking_err()), + } + Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new( + io::ErrorKind::Other, + err.to_string(), + )))), + Poll::Pending => Poll::Pending, } } }