diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 0e837991..884407b5 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -1,12 +1,13 @@ -use failure::*; -use std::io::{Write, Seek}; -use std::thread; -use std::sync::{Arc, Mutex}; +use std::collections::HashSet; +use std::io::{Seek, Write}; use std::os::unix::io::FromRawFd; use std::path::{Path, PathBuf}; -use std::collections::HashSet; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::thread; -use futures::Poll; +use failure::*; use futures::stream::Stream; use nix::fcntl::OFlag; @@ -38,6 +39,7 @@ impl Drop for PxarBackupStream { } impl PxarBackupStream { + pin_utils::unsafe_pinned!(stream: Option>); pub fn new( mut dir: Dir, @@ -93,16 +95,22 @@ impl PxarBackupStream { impl Stream for PxarBackupStream { - type Item = Vec; - type Error = Error; + type Item = Result, Error>; - fn poll(&mut self) -> Poll>, Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { { // limit lock scope let error = self.error.lock().unwrap(); if let Some(ref msg) = *error { - return Err(format_err!("{}", msg)); + return Poll::Ready(Some(Err(format_err!("{}", msg)))); } } - self.stream.as_mut().unwrap().poll().map_err(Error::from) + let res = self.as_mut() + .stream() + .as_pin_mut() + .unwrap() + .poll_next(cx); + Poll::Ready(futures::ready!(res) + .map(|v| v.map_err(Error::from)) + ) } }