1use alloc::boxed::Box;
2
3use umsh_core::PublicKey;
4use umsh_mac::SendOptions;
5
6use crate::node::{LocalNode, NodeError, Subscription, SubscriptionHandle};
7use crate::receive::ReceivedPacketRef;
8use crate::ticket::SendProgressTicket;
9use crate::transport::Transport;
10
11#[derive(Clone)]
16pub struct PeerConnection<T: Transport> {
17 transport: T,
18 peer: PublicKey,
19}
20
21impl<T: Transport> PeerConnection<T> {
22 pub(crate) fn new(transport: T, peer: PublicKey) -> Self {
24 Self { transport, peer }
25 }
26
27 pub fn peer(&self) -> &PublicKey {
29 &self.peer
30 }
31
32 pub async fn send(
34 &self,
35 payload: &[u8],
36 options: &SendOptions,
37 ) -> Result<SendProgressTicket, T::Error> {
38 self.transport.send(&self.peer, payload, options).await
39 }
40}
41
42impl<M: crate::mac::MacBackend> PeerConnection<LocalNode<M>> {
43 fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
44 where
45 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
46 {
47 self.transport
48 .state()
49 .borrow_mut()
50 .peer_subscriptions_mut(self.peer)
51 .receive_handlers
52 .insert(Box::new(handler))
53 }
54
55 pub fn on_receive<F>(&self, handler: F) -> Subscription
56 where
57 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
58 {
59 let handle = self.add_receive_handler(handler);
60 let state = self.transport.state().clone();
61 let peer = self.peer;
62 Subscription::new(move || {
63 let mut state = state.borrow_mut();
64 let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
65 return false;
66 };
67 entry.receive_handlers.remove(handle)
68 })
69 }
70
71 fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
72 where
73 F: FnMut(crate::SendToken) + 'static,
74 {
75 self.transport
76 .state()
77 .borrow_mut()
78 .peer_subscriptions_mut(self.peer)
79 .ack_received_handlers
80 .insert(Box::new(handler))
81 }
82
83 pub fn on_ack_received<F>(&self, handler: F) -> Subscription
84 where
85 F: FnMut(crate::SendToken) + 'static,
86 {
87 let handle = self.add_ack_received_handler(handler);
88 let state = self.transport.state().clone();
89 let peer = self.peer;
90 Subscription::new(move || {
91 let mut state = state.borrow_mut();
92 let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
93 return false;
94 };
95 entry.ack_received_handlers.remove(handle)
96 })
97 }
98
99 fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
100 where
101 F: FnMut(crate::SendToken) + 'static,
102 {
103 self.transport
104 .state()
105 .borrow_mut()
106 .peer_subscriptions_mut(self.peer)
107 .ack_timeout_handlers
108 .insert(Box::new(handler))
109 }
110
111 pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
112 where
113 F: FnMut(crate::SendToken) + 'static,
114 {
115 let handle = self.add_ack_timeout_handler(handler);
116 let state = self.transport.state().clone();
117 let peer = self.peer;
118 Subscription::new(move || {
119 let mut state = state.borrow_mut();
120 let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
121 return false;
122 };
123 entry.ack_timeout_handlers.remove(handle)
124 })
125 }
126
127 pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
128 where
129 F: FnMut() + 'static,
130 {
131 let handle = self
132 .transport
133 .state()
134 .borrow_mut()
135 .peer_subscriptions_mut(self.peer)
136 .pfs_established_handlers
137 .insert(Box::new(handler));
138 let state = self.transport.state().clone();
139 let peer = self.peer;
140 Subscription::new(move || {
141 let mut state = state.borrow_mut();
142 let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
143 return false;
144 };
145 entry.pfs_established_handlers.remove(handle)
146 })
147 }
148
149 pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
150 where
151 F: FnMut() + 'static,
152 {
153 let handle = self
154 .transport
155 .state()
156 .borrow_mut()
157 .peer_subscriptions_mut(self.peer)
158 .pfs_ended_handlers
159 .insert(Box::new(handler));
160 let state = self.transport.state().clone();
161 let peer = self.peer;
162 Subscription::new(move || {
163 let mut state = state.borrow_mut();
164 let Some(entry) = state.find_peer_subscriptions_mut(peer) else {
165 return false;
166 };
167 entry.pfs_ended_handlers.remove(handle)
168 })
169 }
170
171 #[cfg(feature = "software-crypto")]
172 pub async fn request_pfs(
173 &self,
174 duration_minutes: u16,
175 options: &SendOptions,
176 ) -> Result<SendProgressTicket, NodeError<M>> {
177 self.transport
178 .request_pfs(&self.peer, duration_minutes, options)
179 .await
180 }
181
182 #[cfg(feature = "software-crypto")]
183 pub async fn end_pfs(&self, options: &SendOptions) -> Result<(), NodeError<M>> {
184 self.transport.end_pfs(&self.peer, options).await
185 }
186
187 #[cfg(feature = "software-crypto")]
188 pub fn pfs_status(&self) -> Result<crate::node::PfsStatus, NodeError<M>> {
189 self.transport.pfs_status(&self.peer)
190 }
191}