umsh_node/
peer.rs

1use alloc::boxed::Box;
2
3use umsh_core::PublicKey;
4use umsh_mac::SendOptions;
5
6use crate::node::{LocalNode, NodeError, Subscription, SubscriptionHandle};
7use crate::receive::ReceivedPacketRef;
8use crate::ticket::SendProgressTicket;
9use crate::transport::Transport;
10
11/// Relationship with one remote peer, bound to a transport context.
12///
13/// Generic over `T: Transport` — works with `LocalNode` (unicast) or
14/// `BoundChannel` (blind unicast) identically.
15#[derive(Clone)]
16pub struct PeerConnection<T: Transport> {
17    transport: T,
18    peer: PublicKey,
19}
20
21impl<T: Transport> PeerConnection<T> {
22    /// Create a new peer connection.
23    pub(crate) fn new(transport: T, peer: PublicKey) -> Self {
24        Self { transport, peer }
25    }
26
27    /// The remote peer's public key.
28    pub fn peer(&self) -> &PublicKey {
29        &self.peer
30    }
31
32    /// Send a raw payload to this peer (delegates to `transport.send()`).
33    pub async fn send(
34        &self,
35        payload: &[u8],
36        options: &SendOptions,
37    ) -> Result<SendProgressTicket, T::Error> {
38        self.transport.send(&self.peer, payload, options).await
39    }
40}
41
42impl<M: crate::mac::MacBackend> PeerConnection<LocalNode<M>> {
43    fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
44    where
45        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
46    {
47        self.transport
48            .state()
49            .borrow_mut()
50            .peer_subscriptions_mut(self.peer)
51            .receive_handlers
52            .insert(Box::new(handler))
53    }
54
55    pub fn on_receive<F>(&self, handler: F) -> Subscription
56    where
57        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
58    {
59        let handle = self.add_receive_handler(handler);
60        let state = self.transport.state().clone();
61        let peer = self.peer;
62        Subscription::new(move || {
63            let mut state = state.borrow_mut();
64            let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
65                return false;
66            };
67            entry.receive_handlers.remove(handle)
68        })
69    }
70
71    fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
72    where
73        F: FnMut(crate::SendToken) + 'static,
74    {
75        self.transport
76            .state()
77            .borrow_mut()
78            .peer_subscriptions_mut(self.peer)
79            .ack_received_handlers
80            .insert(Box::new(handler))
81    }
82
83    pub fn on_ack_received<F>(&self, handler: F) -> Subscription
84    where
85        F: FnMut(crate::SendToken) + 'static,
86    {
87        let handle = self.add_ack_received_handler(handler);
88        let state = self.transport.state().clone();
89        let peer = self.peer;
90        Subscription::new(move || {
91            let mut state = state.borrow_mut();
92            let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
93                return false;
94            };
95            entry.ack_received_handlers.remove(handle)
96        })
97    }
98
99    fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
100    where
101        F: FnMut(crate::SendToken) + 'static,
102    {
103        self.transport
104            .state()
105            .borrow_mut()
106            .peer_subscriptions_mut(self.peer)
107            .ack_timeout_handlers
108            .insert(Box::new(handler))
109    }
110
111    pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
112    where
113        F: FnMut(crate::SendToken) + 'static,
114    {
115        let handle = self.add_ack_timeout_handler(handler);
116        let state = self.transport.state().clone();
117        let peer = self.peer;
118        Subscription::new(move || {
119            let mut state = state.borrow_mut();
120            let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
121                return false;
122            };
123            entry.ack_timeout_handlers.remove(handle)
124        })
125    }
126
127    pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
128    where
129        F: FnMut() + 'static,
130    {
131        let handle = self
132            .transport
133            .state()
134            .borrow_mut()
135            .peer_subscriptions_mut(self.peer)
136            .pfs_established_handlers
137            .insert(Box::new(handler));
138        let state = self.transport.state().clone();
139        let peer = self.peer;
140        Subscription::new(move || {
141            let mut state = state.borrow_mut();
142            let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
143                return false;
144            };
145            entry.pfs_established_handlers.remove(handle)
146        })
147    }
148
149    pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
150    where
151        F: FnMut() + 'static,
152    {
153        let handle = self
154            .transport
155            .state()
156            .borrow_mut()
157            .peer_subscriptions_mut(self.peer)
158            .pfs_ended_handlers
159            .insert(Box::new(handler));
160        let state = self.transport.state().clone();
161        let peer = self.peer;
162        Subscription::new(move || {
163            let mut state = state.borrow_mut();
164            let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
165                return false;
166            };
167            entry.pfs_ended_handlers.remove(handle)
168        })
169    }
170
171    #[cfg(feature = "software-crypto")]
172    pub async fn request_pfs(
173        &self,
174        duration_minutes: u16,
175        options: &SendOptions,
176    ) -> Result<SendProgressTicket, NodeError<M>> {
177        self.transport
178            .request_pfs(&self.peer, duration_minutes, options)
179            .await
180    }
181
182    #[cfg(feature = "software-crypto")]
183    pub async fn end_pfs(&self, options: &SendOptions) -> Result<(), NodeError<M>> {
184        self.transport.end_pfs(&self.peer, options).await
185    }
186
187    #[cfg(feature = "software-crypto")]
188    pub fn pfs_status(&self) -> Result<crate::node::PfsStatus, NodeError<M>> {
189        self.transport.pfs_status(&self.peer)
190    }
191}