// umsh_node/host.rs
1use alloc::rc::Rc;
2use alloc::vec::Vec;
3use core::cell::RefCell;
4
5use umsh_mac::{LocalIdentityId, MacError, MacHandle, MacHandleError, Platform, SendOptions};
6
7use crate::dispatch::EventDispatcher;
8use crate::node::{LocalNode, LocalNodeState, NodeMembership, PfsLifecycle};
9use crate::receive::ReceivedPacketRef;
10use crate::{OwnedMacCommand, OwnedNodeIdentityPayload, identity_payload, mac_command};
11use umsh_core::PayloadType;
12
/// Error returned when a [`Host`] cannot make progress.
///
/// `Busy` means some other caller already holds the underlying shared
/// [`MacHandle`](umsh_mac::MacHandle) borrow. `Mac` wraps the underlying runtime failure.
#[derive(Debug)]
pub enum HostError<E> {
    /// The shared MAC handle is currently borrowed by another caller; retry later.
    Busy,
    /// The MAC runtime itself failed; the inner value describes the failure.
    Mac(E),
}

impl<E: core::fmt::Display> core::fmt::Display for HostError<E> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Busy => f.write_str("MAC handle is busy"),
            Self::Mac(inner) => write!(f, "MAC error: {inner}"),
        }
    }
}
22
23impl<E> From<MacHandleError<E>> for HostError<E> {
24    fn from(value: MacHandleError<E>) -> Self {
25        match value {
26            MacHandleError::Busy => Self::Busy,
27            MacHandleError::Inner(inner) => Self::Mac(inner),
28        }
29    }
30}
31
/// Multi-identity orchestration layer for the node API.
///
/// `Host` owns the shared MAC run loop and routes inbound events to the correct
/// [`LocalNode`], including ephemeral PFS identities that belong to a long-term node.
/// Applications typically:
///
/// 1. construct a `Host` from a shared [`MacHandle`](umsh_mac::MacHandle)
/// 2. register one or more [`LocalNode`]s with [`add_node`](Self::add_node)
/// 3. attach callbacks to nodes / peers / wrappers
/// 4. drive progress with [`run`](Self::run) or [`pump_once`](Self::pump_once)
///
/// `run()` is the preferred long-lived driver. `pump_once()` exists for callers that need to
/// multiplex UMSH progress with other async work using `select!`.
pub struct Host<
    'a,
    P: Platform,
    const IDENTITIES: usize,
    const PEERS: usize,
    const CHANNELS: usize,
    const ACKS: usize,
    const TX: usize,
    const FRAME: usize,
    const DUP: usize,
