mirror of
https://github.com/rosenpass/rosenpass.git
synced 2025-12-18 21:34:37 +03:00
Merge: API: Close connections after errors & use mio::Token based polling
Merge pull request #404 from rosenpass/dev/karo/api_remove_connection
This commit is contained in:
@@ -74,6 +74,7 @@ tempfile = { workspace = true }
|
|||||||
rustix = {workspace = true}
|
rustix = {workspace = true}
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
default = ["experiment_api"]
|
||||||
experiment_memfd_secret = ["rosenpass-wireguard-broker/experiment_memfd_secret"]
|
experiment_memfd_secret = ["rosenpass-wireguard-broker/experiment_memfd_secret"]
|
||||||
experiment_libcrux = ["rosenpass-ciphers/experiment_libcrux"]
|
experiment_libcrux = ["rosenpass-ciphers/experiment_libcrux"]
|
||||||
experiment_api = ["hex-literal", "uds", "command-fds", "rosenpass-util/experiment_file_descriptor_passing", "rosenpass-wireguard-broker/experiment_api"]
|
experiment_api = ["hex-literal", "uds", "command-fds", "rosenpass-util/experiment_file_descriptor_passing", "rosenpass-wireguard-broker/experiment_api"]
|
||||||
|
|||||||
@@ -203,7 +203,7 @@ where
|
|||||||
mio::net::UdpSocket::from_std(sock).ok()
|
mio::net::UdpSocket::from_std(sock).ok()
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut sock = match sock_res {
|
let sock = match sock_res {
|
||||||
Ok(sock) => sock,
|
Ok(sock) => sock,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::debug!("Error processing AddListenSocket API request: {e:?}");
|
log::debug!("Error processing AddListenSocket API request: {e:?}");
|
||||||
@@ -213,16 +213,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Register socket
|
// Register socket
|
||||||
let reg_result = run(|| -> anyhow::Result<()> {
|
let reg_result = self.app_server_mut().register_listen_socket(sock);
|
||||||
let srv = self.app_server_mut();
|
|
||||||
srv.mio_poll.registry().register(
|
|
||||||
&mut sock,
|
|
||||||
srv.mio_token_dispenser.dispense(),
|
|
||||||
mio::Interest::READABLE,
|
|
||||||
)?;
|
|
||||||
srv.sockets.push(sock);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(internal_error) = reg_result {
|
if let Err(internal_error) = reg_result {
|
||||||
log::warn!("Internal error processing AddListenSocket API request: {internal_error:?}");
|
log::warn!("Internal error processing AddListenSocket API request: {internal_error:?}");
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ struct MioConnectionBuffers {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct MioConnection {
|
pub struct MioConnection {
|
||||||
io: UnixStream,
|
io: UnixStream,
|
||||||
|
mio_token: mio::Token,
|
||||||
invalid_read: bool,
|
invalid_read: bool,
|
||||||
buffers: Option<MioConnectionBuffers>,
|
buffers: Option<MioConnectionBuffers>,
|
||||||
api_handler: ApiHandler,
|
api_handler: ApiHandler,
|
||||||
@@ -62,11 +63,11 @@ pub struct MioConnection {
|
|||||||
|
|
||||||
impl MioConnection {
|
impl MioConnection {
|
||||||
pub fn new(app_server: &mut AppServer, mut io: UnixStream) -> std::io::Result<Self> {
|
pub fn new(app_server: &mut AppServer, mut io: UnixStream) -> std::io::Result<Self> {
|
||||||
app_server.mio_poll.registry().register(
|
let mio_token = app_server.mio_token_dispenser.dispense();
|
||||||
&mut io,
|
app_server
|
||||||
app_server.mio_token_dispenser.dispense(),
|
.mio_poll
|
||||||
MIO_RW,
|
.registry()
|
||||||
)?;
|
.register(&mut io, mio_token, MIO_RW)?;
|
||||||
|
|
||||||
let invalid_read = false;
|
let invalid_read = false;
|
||||||
let read_buffer = LengthPrefixDecoder::new(SecretBuffer::new());
|
let read_buffer = LengthPrefixDecoder::new(SecretBuffer::new());
|
||||||
@@ -80,11 +81,30 @@ impl MioConnection {
|
|||||||
let api_state = ApiHandler::new();
|
let api_state = ApiHandler::new();
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
io,
|
io,
|
||||||
|
mio_token,
|
||||||
invalid_read,
|
invalid_read,
|
||||||
buffers,
|
buffers,
|
||||||
api_handler: api_state,
|
api_handler: api_state,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn shoud_close(&self) -> bool {
|
||||||
|
let exhausted = self
|
||||||
|
.buffers
|
||||||
|
.as_ref()
|
||||||
|
.map(|b| b.write_buffer.exhausted())
|
||||||
|
.unwrap_or(false);
|
||||||
|
self.invalid_read && exhausted
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(mut self, app_server: &mut AppServer) -> anyhow::Result<()> {
|
||||||
|
app_server.mio_poll.registry().deregister(&mut self.io)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mio_token(&self) -> mio::Token {
|
||||||
|
self.mio_token
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait MioConnectionContext {
|
pub trait MioConnectionContext {
|
||||||
@@ -211,12 +231,7 @@ pub trait MioConnectionContext {
|
|||||||
log::warn!("Received message on API that was too big to fit in our buffers; \
|
log::warn!("Received message on API that was too big to fit in our buffers; \
|
||||||
looks like the client is broken. Stopping to process messages of the client.\n\
|
looks like the client is broken. Stopping to process messages of the client.\n\
|
||||||
Error: {e:?}");
|
Error: {e:?}");
|
||||||
// TODO: We should properly close down the socket in this case, but to do that,
|
conn.invalid_read = true; // Closed mio_manager
|
||||||
// we need to have the facilities in the Rosenpass IO handling system to close
|
|
||||||
// open connections.
|
|
||||||
// Just leaving the API connections dangling for now.
|
|
||||||
// This should be fixed for non-experimental use of the API.
|
|
||||||
conn.invalid_read = true;
|
|
||||||
break Ok(None);
|
break Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -235,13 +250,20 @@ pub trait MioConnectionContext {
|
|||||||
The connection is broken. Stopping to process messages of the client.\n\
|
The connection is broken. Stopping to process messages of the client.\n\
|
||||||
Error: {e:?}"
|
Error: {e:?}"
|
||||||
);
|
);
|
||||||
// TODO: Same as above
|
conn.invalid_read = true; // closed later by mio_manager
|
||||||
conn.invalid_read = true;
|
|
||||||
break Err(e.into());
|
break Err(e.into());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mio_token(&self) -> mio::Token {
|
||||||
|
self.mio_connection().mio_token()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_close(&self) -> bool {
|
||||||
|
self.mio_connection().shoud_close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait MioConnectionContextPrivate: MioConnectionContext {
|
trait MioConnectionContextPrivate: MioConnectionContext {
|
||||||
|
|||||||
@@ -1,20 +1,25 @@
|
|||||||
use std::{
|
use std::{borrow::BorrowMut, io};
|
||||||
borrow::{Borrow, BorrowMut},
|
|
||||||
io,
|
|
||||||
};
|
|
||||||
|
|
||||||
use mio::net::{UnixListener, UnixStream};
|
use mio::net::{UnixListener, UnixStream};
|
||||||
|
|
||||||
use rosenpass_util::{io::nonblocking_handle_io_errors, mio::interest::RW as MIO_RW};
|
use rosenpass_util::{
|
||||||
|
functional::ApplyExt, io::nonblocking_handle_io_errors, mio::interest::RW as MIO_RW,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::app_server::AppServer;
|
use crate::app_server::{AppServer, AppServerIoSource};
|
||||||
|
|
||||||
use super::{MioConnection, MioConnectionContext};
|
use super::{MioConnection, MioConnectionContext};
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct MioManager {
|
pub struct MioManager {
|
||||||
listeners: Vec<UnixListener>,
|
listeners: Vec<UnixListener>,
|
||||||
connections: Vec<MioConnection>,
|
connections: Vec<Option<MioConnection>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||||
|
pub enum MioManagerIoSource {
|
||||||
|
Listener(usize),
|
||||||
|
Connection(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MioManager {
|
impl MioManager {
|
||||||
@@ -42,18 +47,49 @@ pub trait MioManagerContext {
|
|||||||
|
|
||||||
fn add_listener(&mut self, mut listener: UnixListener) -> io::Result<()> {
|
fn add_listener(&mut self, mut listener: UnixListener) -> io::Result<()> {
|
||||||
let srv = self.app_server_mut();
|
let srv = self.app_server_mut();
|
||||||
srv.mio_poll.registry().register(
|
let mio_token = srv.mio_token_dispenser.dispense();
|
||||||
&mut listener,
|
srv.mio_poll
|
||||||
srv.mio_token_dispenser.dispense(),
|
.registry()
|
||||||
MIO_RW,
|
.register(&mut listener, mio_token, MIO_RW)?;
|
||||||
)?;
|
let io_source = self
|
||||||
|
.mio_manager()
|
||||||
|
.listeners
|
||||||
|
.len()
|
||||||
|
.apply(MioManagerIoSource::Listener)
|
||||||
|
.apply(AppServerIoSource::MioManager);
|
||||||
self.mio_manager_mut().listeners.push(listener);
|
self.mio_manager_mut().listeners.push(listener);
|
||||||
|
self.app_server_mut()
|
||||||
|
.register_io_source(mio_token, io_source);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_connection(&mut self, connection: UnixStream) -> io::Result<()> {
|
fn add_connection(&mut self, connection: UnixStream) -> io::Result<()> {
|
||||||
let connection = MioConnection::new(self.app_server_mut(), connection)?;
|
let connection = MioConnection::new(self.app_server_mut(), connection)?;
|
||||||
self.mio_manager_mut().connections.push(connection);
|
let mio_token = connection.mio_token();
|
||||||
|
let conns: &mut Vec<Option<MioConnection>> =
|
||||||
|
self.mio_manager_mut().connections.borrow_mut();
|
||||||
|
let idx = conns
|
||||||
|
.iter_mut()
|
||||||
|
.enumerate()
|
||||||
|
.find(|(_, slot)| slot.is_some())
|
||||||
|
.map(|(idx, _)| idx)
|
||||||
|
.unwrap_or(conns.len());
|
||||||
|
conns.insert(idx, Some(connection));
|
||||||
|
let io_source = idx
|
||||||
|
.apply(MioManagerIoSource::Listener)
|
||||||
|
.apply(AppServerIoSource::MioManager);
|
||||||
|
self.app_server_mut()
|
||||||
|
.register_io_source(mio_token, io_source);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_particular(&mut self, io_source: MioManagerIoSource) -> anyhow::Result<()> {
|
||||||
|
use MioManagerIoSource as S;
|
||||||
|
match io_source {
|
||||||
|
S::Listener(idx) => self.accept_from(idx)?,
|
||||||
|
S::Connection(idx) => self.poll_particular_connection(idx)?,
|
||||||
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,16 +124,37 @@ pub trait MioManagerContext {
|
|||||||
|
|
||||||
fn poll_connections(&mut self) -> anyhow::Result<()> {
|
fn poll_connections(&mut self) -> anyhow::Result<()> {
|
||||||
for idx in 0..self.mio_manager().connections.len() {
|
for idx in 0..self.mio_manager().connections.len() {
|
||||||
let mut foc: MioConnectionFocus<Self> = MioConnectionFocus::new(self, idx);
|
self.poll_particular_connection(idx)?;
|
||||||
foc.poll()?;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_particular_connection(&mut self, idx: usize) -> anyhow::Result<()> {
|
||||||
|
if self.mio_manager().connections[idx].is_none() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut conn = MioConnectionFocus::new(self, idx);
|
||||||
|
conn.poll()?;
|
||||||
|
|
||||||
|
if conn.should_close() {
|
||||||
|
let conn = self.mio_manager_mut().connections[idx].take().unwrap();
|
||||||
|
let mio_token = conn.mio_token();
|
||||||
|
if let Err(e) = conn.close(self.app_server_mut()) {
|
||||||
|
log::warn!("Error while closing API connection {e:?}");
|
||||||
|
};
|
||||||
|
self.app_server_mut().unregister_io_source(mio_token);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ?Sized + MioManagerContext> MioConnectionContext for MioConnectionFocus<'_, T> {
|
impl<T: ?Sized + MioManagerContext> MioConnectionContext for MioConnectionFocus<'_, T> {
|
||||||
fn mio_connection(&self) -> &MioConnection {
|
fn mio_connection(&self) -> &MioConnection {
|
||||||
self.ctx.mio_manager().connections[self.conn_idx].borrow()
|
self.ctx.mio_manager().connections[self.conn_idx]
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn app_server(&self) -> &AppServer {
|
fn app_server(&self) -> &AppServer {
|
||||||
@@ -105,7 +162,9 @@ impl<T: ?Sized + MioManagerContext> MioConnectionContext for MioConnectionFocus<
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn mio_connection_mut(&mut self) -> &mut MioConnection {
|
fn mio_connection_mut(&mut self) -> &mut MioConnection {
|
||||||
self.ctx.mio_manager_mut().connections[self.conn_idx].borrow_mut()
|
self.ctx.mio_manager_mut().connections[self.conn_idx]
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn app_server_mut(&mut self) -> &mut AppServer {
|
fn app_server_mut(&mut self) -> &mut AppServer {
|
||||||
|
|||||||
@@ -10,6 +10,12 @@ use rosenpass_secret_memory::Public;
|
|||||||
use rosenpass_secret_memory::Secret;
|
use rosenpass_secret_memory::Secret;
|
||||||
use rosenpass_util::build::ConstructionSite;
|
use rosenpass_util::build::ConstructionSite;
|
||||||
use rosenpass_util::file::StoreValueB64;
|
use rosenpass_util::file::StoreValueB64;
|
||||||
|
use rosenpass_util::functional::run;
|
||||||
|
use rosenpass_util::functional::ApplyExt;
|
||||||
|
use rosenpass_util::io::IoResultKindHintExt;
|
||||||
|
use rosenpass_util::io::SubstituteForIoErrorKindExt;
|
||||||
|
use rosenpass_util::option::SomeExt;
|
||||||
|
use rosenpass_util::result::OkExt;
|
||||||
use rosenpass_wireguard_broker::WireguardBrokerMio;
|
use rosenpass_wireguard_broker::WireguardBrokerMio;
|
||||||
use rosenpass_wireguard_broker::{WireguardBrokerCfg, WG_KEY_LEN};
|
use rosenpass_wireguard_broker::{WireguardBrokerCfg, WG_KEY_LEN};
|
||||||
use zerocopy::AsBytes;
|
use zerocopy::AsBytes;
|
||||||
@@ -17,7 +23,9 @@ use zerocopy::AsBytes;
|
|||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
use std::io;
|
||||||
use std::io::stdout;
|
use std::io::stdout;
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
@@ -143,6 +151,17 @@ pub struct AppServerTest {
|
|||||||
pub termination_handler: Option<std::sync::mpsc::Receiver<()>>,
|
pub termination_handler: Option<std::sync::mpsc::Receiver<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||||
|
pub enum AppServerIoSource {
|
||||||
|
Socket(usize),
|
||||||
|
#[cfg(feature = "experiment_api")]
|
||||||
|
PskBroker(Public<BROKER_ID_BYTES>),
|
||||||
|
#[cfg(feature = "experiment_api")]
|
||||||
|
MioManager(crate::api::mio::MioManagerIoSource),
|
||||||
|
}
|
||||||
|
|
||||||
|
const EVENT_CAPACITY: usize = 20;
|
||||||
|
|
||||||
/// Holds the state of the application, namely the external IO
|
/// Holds the state of the application, namely the external IO
|
||||||
///
|
///
|
||||||
/// Responsible for file IO, network IO
|
/// Responsible for file IO, network IO
|
||||||
@@ -152,6 +171,9 @@ pub struct AppServer {
|
|||||||
pub crypto_site: ConstructionSite<BuildCryptoServer, CryptoServer>,
|
pub crypto_site: ConstructionSite<BuildCryptoServer, CryptoServer>,
|
||||||
pub sockets: Vec<mio::net::UdpSocket>,
|
pub sockets: Vec<mio::net::UdpSocket>,
|
||||||
pub events: mio::Events,
|
pub events: mio::Events,
|
||||||
|
pub short_poll_queue: VecDeque<mio::event::Event>,
|
||||||
|
pub performed_long_poll: bool,
|
||||||
|
pub io_source_index: HashMap<mio::Token, AppServerIoSource>,
|
||||||
pub mio_poll: mio::Poll,
|
pub mio_poll: mio::Poll,
|
||||||
pub mio_token_dispenser: MioTokenDispenser,
|
pub mio_token_dispenser: MioTokenDispenser,
|
||||||
pub brokers: BrokerStore,
|
pub brokers: BrokerStore,
|
||||||
@@ -521,7 +543,7 @@ impl AppServer {
|
|||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
// setup mio
|
// setup mio
|
||||||
let mio_poll = mio::Poll::new()?;
|
let mio_poll = mio::Poll::new()?;
|
||||||
let events = mio::Events::with_capacity(20);
|
let events = mio::Events::with_capacity(EVENT_CAPACITY);
|
||||||
let mut mio_token_dispenser = MioTokenDispenser::default();
|
let mut mio_token_dispenser = MioTokenDispenser::default();
|
||||||
|
|
||||||
// bind each SocketAddr to a socket
|
// bind each SocketAddr to a socket
|
||||||
@@ -596,12 +618,14 @@ impl AppServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// register all sockets to mio
|
// register all sockets to mio
|
||||||
for socket in sockets.iter_mut() {
|
let mut io_source_index = HashMap::new();
|
||||||
mio_poll.registry().register(
|
for (idx, socket) in sockets.iter_mut().enumerate() {
|
||||||
socket,
|
let mio_token = mio_token_dispenser.dispense();
|
||||||
mio_token_dispenser.dispense(),
|
mio_poll
|
||||||
Interest::READABLE,
|
.registry()
|
||||||
)?;
|
.register(socket, mio_token, Interest::READABLE)?;
|
||||||
|
let prev = io_source_index.insert(mio_token, AppServerIoSource::Socket(idx));
|
||||||
|
assert!(prev.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
let crypto_site = match keypair {
|
let crypto_site = match keypair {
|
||||||
@@ -615,6 +639,9 @@ impl AppServer {
|
|||||||
verbosity,
|
verbosity,
|
||||||
sockets,
|
sockets,
|
||||||
events,
|
events,
|
||||||
|
short_poll_queue: Default::default(),
|
||||||
|
performed_long_poll: false,
|
||||||
|
io_source_index,
|
||||||
mio_poll,
|
mio_poll,
|
||||||
mio_token_dispenser,
|
mio_token_dispenser,
|
||||||
brokers: BrokerStore::default(),
|
brokers: BrokerStore::default(),
|
||||||
@@ -646,41 +673,57 @@ impl AppServer {
|
|||||||
matches!(self.verbosity, Verbosity::Verbose)
|
matches!(self.verbosity, Verbosity::Verbose)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn register_listen_socket(&mut self, mut sock: mio::net::UdpSocket) -> anyhow::Result<()> {
|
||||||
|
let mio_token = self.mio_token_dispenser.dispense();
|
||||||
|
self.mio_poll
|
||||||
|
.registry()
|
||||||
|
.register(&mut sock, mio_token, mio::Interest::READABLE)?;
|
||||||
|
let io_source = self.sockets.len().apply(AppServerIoSource::Socket);
|
||||||
|
self.sockets.push(sock);
|
||||||
|
self.register_io_source(mio_token, io_source);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_io_source(&mut self, token: mio::Token, io_source: AppServerIoSource) {
|
||||||
|
let prev = self.io_source_index.insert(token, io_source);
|
||||||
|
assert!(prev.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister_io_source(&mut self, token: mio::Token) {
|
||||||
|
let value = self.io_source_index.remove(&token);
|
||||||
|
assert!(value.is_some(), "Removed IO source that does not exist");
|
||||||
|
}
|
||||||
|
|
||||||
pub fn register_broker(
|
pub fn register_broker(
|
||||||
&mut self,
|
&mut self,
|
||||||
broker: Box<dyn WireguardBrokerMio<Error = anyhow::Error, MioError = anyhow::Error>>,
|
broker: Box<dyn WireguardBrokerMio<Error = anyhow::Error, MioError = anyhow::Error>>,
|
||||||
) -> Result<BrokerStorePtr> {
|
) -> Result<BrokerStorePtr> {
|
||||||
let ptr = Public::from_slice((self.brokers.store.len() as u64).as_bytes());
|
let ptr = Public::from_slice((self.brokers.store.len() as u64).as_bytes());
|
||||||
|
|
||||||
if self.brokers.store.insert(ptr, broker).is_some() {
|
if self.brokers.store.insert(ptr, broker).is_some() {
|
||||||
bail!("Broker already registered");
|
bail!("Broker already registered");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mio_token = self.mio_token_dispenser.dispense();
|
||||||
|
let io_source = ptr.apply(AppServerIoSource::PskBroker);
|
||||||
//Register broker
|
//Register broker
|
||||||
self.brokers
|
self.brokers
|
||||||
.store
|
.store
|
||||||
.get_mut(&ptr)
|
.get_mut(&ptr)
|
||||||
.ok_or(anyhow::format_err!("Broker wasn't added to registry"))?
|
.ok_or(anyhow::format_err!("Broker wasn't added to registry"))?
|
||||||
.register(
|
.register(self.mio_poll.registry(), mio_token)?;
|
||||||
self.mio_poll.registry(),
|
self.register_io_source(mio_token, io_source);
|
||||||
self.mio_token_dispenser.dispense(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(BrokerStorePtr(ptr))
|
Ok(BrokerStorePtr(ptr))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unregister_broker(&mut self, ptr: BrokerStorePtr) -> Result<()> {
|
pub fn unregister_broker(&mut self, ptr: BrokerStorePtr) -> Result<()> {
|
||||||
//Unregister broker
|
let mut broker = self
|
||||||
self.brokers
|
.brokers
|
||||||
.store
|
|
||||||
.get_mut(&ptr.0)
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("Broker not found"))?
|
|
||||||
.unregister(self.mio_poll.registry())?;
|
|
||||||
|
|
||||||
//Remove broker from store
|
|
||||||
self.brokers
|
|
||||||
.store
|
.store
|
||||||
.remove(&ptr.0)
|
.remove(&ptr.0)
|
||||||
.ok_or_else(|| anyhow::anyhow!("Broker not found"))?;
|
.context("Broker not found")?;
|
||||||
|
self.unregister_io_source(broker.mio_token().unwrap());
|
||||||
|
broker.unregister(self.mio_poll.registry())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -998,22 +1041,33 @@ impl AppServer {
|
|||||||
// readiness event seems to be good enough™ for now.
|
// readiness event seems to be good enough™ for now.
|
||||||
|
|
||||||
// only poll if we drained all sockets before
|
// only poll if we drained all sockets before
|
||||||
if self.all_sockets_drained {
|
run(|| -> anyhow::Result<()> {
|
||||||
//Non blocked polling
|
if !self.all_sockets_drained || !self.short_poll_queue.is_empty() {
|
||||||
self.mio_poll
|
self.unpolled_count += 1;
|
||||||
.poll(&mut self.events, Some(Duration::from_secs(0)))?;
|
return Ok(());
|
||||||
|
|
||||||
if self.events.iter().peekable().peek().is_none() {
|
|
||||||
// if there are no events, then add to blocking poll count
|
|
||||||
self.blocking_polls_count += 1;
|
|
||||||
//Execute blocking poll
|
|
||||||
self.mio_poll.poll(&mut self.events, Some(timeout))?;
|
|
||||||
} else {
|
|
||||||
self.non_blocking_polls_count += 1;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
self.unpolled_count += 1;
|
self.perform_mio_poll_and_register_events(Duration::from_secs(0))?; // Non-blocking poll
|
||||||
}
|
if !self.short_poll_queue.is_empty() {
|
||||||
|
// Got some events in non-blocking mode
|
||||||
|
self.non_blocking_polls_count += 1;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.performed_long_poll {
|
||||||
|
// pass – go perform a full long poll before we enter blocking poll mode
|
||||||
|
// to make sure our experimental short poll feature did not miss any events
|
||||||
|
// due to being buggy.
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform and register blocking poll
|
||||||
|
self.blocking_polls_count += 1;
|
||||||
|
self.perform_mio_poll_and_register_events(timeout)?;
|
||||||
|
self.performed_long_poll = false;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
if let Some(AppServerTest {
|
if let Some(AppServerTest {
|
||||||
enable_dos_permanently: true,
|
enable_dos_permanently: true,
|
||||||
@@ -1048,26 +1102,58 @@ impl AppServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Focused polling – i.e. actually using mio::Token – is experimental for now.
|
||||||
|
// The reason for this is that we need to figure out how to integrate load detection
|
||||||
|
// and focused polling for one. Mio event-based polling also does not play nice with
|
||||||
|
// the current function signature and its reentrant design which is focused around receiving UDP socket packages
|
||||||
|
// for processing by the crypto protocol server.
|
||||||
|
// Besides that, there are also some parts of the code which intentionally block
|
||||||
|
// despite available data. This is the correct behavior; e.g. api::mio::Connection blocks
|
||||||
|
// further reads from its unix socket until the write buffer is flushed. In other words
|
||||||
|
// the connection handler makes sure that there is a buffer to put the response in while
|
||||||
|
// before reading further request.
|
||||||
|
// The potential problem with this behavior is that we end up ignoring instructions from
|
||||||
|
// epoll() to read from the particular sockets, so epoll will return information about that
|
||||||
|
// particular – blocked – file descriptor every call. We have only so many event slots and
|
||||||
|
// in theory, the event array could fill up entirely with intentionally blocked sockets.
|
||||||
|
// We need to figure out how to deal with this situation.
|
||||||
|
// Mio uses uses epoll in level-triggered mode, so we could handle taint-tracking for ignored
|
||||||
|
// sockets ourselves. The facilities are available in epoll and Mio, but we need to figure out how mio uses those
|
||||||
|
// facilities and how we can integrate them here.
|
||||||
|
// This will involve rewriting a lot of IO code and we should probably have integration
|
||||||
|
// tests before we approach that.
|
||||||
|
//
|
||||||
|
// This hybrid approach is not without merit though; the short poll implementation covers
|
||||||
|
// all our IO sources, so under contention, rosenpass should generally not hit the long
|
||||||
|
// poll mode below. We keep short polling and calling epoll() in non-blocking mode (timeout
|
||||||
|
// of zero) until we run out of IO events processed. Then, just before we would perform a
|
||||||
|
// blocking poll, we go through all available IO sources to see if we missed anything.
|
||||||
|
{
|
||||||
|
while let Some(ev) = self.short_poll_queue.pop_front() {
|
||||||
|
if let Some(v) = self.try_recv_from_mio_token(buf, ev.token())? {
|
||||||
|
return Ok(Some(v));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// drain all sockets
|
// drain all sockets
|
||||||
let mut would_block_count = 0;
|
let mut would_block_count = 0;
|
||||||
for (sock_no, socket) in self.sockets.iter_mut().enumerate() {
|
for sock_no in 0..self.sockets.len() {
|
||||||
match socket.recv_from(buf) {
|
match self
|
||||||
Ok((n, addr)) => {
|
.try_recv_from_listen_socket(buf, sock_no)
|
||||||
|
.io_err_kind_hint()
|
||||||
|
{
|
||||||
|
Ok(None) => continue,
|
||||||
|
Ok(Some(v)) => {
|
||||||
// at least one socket was not drained...
|
// at least one socket was not drained...
|
||||||
self.all_sockets_drained = false;
|
self.all_sockets_drained = false;
|
||||||
return Ok(Some((
|
return Ok(Some(v));
|
||||||
n,
|
|
||||||
Endpoint::SocketBoundAddress(SocketBoundEndpoint::new(
|
|
||||||
SocketPtr(sock_no),
|
|
||||||
addr,
|
|
||||||
)),
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
Err(e) if e.kind() == ErrorKind::WouldBlock => {
|
Err((_, ErrorKind::WouldBlock)) => {
|
||||||
would_block_count += 1;
|
would_block_count += 1;
|
||||||
}
|
}
|
||||||
// TODO if one socket continuously returns an error, then we never poll, thus we never wait for a timeout, thus we have a spin-lock
|
// TODO if one socket continuously returns an error, then we never poll, thus we never wait for a timeout, thus we have a spin-lock
|
||||||
Err(e) => return Err(e.into()),
|
Err((e, _)) => return Err(e)?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1087,9 +1173,87 @@ impl AppServer {
|
|||||||
MioManagerFocus(self).poll()?;
|
MioManagerFocus(self).poll()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.performed_long_poll = true;
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn perform_mio_poll_and_register_events(&mut self, timeout: Duration) -> io::Result<()> {
|
||||||
|
self.mio_poll.poll(&mut self.events, Some(timeout))?;
|
||||||
|
// Fill the short poll buffer with the acquired events
|
||||||
|
self.events
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.for_each(|v| self.short_poll_queue.push_back(v));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv_from_mio_token(
|
||||||
|
&mut self,
|
||||||
|
buf: &mut [u8],
|
||||||
|
token: mio::Token,
|
||||||
|
) -> anyhow::Result<Option<(usize, Endpoint)>> {
|
||||||
|
let io_source = match self.io_source_index.get(&token) {
|
||||||
|
Some(io_source) => *io_source,
|
||||||
|
None => {
|
||||||
|
log::warn!("No IO source assiociated with mio token ({token:?}). Polling using mio tokens directly is an experimental feature and IO handler should recover when all available io sources are polled. This is a developer error. Please report it.");
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.try_recv_from_io_source(buf, io_source)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv_from_io_source(
|
||||||
|
&mut self,
|
||||||
|
buf: &mut [u8],
|
||||||
|
io_source: AppServerIoSource,
|
||||||
|
) -> anyhow::Result<Option<(usize, Endpoint)>> {
|
||||||
|
use crate::api::mio::MioManagerContext;
|
||||||
|
|
||||||
|
match io_source {
|
||||||
|
AppServerIoSource::Socket(idx) => self
|
||||||
|
.try_recv_from_listen_socket(buf, idx)
|
||||||
|
.substitute_for_ioerr_wouldblock(None)?
|
||||||
|
.ok(),
|
||||||
|
|
||||||
|
#[cfg(feature = "experiment_api")]
|
||||||
|
AppServerIoSource::PskBroker(key) => self
|
||||||
|
.brokers
|
||||||
|
.store
|
||||||
|
.get_mut(&key)
|
||||||
|
.with_context(|| format!("No PSK broker under key {key:?}"))?
|
||||||
|
.process_poll()
|
||||||
|
.map(|_| None),
|
||||||
|
|
||||||
|
#[cfg(feature = "experiment_api")]
|
||||||
|
AppServerIoSource::MioManager(mmio_src) => MioManagerFocus(self)
|
||||||
|
.poll_particular(mmio_src)
|
||||||
|
.map(|_| None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv_from_listen_socket(
|
||||||
|
&mut self,
|
||||||
|
buf: &mut [u8],
|
||||||
|
idx: usize,
|
||||||
|
) -> io::Result<Option<(usize, Endpoint)>> {
|
||||||
|
use std::io::ErrorKind as K;
|
||||||
|
let (n, addr) = loop {
|
||||||
|
match self.sockets[idx].recv_from(buf).io_err_kind_hint() {
|
||||||
|
Ok(v) => break v,
|
||||||
|
Err((_, K::Interrupted)) => continue,
|
||||||
|
Err((e, _)) => return Err(e)?,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
SocketPtr(idx)
|
||||||
|
.apply(|sp| SocketBoundEndpoint::new(sp, addr))
|
||||||
|
.apply(Endpoint::SocketBoundAddress)
|
||||||
|
.apply(|ep| (n, ep))
|
||||||
|
.some()
|
||||||
|
.ok()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "experiment_api")]
|
#[cfg(feature = "experiment_api")]
|
||||||
pub fn add_api_connection(&mut self, connection: mio::net::UnixStream) -> std::io::Result<()> {
|
pub fn add_api_connection(&mut self, connection: mio::net::UnixStream) -> std::io::Result<()> {
|
||||||
use crate::api::mio::MioManagerContext;
|
use crate::api::mio::MioManagerContext;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use std::{
|
use std::{
|
||||||
io::{BufRead, BufReader},
|
borrow::Borrow,
|
||||||
|
io::{BufRead, BufReader, Write},
|
||||||
os::unix::net::UnixStream,
|
os::unix::net::UnixStream,
|
||||||
process::Stdio,
|
process::Stdio,
|
||||||
thread::sleep,
|
thread::sleep,
|
||||||
@@ -7,6 +8,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
|
use command_fds::{CommandFdExt, FdMapping};
|
||||||
use hex_literal::hex;
|
use hex_literal::hex;
|
||||||
use rosenpass::api::{
|
use rosenpass::api::{
|
||||||
self, add_listen_socket_response_status, add_psk_broker_response_status,
|
self, add_listen_socket_response_status, add_psk_broker_response_status,
|
||||||
@@ -15,16 +17,27 @@ use rosenpass::api::{
|
|||||||
use rosenpass_util::{
|
use rosenpass_util::{
|
||||||
b64::B64Display,
|
b64::B64Display,
|
||||||
file::LoadValueB64,
|
file::LoadValueB64,
|
||||||
|
io::IoErrorKind,
|
||||||
length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder},
|
length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder},
|
||||||
|
mem::{DiscardResultExt, MoveExt},
|
||||||
mio::WriteWithFileDescriptors,
|
mio::WriteWithFileDescriptors,
|
||||||
zerocopy::ZerocopySliceExt,
|
zerocopy::ZerocopySliceExt,
|
||||||
};
|
};
|
||||||
use rustix::fd::AsFd;
|
use rustix::fd::{AsFd, AsRawFd};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use zerocopy::AsBytes;
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
use rosenpass::protocol::SymKey;
|
use rosenpass::protocol::SymKey;
|
||||||
|
|
||||||
|
struct KillChild(std::process::Child);
|
||||||
|
|
||||||
|
impl Drop for KillChild {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.kill().discard_result();
|
||||||
|
self.0.wait().discard_result()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn api_integration_api_setup() -> anyhow::Result<()> {
|
fn api_integration_api_setup() -> anyhow::Result<()> {
|
||||||
rosenpass_secret_memory::policy::secret_policy_use_only_malloc_secrets();
|
rosenpass_secret_memory::policy::secret_policy_use_only_malloc_secrets();
|
||||||
@@ -111,28 +124,42 @@ fn api_integration_api_setup() -> anyhow::Result<()> {
|
|||||||
peer_a.commit()?;
|
peer_a.commit()?;
|
||||||
peer_b.commit()?;
|
peer_b.commit()?;
|
||||||
|
|
||||||
|
let (deliberate_fail_api_client, deliberate_fail_api_server) =
|
||||||
|
std::os::unix::net::UnixStream::pair()?;
|
||||||
|
let deliberate_fail_child_fd = 3;
|
||||||
|
|
||||||
// Start peer a
|
// Start peer a
|
||||||
let _proc_a = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
let _proc_a = KillChild(
|
||||||
.args([
|
std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
||||||
"exchange-config",
|
.args(["--api-stream-fd", &deliberate_fail_child_fd.to_string()])
|
||||||
peer_a.config_file_path.to_str().context("")?,
|
.fd_mappings(vec![FdMapping {
|
||||||
])
|
parent_fd: deliberate_fail_api_server.move_here().as_raw_fd(),
|
||||||
.stdin(Stdio::null())
|
child_fd: 3,
|
||||||
.stdout(Stdio::null())
|
}])?
|
||||||
.spawn()?;
|
.args([
|
||||||
|
"exchange-config",
|
||||||
|
peer_a.config_file_path.to_str().context("")?,
|
||||||
|
])
|
||||||
|
.stdin(Stdio::null())
|
||||||
|
.stdout(Stdio::null())
|
||||||
|
.spawn()?,
|
||||||
|
);
|
||||||
|
|
||||||
// Start peer b
|
// Start peer b
|
||||||
let proc_b = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
let mut proc_b = KillChild(
|
||||||
.args([
|
std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
||||||
"exchange-config",
|
.args([
|
||||||
peer_b.config_file_path.to_str().context("")?,
|
"exchange-config",
|
||||||
])
|
peer_b.config_file_path.to_str().context("")?,
|
||||||
.stdin(Stdio::null())
|
])
|
||||||
.stdout(Stdio::piped())
|
.stdin(Stdio::null())
|
||||||
.spawn()?;
|
.stderr(Stdio::null())
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.spawn()?,
|
||||||
|
);
|
||||||
|
|
||||||
// Acquire stdout
|
// Acquire stdout
|
||||||
let mut out_b = BufReader::new(proc_b.stdout.context("")?).lines();
|
let mut out_b = BufReader::new(proc_b.0.stdout.take().context("")?).lines();
|
||||||
|
|
||||||
// Now connect to the peers
|
// Now connect to the peers
|
||||||
let api_path = peer_a.api.listen_path[0].as_path();
|
let api_path = peer_a.api.listen_path[0].as_path();
|
||||||
@@ -173,6 +200,23 @@ fn api_integration_api_setup() -> anyhow::Result<()> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deliberately break API connection given via FD; this checks that the
|
||||||
|
// API connections are closed when invalid data is received and it also
|
||||||
|
// implicitly checks that other connections are unaffected
|
||||||
|
{
|
||||||
|
use std::io::ErrorKind as K;
|
||||||
|
let client = deliberate_fail_api_client;
|
||||||
|
let err = loop {
|
||||||
|
if let Err(e) = client.borrow().write(&[0xffu8; 16]) {
|
||||||
|
break e;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
assert!(matches!(
|
||||||
|
err.io_error_kind(),
|
||||||
|
K::ConnectionReset | K::BrokenPipe
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
// Send SupplyKeypairRequest
|
// Send SupplyKeypairRequest
|
||||||
{
|
{
|
||||||
use rustix::fs::{open, Mode, OFlags};
|
use rustix::fs::{open, Mode, OFlags};
|
||||||
|
|||||||
@@ -8,16 +8,25 @@ use std::{
|
|||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use rosenpass::api;
|
use rosenpass::api;
|
||||||
use rosenpass_to::{ops::copy_slice_least_src, To};
|
use rosenpass_to::{ops::copy_slice_least_src, To};
|
||||||
use rosenpass_util::zerocopy::ZerocopySliceExt;
|
|
||||||
use rosenpass_util::{
|
use rosenpass_util::{
|
||||||
file::LoadValueB64,
|
file::LoadValueB64,
|
||||||
length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder},
|
length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder},
|
||||||
};
|
};
|
||||||
|
use rosenpass_util::{mem::DiscardResultExt, zerocopy::ZerocopySliceExt};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use zerocopy::AsBytes;
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
use rosenpass::protocol::SymKey;
|
use rosenpass::protocol::SymKey;
|
||||||
|
|
||||||
|
struct KillChild(std::process::Child);
|
||||||
|
|
||||||
|
impl Drop for KillChild {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.kill().discard_result();
|
||||||
|
self.0.wait().discard_result()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn api_integration_test() -> anyhow::Result<()> {
|
fn api_integration_test() -> anyhow::Result<()> {
|
||||||
rosenpass_secret_memory::policy::secret_policy_use_only_malloc_secrets();
|
rosenpass_secret_memory::policy::secret_policy_use_only_malloc_secrets();
|
||||||
@@ -93,28 +102,32 @@ fn api_integration_test() -> anyhow::Result<()> {
|
|||||||
peer_b.commit()?;
|
peer_b.commit()?;
|
||||||
|
|
||||||
// Start peer a
|
// Start peer a
|
||||||
let proc_a = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
let mut proc_a = KillChild(
|
||||||
.args([
|
std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
||||||
"exchange-config",
|
.args([
|
||||||
peer_a.config_file_path.to_str().context("")?,
|
"exchange-config",
|
||||||
])
|
peer_a.config_file_path.to_str().context("")?,
|
||||||
.stdin(Stdio::null())
|
])
|
||||||
.stdout(Stdio::piped())
|
.stdin(Stdio::null())
|
||||||
.spawn()?;
|
.stdout(Stdio::piped())
|
||||||
|
.spawn()?,
|
||||||
|
);
|
||||||
|
|
||||||
// Start peer b
|
// Start peer b
|
||||||
let proc_b = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
let mut proc_b = KillChild(
|
||||||
.args([
|
std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
|
||||||
"exchange-config",
|
.args([
|
||||||
peer_b.config_file_path.to_str().context("")?,
|
"exchange-config",
|
||||||
])
|
peer_b.config_file_path.to_str().context("")?,
|
||||||
.stdin(Stdio::null())
|
])
|
||||||
.stdout(Stdio::piped())
|
.stdin(Stdio::null())
|
||||||
.spawn()?;
|
.stdout(Stdio::piped())
|
||||||
|
.spawn()?,
|
||||||
|
);
|
||||||
|
|
||||||
// Acquire stdout
|
// Acquire stdout
|
||||||
let mut out_a = BufReader::new(proc_a.stdout.context("")?).lines();
|
let mut out_a = BufReader::new(proc_a.0.stdout.take().context("")?).lines();
|
||||||
let mut out_b = BufReader::new(proc_b.stdout.context("")?).lines();
|
let mut out_b = BufReader::new(proc_b.0.stdout.take().context("")?).lines();
|
||||||
|
|
||||||
// Wait for the keys to successfully exchange a key
|
// Wait for the keys to successfully exchange a key
|
||||||
let mut attempt = 0;
|
let mut attempt = 0;
|
||||||
|
|||||||
@@ -293,6 +293,7 @@ struct MockBrokerInner {
|
|||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
struct MockBroker {
|
struct MockBroker {
|
||||||
inner: Arc<Mutex<MockBrokerInner>>,
|
inner: Arc<Mutex<MockBrokerInner>>,
|
||||||
|
mio_token: Option<mio::Token>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WireguardBrokerMio for MockBroker {
|
impl WireguardBrokerMio for MockBroker {
|
||||||
@@ -301,8 +302,9 @@ impl WireguardBrokerMio for MockBroker {
|
|||||||
fn register(
|
fn register(
|
||||||
&mut self,
|
&mut self,
|
||||||
_registry: &mio::Registry,
|
_registry: &mio::Registry,
|
||||||
_token: mio::Token,
|
token: mio::Token,
|
||||||
) -> Result<(), Self::MioError> {
|
) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = Some(token);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -311,8 +313,13 @@ impl WireguardBrokerMio for MockBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn unregister(&mut self, _registry: &mio::Registry) -> Result<(), Self::MioError> {
|
fn unregister(&mut self, _registry: &mio::Registry) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = None;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mio_token(&self) -> Option<mio::Token> {
|
||||||
|
self.mio_token
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl rosenpass_wireguard_broker::WireGuardBroker for MockBroker {
|
impl rosenpass_wireguard_broker::WireGuardBroker for MockBroker {
|
||||||
|
|||||||
@@ -52,6 +52,56 @@ impl<T, E: TryIoErrorKind> TryIoResultKindHintExt<T> for Result<T, E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait SubstituteForIoErrorKindExt<T>: Sized {
|
||||||
|
type Error;
|
||||||
|
fn substitute_for_ioerr_kind_with<F: FnOnce() -> T>(
|
||||||
|
self,
|
||||||
|
kind: io::ErrorKind,
|
||||||
|
f: F,
|
||||||
|
) -> Result<T, Self::Error>;
|
||||||
|
fn substitute_for_ioerr_kind(self, kind: io::ErrorKind, v: T) -> Result<T, Self::Error> {
|
||||||
|
self.substitute_for_ioerr_kind_with(kind, || v)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn substitute_for_ioerr_interrupted_with<F: FnOnce() -> T>(
|
||||||
|
self,
|
||||||
|
f: F,
|
||||||
|
) -> Result<T, Self::Error> {
|
||||||
|
self.substitute_for_ioerr_kind_with(io::ErrorKind::Interrupted, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn substitute_for_ioerr_interrupted(self, v: T) -> Result<T, Self::Error> {
|
||||||
|
self.substitute_for_ioerr_interrupted_with(|| v)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn substitute_for_ioerr_wouldblock_with<F: FnOnce() -> T>(
|
||||||
|
self,
|
||||||
|
f: F,
|
||||||
|
) -> Result<T, Self::Error> {
|
||||||
|
self.substitute_for_ioerr_kind_with(io::ErrorKind::WouldBlock, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn substitute_for_ioerr_wouldblock(self, v: T) -> Result<T, Self::Error> {
|
||||||
|
self.substitute_for_ioerr_wouldblock_with(|| v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E: TryIoErrorKind> SubstituteForIoErrorKindExt<T> for Result<T, E> {
|
||||||
|
type Error = E;
|
||||||
|
|
||||||
|
fn substitute_for_ioerr_kind_with<F: FnOnce() -> T>(
|
||||||
|
self,
|
||||||
|
kind: io::ErrorKind,
|
||||||
|
f: F,
|
||||||
|
) -> Result<T, Self::Error> {
|
||||||
|
match self.try_io_err_kind_hint() {
|
||||||
|
Ok(v) => Ok(v),
|
||||||
|
Err((_, Some(k))) if k == kind => Ok(f()),
|
||||||
|
Err((e, _)) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Automatically handles `std::io::ErrorKind::Interrupted`.
|
/// Automatically handles `std::io::ErrorKind::Interrupted`.
|
||||||
///
|
///
|
||||||
/// - If there is no error (i.e. on `Ok(r)`), the function will return `Ok(Some(r))`
|
/// - If there is no error (i.e. on `Ok(r)`), the function will return `Ok(Some(r))`
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ pub mod io;
|
|||||||
pub mod length_prefix_encoding;
|
pub mod length_prefix_encoding;
|
||||||
pub mod mem;
|
pub mod mem;
|
||||||
pub mod mio;
|
pub mod mio;
|
||||||
|
pub mod option;
|
||||||
pub mod ord;
|
pub mod ord;
|
||||||
pub mod result;
|
pub mod result;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
|||||||
@@ -136,3 +136,17 @@ impl<T: Default> SwapWithDefaultExt for T {
|
|||||||
self.swap_with(Self::default())
|
self.swap_with(Self::default())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait MoveExt {
|
||||||
|
/// Deliberately move the value
|
||||||
|
///
|
||||||
|
/// Usually employed to enforce an object being
|
||||||
|
/// dropped after use.
|
||||||
|
fn move_here(self) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Sized> MoveExt for T {
|
||||||
|
fn move_here(self) -> Self {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
7
util/src/option.rs
Normal file
7
util/src/option.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
pub trait SomeExt: Sized {
|
||||||
|
fn some(self) -> Option<Self> {
|
||||||
|
Some(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SomeExt for T {}
|
||||||
@@ -16,6 +16,7 @@ use crate::{SerializedBrokerConfig, WireGuardBroker, WireguardBrokerMio};
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct MioBrokerClient {
|
pub struct MioBrokerClient {
|
||||||
inner: BrokerClient<MioBrokerClientIo>,
|
inner: BrokerClient<MioBrokerClientIo>,
|
||||||
|
mio_token: Option<mio::Token>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -59,7 +60,10 @@ impl MioBrokerClient {
|
|||||||
write_buffer,
|
write_buffer,
|
||||||
};
|
};
|
||||||
let inner = BrokerClient::new(io);
|
let inner = BrokerClient::new(io);
|
||||||
Self { inner }
|
Self {
|
||||||
|
inner,
|
||||||
|
mio_token: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(&mut self) -> anyhow::Result<()> {
|
fn poll(&mut self) -> anyhow::Result<()> {
|
||||||
@@ -104,6 +108,7 @@ impl WireguardBrokerMio for MioBrokerClient {
|
|||||||
registry: &mio::Registry,
|
registry: &mio::Registry,
|
||||||
token: mio::Token,
|
token: mio::Token,
|
||||||
) -> Result<(), Self::MioError> {
|
) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = Some(token);
|
||||||
registry.register(
|
registry.register(
|
||||||
&mut self.inner.io_mut().socket,
|
&mut self.inner.io_mut().socket,
|
||||||
token,
|
token,
|
||||||
@@ -118,9 +123,14 @@ impl WireguardBrokerMio for MioBrokerClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn unregister(&mut self, registry: &mio::Registry) -> Result<(), Self::MioError> {
|
fn unregister(&mut self, registry: &mio::Registry) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = None;
|
||||||
registry.deregister(&mut self.inner.io_mut().socket)?;
|
registry.deregister(&mut self.inner.io_mut().socket)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mio_token(&self) -> Option<mio::Token> {
|
||||||
|
self.mio_token
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BrokerClientIo for MioBrokerClientIo {
|
impl BrokerClientIo for MioBrokerClientIo {
|
||||||
|
|||||||
@@ -16,7 +16,9 @@ const MAX_B64_KEY_SIZE: usize = WG_KEY_LEN * 5 / 3;
|
|||||||
const MAX_B64_PEER_ID_SIZE: usize = WG_PEER_LEN * 5 / 3;
|
const MAX_B64_PEER_ID_SIZE: usize = WG_PEER_LEN * 5 / 3;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct NativeUnixBroker {}
|
pub struct NativeUnixBroker {
|
||||||
|
mio_token: Option<mio::Token>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for NativeUnixBroker {
|
impl Default for NativeUnixBroker {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
@@ -26,7 +28,7 @@ impl Default for NativeUnixBroker {
|
|||||||
|
|
||||||
impl NativeUnixBroker {
|
impl NativeUnixBroker {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {}
|
Self { mio_token: None }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,8 +90,9 @@ impl WireguardBrokerMio for NativeUnixBroker {
|
|||||||
fn register(
|
fn register(
|
||||||
&mut self,
|
&mut self,
|
||||||
_registry: &mio::Registry,
|
_registry: &mio::Registry,
|
||||||
_token: mio::Token,
|
token: mio::Token,
|
||||||
) -> Result<(), Self::MioError> {
|
) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = Some(token);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,8 +101,13 @@ impl WireguardBrokerMio for NativeUnixBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn unregister(&mut self, _registry: &mio::Registry) -> Result<(), Self::MioError> {
|
fn unregister(&mut self, _registry: &mio::Registry) -> Result<(), Self::MioError> {
|
||||||
|
self.mio_token = None;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mio_token(&self) -> Option<mio::Token> {
|
||||||
|
self.mio_token
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Builder)]
|
#[derive(Debug, Builder)]
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ pub trait WireguardBrokerMio: WireGuardBroker {
|
|||||||
registry: &mio::Registry,
|
registry: &mio::Registry,
|
||||||
token: mio::Token,
|
token: mio::Token,
|
||||||
) -> Result<(), Self::MioError>;
|
) -> Result<(), Self::MioError>;
|
||||||
|
fn mio_token(&self) -> Option<mio::Token>;
|
||||||
|
|
||||||
/// Run after a mio::poll operation
|
/// Run after a mio::poll operation
|
||||||
fn process_poll(&mut self) -> Result<(), Self::MioError>;
|
fn process_poll(&mut self) -> Result<(), Self::MioError>;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user