diff --git a/rosenpass/src/app_server.rs b/rosenpass/src/app_server.rs index 43cdc4a..da72aa0 100644 --- a/rosenpass/src/app_server.rs +++ b/rosenpass/src/app_server.rs @@ -1,3 +1,5 @@ +/// This contains the bulk of the rosenpass server IO handling code whereas +/// the actual cryptographic code lives in the [crate::protocol] module use anyhow::bail; use anyhow::Context; @@ -49,33 +51,78 @@ use crate::{ use rosenpass_util::attempt; use rosenpass_util::b64::B64Display; -const MAX_B64_KEY_SIZE: usize = 32 * 5 / 3; -const MAX_B64_PEER_ID_SIZE: usize = 32 * 5 / 3; +/// The maximum size of a base64 encoded symmetric key (estimate) +pub const MAX_B64_KEY_SIZE: usize = 32 * 5 / 3; +/// The maximum size of a base64 peer ID (estimate) +pub const MAX_B64_PEER_ID_SIZE: usize = 32 * 5 / 3; +/// The zero IPv4 address; this is generally used to tell network servers to choose any interface +/// when listening const IPV4_ANY_ADDR: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); +/// The zero IPv6 address; this is generally used to tell network servers to choose any interface +/// when listening const IPV6_ANY_ADDR: Ipv6Addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0); +/// Ratio of blocking epoll(7) polls to non-blocking polls at which the rosenpass server +/// assumes it is under load (i.e. a DOS attack may be happening) const UNDER_LOAD_RATIO: f64 = 0.5; +/// Period at which the DOS detection code updates whether there is an "under load" status const DURATION_UPDATE_UNDER_LOAD_STATUS: Duration = Duration::from_millis(500); -const BROKER_ID_BYTES: usize = 8; +pub const BROKER_ID_BYTES: usize = 8; -fn ipv4_any_binding() -> SocketAddr { +/// IPv4 address that tells the network layer to listen on any interface +/// +/// # Examples +/// +/// See [AppServer::new]. +pub fn ipv4_any_binding() -> SocketAddr { // addr, port SocketAddr::V4(SocketAddrV4::new(IPV4_ANY_ADDR, 0)) } -fn ipv6_any_binding() -> SocketAddr { +/// IPv6 address that tells the network layer to listen on any interface +/// +/// # Examples +/// +/// See [AppServer::new]. +pub fn ipv6_any_binding() -> SocketAddr { // addr, port, flowinfo, scope_id SocketAddr::V6(SocketAddrV6::new(IPV6_ANY_ADDR, 0, 0, 0)) } +/// This is used to assign indices to MIO (epoll) event sources #[derive(Debug, Default)] pub struct MioTokenDispenser { - counter: usize, + pub counter: usize, } impl MioTokenDispenser { + /// Produces a single IO event source ID + /// + /// # Examples + /// + /// Use is quite straightforward: + /// + /// ``` + /// use rosenpass::app_server::MioTokenDispenser; + /// use mio::Token; + /// + /// let mut dispenser = MioTokenDispenser { + /// counter: 0 + /// }; + /// + /// let t1 = dispenser.dispense(); + /// let t2 = dispenser.dispense(); + /// + /// assert_ne!(t1, t2); + /// + /// // If you inspected the output, you would find that the dispenser is really just a counter. + /// // Though this is an implementation detail + /// assert_eq!(t1, Token(0)); + /// assert_eq!(t2, Token(1)); + /// ``` + /// pub fn dispense(&mut self) -> Token { let r = self.counter; self.counter += 1; @@ -83,42 +130,132 @@ impl MioTokenDispenser { } } +/// List of WireGuard brokers +/// +/// Each WireGuard peer ([AppPeer]) is assigned a broker ([AppPeer::broker_peer]). +/// +/// When a new has been exchanged, then its associated broker is called to transmit the key +/// to WireGuard (or to do something else). +/// +/// Brokers live in [AppServer::brokers]. They are added/removed from the AppServer via +/// [AppServer::register_broker] and [AppServer::unregister_broker]. PSKs are distributed +/// to their respective brokers via [AppPeerPtr::set_psk]. +/// +/// Note that the entire broker system is an experimental feature; it is not up to the +/// same quality standards as the rest of the code. In particular, the use of [BROKER_ID_BYTES] +/// and a hash map is simultaneously overengineered and not really that useful for what it is +/// doing. #[derive(Debug, Default)] pub struct BrokerStore { + /// The collection of WireGuard brokers. See [Self]. pub store: HashMap< Public, Box>, >, } +/// Reference to a broker imbued with utility methods #[derive(Debug, Clone)] pub struct BrokerStorePtr(pub Public); +/// This is the broker configuration for a particular broker peer #[derive(Debug)] pub struct BrokerPeer { + /// Reference to the broker used for this particular peer ptr: BrokerStorePtr, + /// Configuration for a WireGuard broker. + /// + /// This is woefully overengineered and there is very little reason why the broker + /// configuration should not live in the particular WireGuard broker. peer_cfg: Box, } impl BrokerPeer { + /// Create a broker peer pub fn new(ptr: BrokerStorePtr, peer_cfg: Box) -> Self { Self { ptr, peer_cfg } } + /// Retrieve the pointer to WireGuard PSK broker used with this peer pub fn ptr(&self) -> &BrokerStorePtr { &self.ptr } } +/// IO/BusinessLogic information for a particular protocol peer. +/// +/// There is a one-to-one correspondence between this struct and [crate::protocol::Peer]; +/// whereas the struct in the protocol module stores information specific to the cryptographic layer, +/// this struct stores information IO information. #[derive(Default, Debug)] pub struct AppPeer { + /// If set, then [AppServer::output_key] will write generated output keys + /// to a file configured here and produce information on standard out to + /// notify the calling process that pub outfile: Option, + /// If this option is set, then [AppServer::output_key] will send generated output + /// keys to the broker configured here pub broker_peer: Option, + /// This is the network address configured for a particular peer at program start. + /// + /// I.e. this is the address the rosenpass program will send [crate::msgs::InitHello] + /// packets to, trying to exchange a key. + /// + /// Note that the remote peer may connect with another address. See [Self::current_endpoint]. pub initial_endpoint: Option, + /// The network address currently used for a particular peer. + /// + /// This is not necessarily the address that was configured at program start (see [Self::initial_endpoint]), + /// because the remote peer can initiate handshakes from an arbitrary network address. + /// + /// If another peer successfully connects to this one from any address, then this field will + /// be updated to reflect which address this was. pub current_endpoint: Option, } impl AppPeer { + /// Retrieve the [Endpoint] associated with this peer. + /// + /// I.e. the [Self::current_endpoint] if set and [Self::initial_endpoint] otherwise. + /// + /// # Examples + /// + /// ``` + /// use rosenpass::app_server::{Endpoint, AppPeer}; + /// use rosenpass_util::functional::run; + /// + /// let mut peer = AppPeer { + /// outfile: None, + /// broker_peer: None, + /// initial_endpoint: Some(Endpoint::discovery_from_hostname("0.0.0.0:0".to_string())?), + /// current_endpoint: Some(Endpoint::discovery_from_hostname("0.0.0.0:1".to_string())?), + /// }; + /// + /// fn same(fst: Option<&Endpoint>, snd: Option<&Endpoint>) -> bool { + /// if fst.is_none() && snd.is_none() { + /// return true; + /// } + /// + /// run(|| Some(std::ptr::eq(fst?, snd?)) ) + /// .unwrap_or(fst.is_some() == snd.is_some()) + /// } + /// + /// assert!(same(peer.endpoint(), peer.current_endpoint.as_ref())); + /// + /// let mut tmp = None; + /// std::mem::swap(&mut tmp, &mut peer.initial_endpoint); + /// assert!(same(peer.endpoint(), peer.current_endpoint.as_ref())); + /// + /// std::mem::swap(&mut tmp, &mut peer.initial_endpoint); + /// std::mem::swap(&mut tmp, &mut peer.current_endpoint); + /// assert!(same(peer.endpoint(), peer.initial_endpoint.as_ref())); + /// + /// peer.initial_endpoint = None; + /// peer.current_endpoint = None; + /// assert!(peer.endpoint().is_none()); + /// + /// Ok::<(), anyhow::Error>(()) + /// ``` pub fn endpoint(&self) -> Option<&Endpoint> { self.current_endpoint .as_ref() @@ -126,6 +263,8 @@ impl AppPeer { } } +/// No longer in use since we have the broker system (see [BrokerPeer]) +/// TODO: Remove #[derive(Default, Debug)] pub struct WireguardOut { // impl KeyOutput @@ -134,12 +273,20 @@ pub struct WireguardOut { pub extra_params: Vec, } +/// Used to indicate whether the rosenpass server is in normal operating +/// conditions or under load (i.e. a DOS attack is happening) #[derive(Debug, Clone, Copy, PartialEq)] pub enum DoSOperation { UnderLoad, Normal, } /// Integration test helpers for AppServer +/// +/// TODO: Remove; this is no way to write integration tests +/// +/// # Examples +/// +/// See [AppServer] #[derive(Debug, Builder)] #[builder(pattern = "owned")] pub struct AppServerTest { @@ -151,43 +298,100 @@ pub struct AppServerTest { pub termination_handler: Option>, } +/// This represents a some source of IO operations in the context of the Rosenpass server +/// +/// I.e. this identifies some structure that could be marked as "ready for IO" by [mio] #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum AppServerIoSource { + /// IO source refers to a socket in [AppServer::sockets] Socket(usize), + /// IO source refers to a PSK broker in [AppServer::brokers] PskBroker(Public), + /// IO source refers to some IO sources used in the API; + /// see [AppServer::api_manager] #[cfg(feature = "experiment_api")] MioManager(crate::api::mio::MioManagerIoSource), } +/// Number of epoll(7) events Rosenpass can receive at a time const EVENT_CAPACITY: usize = 20; -/// Holds the state of the application, namely the external IO +/// This holds pretty much all of the state of the Rosenpass application +/// including the cryptographic state in [Self::crypto_site] /// -/// Responsible for file IO, network IO -// TODO add user control via unix domain socket and stdin/stdout +/// Responsible for file IO, network IO, etc… +/// +/// # Examples +/// +#[doc = "```ignore"] +#[doc = include_str!("../tests/app_server_example.rs")] +#[doc = "```"] #[derive(Debug)] pub struct AppServer { + /// Contains the actual cryptographic implementation. + /// + /// Because the API supports initializing the server with a keypair + /// and CryptoServer needs to be initialized with a keypair, the struct + /// struct is wrapped in a ConstructionSite + pub crypto_site: ConstructionSite, + /// The UDP sockets used to send and receive protocol messages + pub sockets: Vec, + /// Buffer for [mio] (epoll(7), async IO handling) IO events + pub events: mio::Events, + /// Supplemental buffer for [mio] events. See the inline documentation of [AppServer::try_recv] + /// for details. + pub short_poll_queue: VecDeque, + /// We have two different polling modes; long polling (legacy) and short polling (new and + /// somewhat experimental). See [AppServer::try_recv] for details + pub performed_long_poll: bool, + /// Events produced by [mio] refer to IO sources by a numeric token assigned to them + /// (see [MioTokenDispenser]). This index associateds mio token with the specific IO + /// source stored in this object + pub io_source_index: HashMap, + /// Asynchronous IO source + pub mio_poll: mio::Poll, + /// MIO associates IO sources with numeric tokens. This struct takes care of generating these + /// tokens + pub mio_token_dispenser: MioTokenDispenser, + /// Helpers handling communication with WireGuard; these take a generated key and forward it to + /// WireGuard + pub brokers: BrokerStore, + /// This is our view of the peers; generally every peer in here is associated with one peer in + /// CryptoServer + pub peers: Vec, + /// If set to [Verbosity::Verbose], then some extra information will be printed + /// at the info log level + pub verbosity: Verbosity, + /// Used by [AppServer::try_recv] to ensure that all packages have been read + /// from the UDP sockets + pub all_sockets_drained: bool, + /// Whether network message handling determined that a Denial of Service attack is happening + pub under_load: DoSOperation, + /// State kept by the [AppServer::try_recv] for polling + pub blocking_polls_count: usize, + /// State kept by the [AppServer::try_recv] for polling + pub non_blocking_polls_count: usize, + /// State kept by the [AppServer::try_recv] for polling + pub unpolled_count: usize, + /// State kept by the [AppServer::try_recv] for polling + pub last_update_time: Instant, + /// Used by integration tests to force [Self] into DoS condition + /// and to terminate the AppServer after the test is complete + pub test_helpers: Option, + /// Helper for integration tests running rosenpass as a subprocess + /// to terminate properly upon receiving an appropriate system signal. + /// + /// This is primarily needed for coverage testing, since llvm-cov does not + /// write coverage reports to disk when a process is stopped by the default + /// signal handler. + /// + /// See #[cfg(feature = "internal_signal_handling_for_coverage_reports")] pub term_signal: terminate::TerminateRequested, - pub crypto_site: ConstructionSite, - pub sockets: Vec, - pub events: mio::Events, - pub short_poll_queue: VecDeque, - pub performed_long_poll: bool, - pub io_source_index: HashMap, - pub mio_poll: mio::Poll, - pub mio_token_dispenser: MioTokenDispenser, - pub brokers: BrokerStore, - pub peers: Vec, - pub verbosity: Verbosity, - pub all_sockets_drained: bool, - pub under_load: DoSOperation, - pub blocking_polls_count: usize, - pub non_blocking_polls_count: usize, - pub unpolled_count: usize, - pub last_update_time: Instant, - pub test_helpers: Option, #[cfg(feature = "experiment_api")] + /// The Rosenpass unix socket API handler; this is an experimental + /// feature that can be used to embed Rosenpass in external applications + /// via communication by unix socket pub api_manager: crate::api::mio::MioManager, } @@ -201,14 +405,19 @@ pub struct AppServer { pub struct SocketPtr(pub usize); impl SocketPtr { + /// Retrieve the concrete udp socket associated with the pointer pub fn get<'a>(&self, srv: &'a AppServer) -> &'a mio::net::UdpSocket { &srv.sockets[self.0] } + /// Retrieve the concrete udp socket associated with the pointer, mutably pub fn get_mut<'a>(&self, srv: &'a mut AppServer) -> &'a mut mio::net::UdpSocket { &mut srv.sockets[self.0] } + /// Send a UDP packet to another address. + /// + /// Merely forwards to [mio::net::UdpSocket::send_to] pub fn send_to(&self, srv: &AppServer, buf: &[u8], addr: SocketAddr) -> anyhow::Result<()> { self.get(srv).send_to(buf, addr)?; Ok(()) @@ -216,28 +425,41 @@ impl SocketPtr { } /// Index based pointer to a Peer +/// +/// This allows retrieving both the io-oriented and the cryptographic information +/// about a peer. #[derive(Debug, Copy, Clone)] pub struct AppPeerPtr(pub usize); impl AppPeerPtr { - /// Takes an index based handle and returns the actual peer + /// Takes an pointer from the cryptography subsystem + /// in [AppServer::crypto_site] and derives the associated AppPeerPtr + /// in [AppServer] pub fn lift(p: PeerPtr) -> Self { Self(p.0) } - /// Returns an index based handle to one Peer + /// Turns this pointer into a cryptographic peer pointer for [CryptoServer] + /// in [AppServer::crypto_site] pub fn lower(&self) -> PeerPtr { PeerPtr(self.0) } + /// Retrieve the [AppPeer] pointed to by [Self] pub fn get_app<'a>(&self, srv: &'a AppServer) -> &'a AppPeer { &srv.peers[self.0] } + /// Retrieve the [AppPeer] pointed to by [Self], mutably pub fn get_app_mut<'a>(&self, srv: &'a mut AppServer) -> &'a mut AppPeer { &mut srv.peers[self.0] } + /// Use the associated WireGuard PSK broker via [BrokerStorePtr] + /// to upload a new PSK. + /// + /// If no PSK broker is set and [AppPeer::outfile] is none, then + /// this prints a warning pub fn set_psk(&self, server: &mut AppServer, psk: &Secret) -> anyhow::Result<()> { if let Some(broker) = server.peers[self.0].broker_peer.as_ref() { let config = broker.peer_cfg.create_config(psk); @@ -250,17 +472,34 @@ impl AppPeerPtr { } } +/// The result of [AppServer::poll]. +/// +/// Instructs [AppServer::event_loop_without_error_handling] on how to proceed. #[derive(Debug)] pub enum AppPollResult { + /// Erase the key for a given peer. Corresponds to [crate::protocol::PollResult::DeleteKey] DeleteKey(AppPeerPtr), + /// Send an initiation to the given peer. Corresponds to [crate::protocol::PollResult::SendInitiation] SendInitiation(AppPeerPtr), + /// Send a retransmission to the given peer. Corresponds to + /// [crate::protocol::PollResult::SendRetransmission] SendRetransmission(AppPeerPtr), + /// Received a network message. + /// + /// This is the only case without a correspondence in [crate::protocol::PollResult] ReceivedMessage(usize, Endpoint), } +/// The reason why we are outputting a key #[derive(Debug)] pub enum KeyOutputReason { + /// The reason is that a new key for the given peer was successfully exchanged Exchanged, + /// The reason is, that no key could be exchanged with the peer before the output + /// key lifetime was reached; a [AppPollResult::DeleteKey] event was issued. + /// + /// The key we output in this case is chosen randomly and serves to securely + /// erase whatever key is currently stored. Stale, } @@ -299,14 +538,21 @@ impl std::fmt::Display for Endpoint { } } +/// A network address bound to a particular socket. +/// +/// We need the information which socket is used because different listen sockets +/// might be on different networks. #[derive(Debug)] pub struct SocketBoundEndpoint { /// The socket the address can be reached under; this is generally /// determined when we actually receive an RespHello message socket: SocketPtr, - /// Just the address + /// The network address addr: SocketAddr, - /// identifier + /// Byte representation of this socket bound network address. + /// Generated through [SocketBoundEndpoint::to_bytes]. + /// + /// Read though [HostIdentification::encode] bytes: (usize, [u8; SocketBoundEndpoint::BUFFER_SIZE]), } @@ -317,16 +563,22 @@ impl std::fmt::Display for SocketBoundEndpoint { } impl SocketBoundEndpoint { + /// Length in bytes of the serialized socket index const SOCKET_SIZE: usize = usize::BITS as usize / 8; + /// Length in bytes of the serialized ipv6 address const IPV6_SIZE: usize = 16; + /// Length in bytes of the serialized port const PORT_SIZE: usize = 2; + /// Length in bytes of the serialized ipv6 address scope (see [SocketAddrV6::scope_id]) const SCOPE_ID_SIZE: usize = 4; + /// Length in size of const BUFFER_SIZE: usize = SocketBoundEndpoint::SOCKET_SIZE + SocketBoundEndpoint::IPV6_SIZE + SocketBoundEndpoint::PORT_SIZE + SocketBoundEndpoint::SCOPE_ID_SIZE; + /// Produce a new [Self] pub fn new(socket: SocketPtr, addr: SocketAddr) -> Self { let bytes = Self::to_bytes(&socket, &addr); Self { @@ -336,6 +588,7 @@ impl SocketBoundEndpoint { } } + /// Computes [HostIdentification::encode] for [Self]. Value cached in [Self::bytes]. fn to_bytes( socket: &SocketPtr, addr: &SocketAddr, @@ -370,12 +623,18 @@ impl HostIdentification for SocketBoundEndpoint { } impl Endpoint { - /// Start discovery from some addresses + /// Given a list of potential network addresses, start peer discovery. + /// + /// Will send initiations to different addresses given here on each [crate::msgs::InitHello] + /// retransmission during the peer discovery phase. pub fn discovery_from_addresses(addresses: Vec) -> Self { Endpoint::Discovery(HostPathDiscoveryEndpoint::from_addresses(addresses)) } - /// Start endpoint discovery from a hostname + /// Given a hostname, start peer discovery. + /// + /// Will send initiations to different addresses assigned to the host name + /// on each [crate::msgs::InitHello] retransmission during the peer discovery phase. pub fn discovery_from_hostname(hostname: String) -> anyhow::Result { let host = HostPathDiscoveryEndpoint::lookup(hostname)?; Ok(Endpoint::Discovery(host)) @@ -406,6 +665,8 @@ impl Endpoint { Some(Self::discovery_from_addresses(addrs)) } + /// Send a message to the address referenced to by this endpoint or to one of + /// the endpoints if we are in the peer discovery phase for this endpoint pub fn send(&self, srv: &AppServer, buf: &[u8]) -> anyhow::Result<()> { use Endpoint::*; match self { @@ -414,6 +675,9 @@ impl Endpoint { } } + /// List of addresses this endpoint may be associated with. + /// + /// During peer discovery, this can be multiple addresses. fn addresses(&self) -> &[SocketAddr] { use Endpoint::*; match self { @@ -451,7 +715,14 @@ impl Endpoint { // TODO: We might consider adjusting the retransmission handling to account for host-path discovery #[derive(Debug)] pub struct HostPathDiscoveryEndpoint { - scouting_state: Cell<(usize, usize)>, // addr_off, sock_off + /// Round robin index the next [Self::send_scouting] call should send packets to + /// + /// (address offset, socket offset) + /// + /// Including the socket here accounts for the fact that some network addresses may be + /// reachable only through particular UDP sockets + scouting_state: Cell<(usize, usize)>, + /// List of addresses fir oeer discovery addresses: Vec, } @@ -462,6 +733,7 @@ impl std::fmt::Display for HostPathDiscoveryEndpoint { } impl HostPathDiscoveryEndpoint { + /// Initiate a peer discovery process through a list of potential addresses pub fn from_addresses(addresses: Vec) -> Self { let scouting_state = Cell::new((0, 0)); Self { @@ -470,7 +742,7 @@ impl HostPathDiscoveryEndpoint { } } - /// Lookup a hostname + /// Initiate a peer discovery process through hostname lookup pub fn lookup(hostname: String) -> anyhow::Result { Ok(Self { addresses: ToSocketAddrs::to_socket_addrs(&hostname)?.collect(), @@ -478,10 +750,14 @@ impl HostPathDiscoveryEndpoint { }) } + /// List of address candidates for the peer pub fn addresses(&self) -> &Vec { &self.addresses } + /// Calculates and stores the next value for [Self::scouting_state] + /// given the address and socket we just send a scouting [crate::msgs::InitHello] message + /// to fn insert_next_scout_offset(&self, srv: &AppServer, addr_no: usize, sock_no: usize) { self.scouting_state.set(( (addr_no + 1) % self.addresses.len(), @@ -536,6 +812,11 @@ impl HostPathDiscoveryEndpoint { } impl AppServer { + /// Construct a new AppServer + /// + /// # Examples + /// + /// See [Self]. pub fn new( keypair: Option<(SSk, SPk)>, addrs: Vec, @@ -660,22 +941,37 @@ impl AppServer { }) } + /// Access the cryptographic protocol server + /// + /// This may return an error if [Self] was initialized without a keypair + /// and no keypair has been supplied since then. + /// + /// I.e. will return an error if [Self::crypto_site] is not fully erect pub fn crypto_server(&self) -> anyhow::Result<&CryptoServer> { self.crypto_site .product_ref() .context("Cryptography handler not initialized") } + /// Access the cryptographic protocol server, mutably + /// + /// This may return an error if [Self] was initialized without a keypair + /// and no keypair has been supplied since then. + /// + /// I.e. will return an error if [Self::crypto_site] is not fully erect pub fn crypto_server_mut(&mut self) -> anyhow::Result<&mut CryptoServer> { self.crypto_site .product_mut() .context("Cryptography handler not initialized") } + /// If set to [Verbosity::Verbose], then some extra information will be printed + /// at the info log level pub fn verbose(&self) -> bool { matches!(self.verbosity, Verbosity::Verbose) } + /// Used by [Self::new] to register a new udp listen source pub fn register_listen_socket(&mut self, mut sock: mio::net::UdpSocket) -> anyhow::Result<()> { let mio_token = self.mio_token_dispenser.dispense(); self.mio_poll @@ -687,16 +983,19 @@ impl AppServer { Ok(()) } + /// Used to register a source of IO such as a listen socket with [Self::io_source_index] pub fn register_io_source(&mut self, token: mio::Token, io_source: AppServerIoSource) { let prev = self.io_source_index.insert(token, io_source); assert!(prev.is_none()); } + /// Unregister an IO source registered with [Self::register_io_source] pub fn unregister_io_source(&mut self, token: mio::Token) { let value = self.io_source_index.remove(&token); assert!(value.is_some(), "Removed IO source that does not exist"); } + /// Register a new WireGuard PSK broker pub fn register_broker( &mut self, broker: Box>, @@ -719,6 +1018,7 @@ impl AppServer { Ok(BrokerStorePtr(ptr)) } + /// Unregister a WireGuard PSK broker registered with [Self::register_broker] pub fn unregister_broker(&mut self, ptr: BrokerStorePtr) -> Result<()> { let mut broker = self .brokers @@ -730,6 +1030,11 @@ impl AppServer { Ok(()) } + /// Register a new protocol peer + /// + /// # Examples + /// + /// See [Self::new]. pub fn add_peer( &mut self, psk: Option, @@ -758,6 +1063,11 @@ impl AppServer { Ok(AppPeerPtr(pn)) } + /// Main IO handler; this generally does not terminate + /// + /// # Examples + /// + /// See [Self::new]. pub fn event_loop(&mut self) -> anyhow::Result<()> { const INIT_SLEEP: f64 = 0.01; const MAX_FAILURES: i32 = 10; @@ -811,6 +1121,9 @@ impl AppServer { } } + /// IO handler without proactive restarts after errors. + /// + /// This is used internally in [Self::event_loop]. pub fn event_loop_without_error_handling(&mut self) -> anyhow::Result<()> { let (mut rx, mut tx) = (MsgBuf::zero(), MsgBuf::zero()); @@ -930,6 +1243,8 @@ impl AppServer { } } + /// Helper for [Self::event_loop_without_error_handling] to handle network messages + /// under DoS condition fn handle_msg_under_load( &mut self, endpoint: &Endpoint, @@ -946,6 +1261,8 @@ impl AppServer { } } + /// Used as a helper by [Self::event_loop_without_error_handling] when + /// a new output key has been echanged pub fn output_key( &mut self, peer: AppPeerPtr, @@ -995,6 +1312,10 @@ impl AppServer { Ok(()) } + /// Poll for events from the cryptographic server ([Self::crypto_server()]) + /// and for IO events through [Self::poll]. + /// + /// Used internally in [Self::event_loop_without_error_handling] pub fn poll(&mut self, rx_buf: &mut [u8]) -> anyhow::Result { use crate::protocol::PollResult as C; use AppPollResult as A; @@ -1026,7 +1347,9 @@ impl AppServer { Ok(res) } - /// Tries to receive a new message + /// Tries to receive a new message from the network sockets. + /// + /// Used internally in [Self::poll] /// /// - might wait for an duration up to `timeout` /// - returns immediately if an error occurs @@ -1198,6 +1521,7 @@ impl AppServer { Ok(None) } + /// Internal helper for [Self::try_recv] fn perform_mio_poll_and_register_events(&mut self, timeout: Duration) -> io::Result<()> { self.mio_poll.poll(&mut self.events, Some(timeout))?; // Fill the short poll buffer with the acquired events @@ -1208,6 +1532,7 @@ impl AppServer { Ok(()) } + /// Internal helper for [Self::try_recv] fn try_recv_from_mio_token( &mut self, buf: &mut [u8], @@ -1224,6 +1549,7 @@ impl AppServer { self.try_recv_from_io_source(buf, io_source) } + /// Internal helper for [Self::try_recv] fn try_recv_from_io_source( &mut self, buf: &mut [u8], @@ -1254,6 +1580,7 @@ impl AppServer { } } + /// Internal helper for [Self::try_recv] fn try_recv_from_listen_socket( &mut self, buf: &mut [u8], @@ -1289,6 +1616,20 @@ impl AppServer { } #[cfg(feature = "experiment_api")] +/// This is a wrapper around a reference to [AppServer] that +/// dishes out references to [AppServer::api_manager] and +/// to [AppServer] itself. +/// +/// It really just implements [crate::api::mio::MioManagerContext] which +/// provides the methods operating on [crate::api::mio::MioManager] provided +/// through a trait. +/// +/// This is a rather complicated way of providing just a few functions. The entire +/// point of this exercise is to decouple the code in the API from [AppServer] and +/// this file a bit, despite those functions all needing access to [AppServer]. +/// +/// We want the code to live in its own module instead of expanding and expanding the source +/// file with [AppServer] more and more. struct MioManagerFocus<'a>(&'a mut AppServer); #[cfg(feature = "experiment_api")] diff --git a/rosenpass/tests/app_server_example.rs b/rosenpass/tests/app_server_example.rs new file mode 100644 index 0000000..23ac462 --- /dev/null +++ b/rosenpass/tests/app_server_example.rs @@ -0,0 +1,130 @@ +use std::{ + net::SocketAddr, + ops::DerefMut, + path::PathBuf, + str::FromStr, + sync::mpsc, + thread::{self, sleep}, + time::Duration, +}; + +use anyhow::ensure; +use rosenpass::{ + app_server::{ipv4_any_binding, ipv6_any_binding, AppServer, AppServerTest, MAX_B64_KEY_SIZE}, + protocol::{SPk, SSk, SymKey}, +}; +use rosenpass_cipher_traits::Kem; +use rosenpass_ciphers::kem::StaticKem; +use rosenpass_secret_memory::Secret; +use rosenpass_util::{file::LoadValueB64, functional::run, mem::DiscardResultExt, result::OkExt}; + +#[test] +fn key_exchange_with_app_server() -> anyhow::Result<()> { + let tmpdir = tempfile::tempdir()?; + let outfile_a = tmpdir.path().join("osk_a"); + let outfile_b = tmpdir.path().join("osk_b"); + + // Set security policy for storing secrets; choose the one that is faster for testing + rosenpass_secret_memory::policy::secret_policy_use_only_malloc_secrets(); + + // Introduce the servers to each other + let psk_a = SymKey::random(); + let psk_b = psk_a.clone(); + + let (tx_a, rx_b) = mpsc::sync_channel(1); + let (tx_b, rx_a) = mpsc::sync_channel(1); + + let (tx_term_a, rx_term_a) = mpsc::channel(); + let (tx_term_b, rx_term_b) = mpsc::channel(); + + let configs = [ + (false, outfile_a.clone(), psk_a, tx_a, rx_a, rx_term_a), + (true, outfile_b.clone(), psk_b, tx_b, rx_b, rx_term_b), + ]; + + for (is_client, osk, psk, tx, rx, rx_term) in configs { + thread::spawn(move || { + run(move || -> anyhow::Result<()> { + let mut srv = TestServer::new(rx_term)?; + + tx.send((srv.loopback_port()?, srv.public_key()?.clone()))?; + let (otr_port, otr_pk) = rx.recv()?; + + let psk = Some(psk); + let broker_peer = None; + let pk = otr_pk; + let outfile = Some(osk); + let port = otr_port; + let hostname = is_client.then(|| format!("[::1]:{port}")); + srv.app_srv + .add_peer(psk, pk, outfile, broker_peer, hostname)?; + + srv.app_srv.event_loop() + }) + .unwrap(); + }); + } + + // Busy wait for both keys to be exchanged + let mut successful_exchange = false; + for _ in 0..2000 { + // 40s + sleep(Duration::from_millis(20)); + run(|| -> anyhow::Result<()> { + let osk_a = SymKey::load_b64::(&outfile_a)?; + let osk_b = SymKey::load_b64::(&outfile_b)?; + successful_exchange = rosenpass_constant_time::memcmp(osk_a.secret(), osk_b.secret()); + Ok(()) + }) + .discard_result(); + if successful_exchange { + break; + } + } + + // Tell the parties to terminate + tx_term_a.send(())?; + tx_term_b.send(())?; + + assert!( + successful_exchange, + "Test did not complete successfully within the deadline" + ); + + Ok(()) +} + +struct TestServer { + app_srv: AppServer, +} + +impl TestServer { + fn new(termination_queue: mpsc::Receiver<()>) -> anyhow::Result { + let (mut sk, mut pk) = (SSk::zero(), SPk::zero()); + StaticKem::keygen(sk.secret_mut(), pk.deref_mut())?; + + let keypair = Some((sk, pk)); + let addrs = vec![ + SocketAddr::from_str("[::1]:0")?, // Localhost, any port. For connecting to the test server. + // ipv4_any_binding(), // any IPv4 interface + // ipv6_any_binding(), // any IPv6 interface + ]; + let verbosity = rosenpass::config::Verbosity::Verbose; + let test_helpers = Some(AppServerTest { + enable_dos_permanently: false, + termination_handler: Some(termination_queue), + }); + + let app_srv = AppServer::new(keypair, addrs, verbosity, test_helpers)?; + + Self { app_srv }.ok() + } + + fn loopback_port(&self) -> anyhow::Result { + self.app_srv.sockets[0].local_addr()?.port().ok() + } + + fn public_key(&self) -> anyhow::Result<&SPk> { + Ok(&self.app_srv.crypto_server()?.spkm) + } +}