> {
    // Shared handle to the MAC run loop; handed out by value via `mac()`,
    // so it is presumably cheap to copy — confirm against umsh_mac.
    mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>,
    // Host-wide callback registry shared with every node created by `add_node`.
    dispatcher: Rc<RefCell<EventDispatcher>>,
    // Registered nodes keyed by their long-term local identity id; searched
    // linearly, so this assumes a small number of identities per host.
    nodes: Vec<(
        LocalIdentityId,
        LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
    )>,
    // Send options applied to node-managed PFS control traffic.
    pfs_control_options: SendOptions,
}
64
65impl<
66    'a,
67    P: Platform,
68    const IDENTITIES: usize,
69    const PEERS: usize,
70    const CHANNELS: usize,
71    const ACKS: usize,
72    const TX: usize,
73    const FRAME: usize,
74    const DUP: usize,
75> Host<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
76{
77    /// Create a host around a shared MAC handle.
78    pub fn new(mac: MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>) -> Self {
79        Self {
80            mac,
81            dispatcher: Rc::new(RefCell::new(EventDispatcher::new())),
82            nodes: Vec::new(),
83            pfs_control_options: SendOptions::default()
84                .with_ack_requested(true)
85                .with_flood_hops(5),
86        }
87    }
88
    /// Return the underlying shared MAC handle.
    ///
    /// Returned by value from `&self`, so the caller gets an independent handle
    /// to the same shared MAC run loop.
    pub fn mac(&self) -> MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP> {
        self.mac
    }

    /// Borrow the send options used for node-managed PFS control messages.
    pub fn pfs_control_options(&self) -> &SendOptions {
        &self.pfs_control_options
    }

    /// Replace the send options used for node-managed PFS control messages.
    ///
    /// Only affects control traffic sent after this call.
    pub fn set_pfs_control_options(&mut self, options: SendOptions) {
        self.pfs_control_options = options;
    }
103
104    /// Create and register a [`LocalNode`] for an already-registered local identity.
105    ///
106    /// The returned handle is cheap to clone and becomes the application-facing entry point
107    /// for sending traffic and attaching callbacks for that identity.
108    pub fn add_node(
109        &mut self,
110        identity_id: LocalIdentityId,
111    ) -> LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>> {
112        let membership = Rc::new(RefCell::new(NodeMembership::new()));
113        let state = Rc::new(RefCell::new(LocalNodeState::new()));
114        let node = LocalNode::new(
115            identity_id,
116            self.mac,
117            self.dispatcher.clone(),
118            membership,
119            state,
120        );
121        self.nodes.push((identity_id, node.clone()));
122        node
123    }
124
125    /// Look up a previously added node by identity id.
126    pub fn node(
127        &self,
128        identity_id: LocalIdentityId,
129    ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
130    {
131        self.nodes
132            .iter()
133            .find(|(id, _)| *id == identity_id)
134            .map(|(_, node)| node.clone())
135    }
136
    /// Resolve an identity id to its owning node.
    ///
    /// Exact identity matches are tried first. With the `software-crypto` feature the
    /// lookup additionally falls back to nodes that own `identity_id` as an ephemeral
    /// PFS identity; without the feature the fallback does not exist and unmatched ids
    /// resolve to `None`.
    ///
    /// NOTE(review): the free function `route_node` used inside `pump_once` performs
    /// the ephemeral fallback unconditionally, while this method gates it on
    /// `software-crypto` — confirm the divergence is intentional.
    fn route_node(
        &self,
        identity_id: LocalIdentityId,
    ) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>>
    {
        if let Some(node) = self.node(identity_id) {
            return Some(node);
        }

        #[cfg(feature = "software-crypto")]
        {
            return self
                .nodes
                .iter()
                .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
                .map(|(_, node)| node.clone());
        }

        // Without software crypto there are no ephemeral identities to fall back to.
        #[cfg(not(feature = "software-crypto"))]
        {
            None
        }
    }
160
    /// Drive the shared MAC until one wake cycle completes.
    ///
    /// This is a single wake-driven step, not a non-blocking poll. It waits until the MAC has
    /// meaningful work to do (radio activity or a protocol deadline), dispatches any resulting
    /// callbacks, services PFS command handling, and then returns.
    ///
    /// Use this when you need to multiplex UMSH progress with other async sources using
    /// `select!`. If UMSH owns the task, prefer [`run`](Self::run).
    pub async fn pump_once(
        &mut self,
    ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
        // PFS control commands observed inside the event callback are queued here and
        // serviced after `next_event` returns: `handle_pfs_command` is async and needs
        // `&mut self`, neither of which is available inside the synchronous closure.
        let pending_pfs = Rc::new(RefCell::new(Vec::<(
            LocalIdentityId,
            umsh_core::PublicKey,
            OwnedMacCommand,
        )>::new()));
        let pending_pfs_ref = pending_pfs.clone();
        // Clone the shared handles the closure needs so it does not borrow `self`
        // while `self.mac` is being driven.
        let dispatcher = self.dispatcher.clone();
        let nodes = self.nodes.clone();
        self.mac
            .next_event(move |identity_id, event| {
                // Ticket-state callbacks fire for every event, even when no node matches.
                dispatcher
                    .borrow_mut()
                    .dispatch_ticket_state(identity_id, &event);
                // Free-function router: also matches ephemeral PFS identities so that
                // traffic addressed to them reaches the owning long-term node.
                let Some(node) = route_node(&nodes, identity_id) else {
                    return;
                };
                match event {
                    umsh_mac::MacEventRef::Received(packet) => {
                        // Raw packet callback result is best-effort; errors are ignored.
                        let _ = node.dispatch_received_packet(&packet);
                        // An empty broadcast is treated as a beacon; any other packet with
                        // a known sender goes through the payload-type dispatch path.
                        if packet.packet_type() == umsh_core::PacketType::Broadcast
                            && packet.payload().is_empty()
                        {
                            if let Some(from_hint) = packet.from_hint() {
                                node.dispatch_beacon(from_hint, packet.from_key());
                            }
                        } else if let Some(from) = packet.from_key() {
                            dispatch_payload_callbacks(&node, &packet, from, &pending_pfs_ref);
                        }
                    }
                    umsh_mac::MacEventRef::AckReceived { peer, receipt } => {
                        node.dispatch_ack_received(
                            peer,
                            crate::SendToken::new(identity_id, receipt),
                        );
                    }
                    umsh_mac::MacEventRef::AckTimeout { peer, receipt } => {
                        node.dispatch_ack_timeout(
                            peer,
                            crate::SendToken::new(identity_id, receipt),
                        );
                    }
                    // Transmit/forward progress events carry no node-level callbacks.
                    umsh_mac::MacEventRef::Transmitted { .. }
                    | umsh_mac::MacEventRef::Forwarded { .. } => {}
                }
            })
            .await?;

        // Drain into a local Vec first so the RefCell borrow ends before awaiting.
        let queued: Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)> =
            pending_pfs.borrow_mut().drain(..).collect();
        for (identity_id, from, command) in queued {
            self.handle_pfs_command(identity_id, from, command).await;
        }

        // Expire idle PFS sessions and notify the application once per ended peer.
        // Expiry errors are deliberately ignored (best-effort housekeeping).
        #[cfg(feature = "software-crypto")]
        for (_, node) in &self.nodes {
            if let Ok(expired) = node.expire_pfs_sessions() {
                for peer in expired {
                    node.dispatch_pfs_ended(peer);
                }
            }
        }

        Ok(())
    }
236
    /// Run the shared MAC/Host loop forever.
    ///
    /// This is the preferred long-lived driver for node-based applications. It keeps the
    /// runtime wake policy inside the MAC/Host stack rather than requiring callers to write
    /// poll/sleep loops themselves.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`pump_once`](Self::pump_once); the loop
    /// never terminates successfully.
    pub async fn run(
        &mut self,
    ) -> Result<(), HostError<MacError<<P::Radio as umsh_hal::Radio>::Error>>> {
        loop {
            self.pump_once().await?;
        }
    }
249
250    async fn handle_pfs_command(
251        &mut self,
252        identity_id: LocalIdentityId,
253        from: umsh_core::PublicKey,
254        command: OwnedMacCommand,
255    ) {
256        let Some(node) = self.route_node(identity_id) else {
257            return;
258        };
259
260        if let Ok(Some(lifecycle)) = node
261            .handle_pfs_command(&from, &command, &self.pfs_control_options)
262            .await
263        {
264            match lifecycle {
265                PfsLifecycle::Established(peer) => node.dispatch_pfs_established(peer),
266                PfsLifecycle::Ended(peer) => node.dispatch_pfs_ended(peer),
267            }
268        }
269    }
270}
271
272fn route_node<
273    'a,
274    P: Platform,
275    const IDENTITIES: usize,
276    const PEERS: usize,
277    const CHANNELS: usize,
278    const ACKS: usize,
279    const TX: usize,
280    const FRAME: usize,
281    const DUP: usize,
282>(
283    nodes: &[(
284        LocalIdentityId,
285        LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
286    )],
287    identity_id: LocalIdentityId,
288) -> Option<LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>> {
289    nodes
290        .iter()
291        .find(|(id, _)| *id == identity_id)
292        .map(|(_, node)| node.clone())
293        .or_else(|| {
294            nodes
295                .iter()
296                .find(|(_, node)| node.owns_ephemeral_identity(identity_id))
297                .map(|(_, node)| node.clone())
298        })
299}
300
301fn dispatch_payload_callbacks<
302    'a,
303    P: Platform,
304    const IDENTITIES: usize,
305    const PEERS: usize,
306    const CHANNELS: usize,
307    const ACKS: usize,
308    const TX: usize,
309    const FRAME: usize,
310    const DUP: usize,
311>(
312    node: &LocalNode<MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
313    packet: &ReceivedPacketRef<'_>,
314    from: umsh_core::PublicKey,
315    pending_pfs: &Rc<RefCell<Vec<(LocalIdentityId, umsh_core::PublicKey, OwnedMacCommand)>>>,
316) {
317    if packet.payload_type() == PayloadType::NodeIdentity {
318        if let Ok(identity) = identity_payload::parse(packet.payload()) {
319            let owned = OwnedNodeIdentityPayload::from(identity);
320            node.dispatch_node_discovered(from, owned.name.as_deref());
321        }
322        return;
323    }
324
325    if packet.payload_type() == PayloadType::MacCommand {
326        if let Ok(command) = mac_command::parse(packet.payload()) {
327            let owned = OwnedMacCommand::from(command);
328            node.dispatch_mac_command(from, &owned);
329            if matches!(
330                owned,
331                OwnedMacCommand::PfsSessionRequest { .. }
332                    | OwnedMacCommand::PfsSessionResponse { .. }
333                    | OwnedMacCommand::EndPfsSession
334            ) {
335                pending_pfs
336                    .borrow_mut()
337                    .push((node.identity_id(), from, owned));
338            }
339        }
340    }
341}