mirror of
https://git.proxmox.com/git/proxmox
synced 2025-05-01 17:07:32 +00:00
rest-server: support unix sockets in create_daemon
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com> Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
This commit is contained in:
parent
440c7e3361
commit
aad01f7a90
@ -2,12 +2,13 @@
|
|||||||
|
|
||||||
use std::ffi::CString;
|
use std::ffi::CString;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io::{Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::os::raw::{c_char, c_int, c_uchar};
|
use std::os::raw::{c_char, c_int, c_uchar};
|
||||||
use std::os::unix::ffi::OsStrExt;
|
use std::os::unix::ffi::OsStrExt;
|
||||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||||
use std::panic::UnwindSafe;
|
use std::panic::UnwindSafe;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
use futures::future::{self, Either};
|
use futures::future::{self, Either};
|
||||||
@ -22,7 +23,8 @@ type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Se
|
|||||||
|
|
||||||
// Helper trait to "store" something in the environment to be re-used after re-executing the
|
// Helper trait to "store" something in the environment to be re-used after re-executing the
|
||||||
// service on a reload.
|
// service on a reload.
|
||||||
trait Reloadable: Sized {
|
#[doc(hidden)] // not public api
|
||||||
|
pub trait Reloadable: Sized {
|
||||||
fn restore(var: &str) -> Result<Self, Error>;
|
fn restore(var: &str) -> Result<Self, Error>;
|
||||||
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
|
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
|
||||||
}
|
}
|
||||||
@ -222,33 +224,79 @@ impl Reloader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn fd_store_func(fd: RawFd) -> Result<BoxedStoreFunc, Error> {
|
||||||
|
let mut fd_opt = Some(unsafe {
|
||||||
|
OwnedFd::from_raw_fd(nix::fcntl::fcntl(
|
||||||
|
fd,
|
||||||
|
nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
|
||||||
|
)?)
|
||||||
|
});
|
||||||
|
Ok(Box::new(move || {
|
||||||
|
let fd = fd_opt.take().unwrap();
|
||||||
|
fd_change_cloexec(fd.as_raw_fd(), false)?;
|
||||||
|
Ok(fd.into_raw_fd().to_string())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// NOTE: This must only be used for *async* I/O objects!
|
||||||
|
unsafe fn fd_restore_func<T>(var: &str) -> Result<T, Error>
|
||||||
|
where
|
||||||
|
T: FromRawFd,
|
||||||
|
{
|
||||||
|
let fd = var
|
||||||
|
.parse::<u32>()
|
||||||
|
.map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
|
||||||
|
fd_change_cloexec(fd, true)?;
|
||||||
|
Ok(unsafe { T::from_raw_fd(fd) })
|
||||||
|
}
|
||||||
|
|
||||||
// For now all we need to do is store and reuse a tcp listening socket:
|
// For now all we need to do is store and reuse a tcp listening socket:
|
||||||
impl Reloadable for tokio::net::TcpListener {
|
impl Reloadable for tokio::net::TcpListener {
|
||||||
// NOTE: The socket must not be closed when the store-function is called:
|
// NOTE: The socket must not be closed when the store-function is called:
|
||||||
// FIXME: We could become "independent" of the TcpListener and its reference to the file
|
|
||||||
// descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
|
|
||||||
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
|
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
|
||||||
let mut fd_opt = Some(unsafe {
|
fd_store_func(self.as_raw_fd())
|
||||||
OwnedFd::from_raw_fd(nix::fcntl::fcntl(
|
|
||||||
self.as_raw_fd(),
|
|
||||||
nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
|
|
||||||
)?)
|
|
||||||
});
|
|
||||||
Ok(Box::new(move || {
|
|
||||||
let fd = fd_opt.take().unwrap();
|
|
||||||
fd_change_cloexec(fd.as_raw_fd(), false)?;
|
|
||||||
Ok(fd.into_raw_fd().to_string())
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn restore(var: &str) -> Result<Self, Error> {
|
fn restore(var: &str) -> Result<Self, Error> {
|
||||||
let fd = var
|
Ok(Self::from_std(unsafe { fd_restore_func(var) }?)?)
|
||||||
.parse::<u32>()
|
}
|
||||||
.map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
|
}
|
||||||
fd_change_cloexec(fd, true)?;
|
|
||||||
Ok(Self::from_std(unsafe {
|
// For now all we need to do is store and reuse a tcp listening socket:
|
||||||
std::net::TcpListener::from_raw_fd(fd)
|
impl Reloadable for tokio::net::UnixListener {
|
||||||
})?)
|
// NOTE: The socket must not be closed when the store-function is called:
|
||||||
|
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
|
||||||
|
fd_store_func(self.as_raw_fd())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn restore(var: &str) -> Result<Self, Error> {
|
||||||
|
Ok(Self::from_std(unsafe { fd_restore_func(var) }?)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Listenable: Reloadable {
|
||||||
|
type Address;
|
||||||
|
fn bind(addr: &Self::Address) -> Pin<Box<dyn Future<Output = io::Result<Self>> + Send + '_>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Listenable for tokio::net::TcpListener {
|
||||||
|
type Address = std::net::SocketAddr;
|
||||||
|
|
||||||
|
fn bind(addr: &Self::Address) -> Pin<Box<dyn Future<Output = io::Result<Self>> + Send + '_>> {
|
||||||
|
Box::pin(Self::bind(addr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Listenable for tokio::net::UnixListener {
|
||||||
|
type Address = std::os::unix::net::SocketAddr;
|
||||||
|
|
||||||
|
fn bind(addr: &Self::Address) -> Pin<Box<dyn Future<Output = io::Result<Self>> + Send + '_>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let addr = addr.as_pathname().ok_or_else(|| {
|
||||||
|
io::Error::new(io::ErrorKind::Other, "missing path for unix socket")
|
||||||
|
})?;
|
||||||
|
Self::bind(addr)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,20 +307,21 @@ impl Reloadable for tokio::net::TcpListener {
|
|||||||
/// socket. The finished listening socket is then passed to the `create_service` function which
|
/// socket. The finished listening socket is then passed to the `create_service` function which
|
||||||
/// can be used to setup the TLS and the HTTP daemon. The returned future has to call
|
/// can be used to setup the TLS and the HTTP daemon. The returned future has to call
|
||||||
/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
|
/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
|
||||||
pub async fn create_daemon<F, S>(
|
pub async fn create_daemon<F, S, L>(
|
||||||
address: std::net::SocketAddr,
|
address: L::Address,
|
||||||
create_service: F,
|
create_service: F,
|
||||||
pidfn: Option<&str>,
|
pidfn: Option<&str>,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
|
L: Listenable,
|
||||||
|
F: FnOnce(L) -> Result<S, Error>,
|
||||||
S: Future<Output = Result<(), Error>>,
|
S: Future<Output = Result<(), Error>>,
|
||||||
{
|
{
|
||||||
let mut reloader = Reloader::new()?;
|
let mut reloader = Reloader::new()?;
|
||||||
|
|
||||||
let listener: tokio::net::TcpListener = reloader
|
let listener: L = reloader
|
||||||
.restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
|
.restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
|
||||||
Ok(tokio::net::TcpListener::bind(&address).await?)
|
Ok(L::bind(&address).await?)
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user