1use alloc::rc::Rc;
2use alloc::vec::Vec;
3use core::cell::RefCell;
4
5use umsh_mac::{LocalIdentityId, MacError, MacHandle, MacHandleError, Platform, SendOptions};
6
7use crate::dispatch::EventDispatcher;
8use crate::node::{LocalNode, LocalNodeState, NodeMembership, PfsLifecycle};
9use crate::receive::ReceivedPacketRef;
10use crate::{OwnedMacCommand, OwnedNodeIdentityPayload, identity_payload, mac_command};
11use umsh_core::PayloadType;
12
13#[derive(Debug)]
18pub enum HostError<E> {
19 Busy,
20 Mac(E),
21}
22
23impl<E> From<MacHandleError<E>> for HostError<E> {
24 fn from(value: MacHandleError<E>) -> Self {
25 match value {
26 MacHandleError::Busy => Self::Busy,
27 MacHandleError::Inner(inner) => Self::Mac(inner),
28 }
29 }
30}
31
32pub struct Host<
46 'a,
47 P: Platform,
48 const IDENTITIES: usize,
49 const PEERS: usize,
50 const CHANNELS: usize,
51 const ACKS: usize,
52 const TX: usize,
53 const FRAME: usize,
54 const DUP: usize,
55> {
56 mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>,
57 dispatcher: Rc<RefCell<EventDispatcher>>,
58 nodes: Vec<(
59 LocalIdentityId,
60 LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
61 )>,
62 pfs_control_options: SendOptions,
63}
64
65impl<
66 'a,
67 P: Platform,
68 const IDENTITIES: usize,
69 const PEERS: usize,
70 const CHANNELS: usize,
71 const ACKS: usize,
72 const TX: usize,
73 const FRAME: usize,
74 const DUP: usize,
75> Host<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
76{
77 pub fn new(mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>) -> Self {
79 Self {
80 mac,
81 dispatcher: Rc::new(RefCell::new(EventDispatcher::new())),
82 nodes: Vec::new(),
83 pfs_control_options: SendOptions::default()
84 .with_ack_requested(true)
85 .with_flood_hops(5),
86 }
87 }
88
89 pub fn mac(&self) -> MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP> {
91 self.mac
92 }
93
94 pub fn pfs_control_options(&self) -> &SendOptions {
96 &self.pfs_control_options
97 }
98
99 pub fn set_pfs_control_options(&mut self, options: SendOptions) {
101 self.pfs_control_options = options;
102 }
103
104 pub fn add_node(
109 &mut self,
110 identity_id: LocalIdentityId,
111 ) -> LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>> {
112 let membership = Rc::new(RefCell::new(NodeMembership::new()));
113 let state = Rc::new(RefCell::new(LocalNodeState::new()));
114 let node = LocalNode::new(
115 identity_id,
116 self.mac,
117 self.dispatcher.clone(),
118 membership,
119 state,
120 );
121 self.nodes.push((identity_id, node.clone()));
122 node
123 }
124
125 pub fn node(
127 &self,
128 identity_id: LocalIdentityId,
129 ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
130 {
131 self.nodes
132 .iter()
133 .find(|(id, _)| *id == identity_id)
134 .map(|(_, node)| node.clone())
135 }
136
137 fn route_node(
138 &self,
139 identity_id: LocalIdentityId,
140 ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
141 {
142 if let Some(node) = self.node(identity_id) {
143 return Some(node);
144 }
145
146 #[cfg(feature = "software-crypto")]
147 {
148 return self
149 .nodes
150 .iter()
151 .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
152 .map(|(_, node)| node.clone());
153 }
154
155 #[cfg(not(feature = "software-crypto"))]
156 {
157 None
158 }
159 }
160
161 pub async fn pump_once(
170 &mut self,
171 ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
172 let pending_pfs = Rc::new(RefCell::new(Vec::<(
173 LocalIdentityId,
174 umsh_core::PublicKey,
175 OwnedMacCommand,
176 )>::new()));
177 let pending_pfs_ref = pending_pfs.clone();
178 let dispatcher = self.dispatcher.clone();
179 let nodes = self.nodes.clone();
180 self.mac
181 .next_event(move |identity_id, event| {
182 dispatcher
183 .borrow_mut()
184 .dispatch_ticket_state(identity_id, &event);
185 let Some(node) = route_node(&nodes, identity_id) else {
186 return;
187 };
188 match event {
189 umsh_mac::MacEventRef::Received(packet) => {
190 let _ = node.dispatch_received_packet(&packet);
191 if packet.packet_type() == umsh_core::PacketType::Broadcast
192 && packet.payload().is_empty()
193 {
194 if let Some(from_hint) = packet.from_hint() {
195 node.dispatch_beacon(from_hint, packet.from_key());
196 }
197 } else if let Some(from) = packet.from_key() {
198 dispatch_payload_callbacks(&node, &packet, from, &pending_pfs_ref);
199 }
200 }
201 umsh_mac::MacEventRef::AckReceived { peer, receipt } => {
202 node.dispatch_ack_received(
203 peer,
204 crate::SendToken::new(identity_id, receipt),
205 );
206 }
207 umsh_mac::MacEventRef::AckTimeout { peer, receipt } => {
208 node.dispatch_ack_timeout(
209 peer,
210 crate::SendToken::new(identity_id, receipt),
211 );
212 }
213 umsh_mac::MacEventRef::Transmitted { .. }
214 | umsh_mac::MacEventRef::Forwarded { .. } => {}
215 }
216 })
217 .await?;
218
219 let queued: Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)> =
220 pending_pfs.borrow_mut().drain(..).collect();
221 for (identity_id, from, command) in queued {
222 self.handle_pfs_command(identity_id, from, command).await;
223 }
224
225 #[cfg(feature = "software-crypto")]
226 for (_, node) in &self.nodes {
227 if let Ok(expired) = node.expire_pfs_sessions() {
228 for peer in expired {
229 node.dispatch_pfs_ended(peer);
230 }
231 }
232 }
233
234 Ok(())
235 }
236
237 pub async fn run(
243 &mut self,
244 ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
245 loop {
246 self.pump_once().await?;
247 }
248 }
249
250 async fn handle_pfs_command(
251 &mut self,
252 identity_id: LocalIdentityId,
253 from: umsh_core::PublicKey,
254 command: OwnedMacCommand,
255 ) {
256 let Some(node) = self.route_node(identity_id) else {
257 return;
258 };
259
260 if let Ok(Some(lifecycle)) = node
261 .handle_pfs_command(&from, &command, &self.pfs_control_options)
262 .await
263 {
264 match lifecycle {
265 PfsLifecycle::Established(peer) => node.dispatch_pfs_established(peer),
266 PfsLifecycle::Ended(peer) => node.dispatch_pfs_ended(peer),
267 }
268 }
269 }
270}
271
272fn route_node<
273 'a,
274 P: Platform,
275 const IDENTITIES: usize,
276 const PEERS: usize,
277 const CHANNELS: usize,
278 const ACKS: usize,
279 const TX: usize,
280 const FRAME: usize,
281 const DUP: usize,
282>(
283 nodes: &[(
284 LocalIdentityId,
285 LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
286 )],
287 identity_id: LocalIdentityId,
288) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>> {
289 nodes
290 .iter()
291 .find(|(id, _)| *id == identity_id)
292 .map(|(_, node)| node.clone())
293 .or_else(|| {
294 nodes
295 .iter()
296 .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
297 .map(|(_, node)| node.clone())
298 })
299}
300
301fn dispatch_payload_callbacks<
302 'a,
303 P: Platform,
304 const IDENTITIES: usize,
305 const PEERS: usize,
306 const CHANNELS: usize,
307 const ACKS: usize,
308 const TX: usize,
309 const FRAME: usize,
310 const DUP: usize,
311>(
312 node: &LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
313 packet: &ReceivedPacketRef<'_>,
314 from: umsh_core::PublicKey,
315 pending_pfs: &Rc<RefCell<Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)>>>,
316) {
317 if packet.payload_type() == PayloadType::NodeIdentity {
318 if let Ok(identity) = identity_payload::parse(packet.payload()) {
319 let owned = OwnedNodeIdentityPayload::from(identity);
320 node.dispatch_node_discovered(from, owned.name.as_deref());
321 }
322 return;
323 }
324
325 if packet.payload_type() == PayloadType::MacCommand {
326 if let Ok(command) = mac_command::parse(packet.payload()) {
327 let owned = OwnedMacCommand::from(command);
328 node.dispatch_mac_command(from, &owned);
329 if matches!(
330 owned,
331 OwnedMacCommand::PfsSessionRequest { .. }
332 | OwnedMacCommand::PfsSessionResponse { .. }
333 | OwnedMacCommand::EndPfsSession
334 ) {
335 pending_pfs
336 .borrow_mut()
337 .push((node.identity_id(), from, owned));
338 }
339 }
340 }
341}