umsh_node/
host.rs

1use alloc::rc::Rc;
2use alloc::vec::Vec;
3use core::cell::RefCell;
4
5use umsh_mac::{LocalIdentityId, MacError, MacHandle, Platform, SendOptions};
6
7use crate::dispatch::EventDispatcher;
8use crate::node::{LocalNode, LocalNodeState, NodeMembership, PfsLifecycle};
9use crate::receive::ReceivedPacketRef;
10use crate::{NodeIdentityPayload, OwnedMacCommand, mac_command};
11use umsh_core::PayloadType;
12
/// Error returned when a [`Host`] cannot make progress.
///
/// Wraps the underlying MAC runtime failure. `HostError<E>` implements
/// [`core::fmt::Display`] whenever the wrapped error type `E` does, so it can be
/// logged or reported with `{}` in addition to `{:?}`.
#[derive(Debug)]
pub enum HostError<E> {
    /// The shared MAC run loop reported an error while being driven.
    Mac(E),
}

impl<E: core::fmt::Display> core::fmt::Display for HostError<E> {
    /// Formats the error as `MAC error: <inner>`, delegating to the wrapped error's
    /// own `Display` implementation for the detail.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Mac(err) => write!(f, "MAC error: {err}"),
        }
    }
}
20
21/// Multi-identity orchestration layer for the node API.
22///
23/// `Host` owns the shared MAC run loop and routes inbound events to the correct
24/// [`LocalNode`], including ephemeral PFS identities that belong to a long-term node.
25/// Applications typically:
26///
27/// 1. construct a `Host` from a shared [`MacHandle`](umsh_mac::MacHandle)
28/// 2. register one or more [`LocalNode`]s with [`add_node`](Self::add_node)
29/// 3. attach callbacks to nodes / peers / wrappers
30/// 4. drive progress with [`run`](Self::run) or [`pump_once`](Self::pump_once)
31///
32/// `run()` is the preferred long-lived driver. `pump_once()` exists for callers that need to
33/// multiplex UMSH progress with other async work using `select!`.
34pub struct Host<
35    'a,
36    P: Platform,
37    const IDENTITIES: usize,
38    const PEERS: usize,
39    const CHANNELS: usize,
40    const ACKS: usize,
41    const TX: usize,
42    const FRAME: usize,
43    const DUP: usize,
44> {
45    mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>,
46    dispatcher: Rc<RefCell<EventDispatcher>>,
47    nodes: Vec<(
48        LocalIdentityId,
49        LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
50    )>,
51    pfs_control_options: SendOptions,
52}
53
54impl<
55    'a,
56    P: Platform,
57    const IDENTITIES: usize,
58    const PEERS: usize,
59    const CHANNELS: usize,
60    const ACKS: usize,
61    const TX: usize,
62    const FRAME: usize,
63    const DUP: usize,
64> Host<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
65{
66    /// Create a host around a shared MAC handle.
67    pub fn new(mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>) -> Self {
68        Self {
69            mac,
70            dispatcher: Rc::new(RefCell::new(EventDispatcher::new())),
71            nodes: Vec::new(),
72            pfs_control_options: SendOptions::default()
73                .with_ack_requested(true)
74                .with_flood_hops(5),
75        }
76    }
77
78    /// Return the underlying shared MAC handle.
79    pub fn mac(&self) -> MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP> {
80        self.mac
81    }
82
83    /// Borrow the send options used for node-managed PFS control messages.
84    pub fn pfs_control_options(&self) -> &SendOptions {
85        &self.pfs_control_options
86    }
87
88    /// Replace the send options used for node-managed PFS control messages.
89    pub fn set_pfs_control_options(&mut self, options: SendOptions) {
90        self.pfs_control_options = options;
91    }
92
93    /// Create and register a [`LocalNode`] for an already-registered local identity.
94    ///
95    /// The returned handle is cheap to clone and becomes the application-facing entry point
96    /// for sending traffic and attaching callbacks for that identity.
97    pub fn add_node(
98        &mut self,
99        identity_id: LocalIdentityId,
100    ) -> LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>> {
101        let membership = Rc::new(RefCell::new(NodeMembership::new()));
102        let state = Rc::new(RefCell::new(LocalNodeState::new()));
103        let node = LocalNode::new(
104            identity_id,
105            self.mac,
106            self.dispatcher.clone(),
107            membership,
108            state,
109        );
110        self.nodes.push((identity_id, node.clone()));
111        node
112    }
113
114    /// Look up a previously added node by identity id.
115    pub fn node(
116        &self,
117        identity_id: LocalIdentityId,
118    ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
119    {
120        self.nodes
121            .iter()
122            .find(|(id, _)| *id == identity_id)
123            .map(|(_, node)| node.clone())
124    }
125
126    fn route_node(
127        &self,
128        identity_id: LocalIdentityId,
129    ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
130    {
131        if let Some(node) = self.node(identity_id) {
132            return Some(node);
133        }
134
135        #[cfg(feature = "software-crypto")]
136        {
137            return self
138                .nodes
139                .iter()
140                .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
141                .map(|(_, node)| node.clone());
142        }
143
144        #[cfg(not(feature = "software-crypto"))]
145        {
146            None
147        }
148    }
149
150    /// Drive the shared MAC until one wake cycle completes.
151    ///
152    /// This is a single wake-driven step, not a non-blocking poll. It waits until the MAC has
153    /// meaningful work to do (radio activity or a protocol deadline), dispatches any resulting
154    /// callbacks, services PFS command handling, and then returns.
155    ///
156    /// Use this when you need to multiplex UMSH progress with other async sources using
157    /// `select!`. If UMSH owns the task, prefer [`run`](Self::run).
158    pub async fn pump_once(
159        &mut self,
160    ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
161        let pending_pfs = Rc::new(RefCell::new(Vec::<(
162            LocalIdentityId,
163            umsh_core::PublicKey,
164            OwnedMacCommand,
165        )>::new()));
166        let pending_pfs_ref = pending_pfs.clone();
167        let dispatcher = self.dispatcher.clone();
168        let nodes = self.nodes.clone();
169        self.mac
170            .next_event(move |identity_id, event| {
171                dispatcher
172                    .borrow_mut()
173                    .dispatch_ticket_state(identity_id, &event);
174                let Some(node) = route_node(&nodes, identity_id) else {
175                    return;
176                };
177                match event {
178                    umsh_mac::MacEventRef::Received(packet) => {
179                        let _ = node.dispatch_received_packet(&packet);
180                        if packet.packet_type() == umsh_core::PacketType::Broadcast
181                            && packet.payload().is_empty()
182                        {
183                            if let Some(from_hint) = packet.from_hint() {
184                                node.dispatch_beacon(from_hint, packet.from_key());
185                            }
186                        } else if let Some(from) = packet.from_key() {
187                            dispatch_payload_callbacks(&node, &packet, from, &pending_pfs_ref);
188                        }
189                    }
190                    umsh_mac::MacEventRef::AckReceived { peer, receipt } => {
191                        node.dispatch_ack_received(
192                            peer,
193                            crate::SendToken::new(identity_id, receipt),
194                        );
195                    }
196                    umsh_mac::MacEventRef::AckTimeout { peer, receipt } => {
197                        node.dispatch_ack_timeout(
198                            peer,
199                            crate::SendToken::new(identity_id, receipt),
200                        );
201                    }
202                    umsh_mac::MacEventRef::Transmitted { .. }
203                    | umsh_mac::MacEventRef::Forwarded { .. } => {}
204                }
205            })
206            .await
207            .map_err(HostError::Mac)?;
208
209        let queued: Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)> =
210            pending_pfs.borrow_mut().drain(..).collect();
211        for (identity_id, from, command) in queued {
212            self.handle_pfs_command(identity_id, from, command).await;
213        }
214
215        #[cfg(feature = "software-crypto")]
216        for (_, node) in &self.nodes {
217            if let Ok(expired) = node.expire_pfs_sessions().await {
218                for peer in expired {
219                    node.dispatch_pfs_ended(peer);
220                }
221            }
222        }
223
224        Ok(())
225    }
226
227    /// Run the shared MAC/Host loop forever.
228    ///
229    /// This is the preferred long-lived driver for node-based applications. It keeps the
230    /// runtime wake policy inside the MAC/Host stack rather than requiring callers to write
231    /// poll/sleep loops themselves.
232    pub async fn run(
233        &mut self,
234    ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
235        loop {
236            self.pump_once().await?;
237        }
238    }
239
240    async fn handle_pfs_command(
241        &mut self,
242        identity_id: LocalIdentityId,
243        from: umsh_core::PublicKey,
244        command: OwnedMacCommand,
245    ) {
246        let Some(node) = self.route_node(identity_id) else {
247            return;
248        };
249
250        if let Ok(Some(lifecycle)) = node
251            .handle_pfs_command(&from, &command, &self.pfs_control_options)
252            .await
253        {
254            match lifecycle {
255                PfsLifecycle::Established(peer) => node.dispatch_pfs_established(peer),
256                PfsLifecycle::Ended(peer) => node.dispatch_pfs_ended(peer),
257            }
258        }
259    }
260}
261
262fn route_node<
263    'a,
264    P: Platform,
265    const IDENTITIES: usize,
266    const PEERS: usize,
267    const CHANNELS: usize,
268    const ACKS: usize,
269    const TX: usize,
270    const FRAME: usize,
271    const DUP: usize,
272>(
273    nodes: &[(
274        LocalIdentityId,
275        LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
276    )],
277    identity_id: LocalIdentityId,
278) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>> {
279    nodes
280        .iter()
281        .find(|(id, _)| *id == identity_id)
282        .map(|(_, node)| node.clone())
283        .or_else(|| {
284            nodes
285                .iter()
286                .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
287                .map(|(_, node)| node.clone())
288        })
289}
290
291fn dispatch_payload_callbacks<
292    'a,
293    P: Platform,
294    const IDENTITIES: usize,
295    const PEERS: usize,
296    const CHANNELS: usize,
297    const ACKS: usize,
298    const TX: usize,
299    const FRAME: usize,
300    const DUP: usize,
301>(
302    node: &LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
303    packet: &ReceivedPacketRef<'_>,
304    from: umsh_core::PublicKey,
305    pending_pfs: &Rc<RefCell<Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)>>>,
306) {
307    if packet.payload_type() == PayloadType::NodeIdentity {
308        if let Ok(identity) = NodeIdentityPayload::from_bytes(packet.payload()) {
309            node.dispatch_node_discovered(from, identity.name.as_deref());
310        }
311        return;
312    }
313
314    if packet.payload_type() == PayloadType::MacCommand {
315        if let Ok(command) = mac_command::parse(packet.payload()) {
316            let owned = OwnedMacCommand::from(command);
317            node.dispatch_mac_command(from, &owned);
318            if matches!(
319                owned,
320                OwnedMacCommand::PfsSessionRequest { .. }
321                    | OwnedMacCommand::PfsSessionResponse { .. }
322                    | OwnedMacCommand::EndPfsSession
323            ) {
324                pending_pfs
325                    .borrow_mut()
326                    .push((node.identity_id(), from, owned));
327            }
328        }
329    }
330}