umsh_mac/
handle.rs

1use core::future::poll_fn;
2use core::task::Poll;
3
4use rand::Rng;
5use umsh_core::{ChannelId, ChannelKey, PublicKey};
6use umsh_hal::{Clock, CounterStore};
7use umsh_sync::AsyncRefCell;
8
9use crate::{
10    CapacityError, DEFAULT_ACKS, DEFAULT_CHANNELS, DEFAULT_DUP, DEFAULT_FRAME, DEFAULT_IDENTITIES,
11    DEFAULT_PEERS, DEFAULT_TX, Platform,
12    coordinator::{CounterPersistenceError, LocalIdentityId, Mac, MacError, SendError},
13    peers::PeerId,
14    send::{SendOptions, SendReceipt},
15};
16
17/// Lightweight, cloneable handle for queuing MAC operations against shared state.
18///
19/// The handle borrows an [`AsyncRefCell`] that owns the underlying coordinator.
20/// Every operation takes the cell asynchronously: if another caller currently
21/// holds the coordinator (for example, the long-running `run()` loop that is
22/// waiting on the radio), operations wait rather than failing.
23pub struct MacHandle<
24    'a,
25    P: Platform,
26    const IDENTITIES: usize = DEFAULT_IDENTITIES,
27    const PEERS: usize = DEFAULT_PEERS,
28    const CHANNELS: usize = DEFAULT_CHANNELS,
29    const ACKS: usize = DEFAULT_ACKS,
30    const TX: usize = DEFAULT_TX,
31    const FRAME: usize = DEFAULT_FRAME,
32    const DUP: usize = DEFAULT_DUP,
33> {
34    mac: &'a AsyncRefCell<Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
35}
36
37impl<
38    'a,
39    P: Platform,
40    const IDENTITIES: usize,
41    const PEERS: usize,
42    const CHANNELS: usize,
43    const ACKS: usize,
44    const TX: usize,
45    const FRAME: usize,
46    const DUP: usize,
47> Copy for MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
48{
49}
50
51impl<
52    'a,
53    P: Platform,
54    const IDENTITIES: usize,
55    const PEERS: usize,
56    const CHANNELS: usize,
57    const ACKS: usize,
58    const TX: usize,
59    const FRAME: usize,
60    const DUP: usize,
61> Clone for MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
62{
63    fn clone(&self) -> Self {
64        *self
65    }
66}
67
68impl<
69    'a,
70    P: Platform,
71    const IDENTITIES: usize,
72    const PEERS: usize,
73    const CHANNELS: usize,
74    const ACKS: usize,
75    const TX: usize,
76    const FRAME: usize,
77    const DUP: usize,
78> MacHandle<'a, P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
79{
80    /// Creates a cloneable handle backed by shared coordinator state.
81    pub fn new(
82        mac: &'a AsyncRefCell<Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>>,
83    ) -> Self {
84        Self { mac }
85    }
86
87    /// Registers a local identity with the shared coordinator.
88    pub async fn add_identity(
89        &self,
90        identity: P::Identity,
91    ) -> Result<LocalIdentityId, CapacityError> {
92        self.mac.borrow_mut().await.add_identity(identity)
93    }
94
95    /// Load the persisted frame-counter boundary for one identity.
96    pub async fn load_persisted_counter(
97        &self,
98        id: LocalIdentityId,
99    ) -> Result<u32, CounterPersistenceError<<P::CounterStore as CounterStore>::Error>> {
100        self.mac.borrow_mut().await.load_persisted_counter(id).await
101    }
102
103    /// Persist all currently scheduled frame-counter reservations.
104    pub async fn service_counter_persistence(
105        &self,
106    ) -> Result<usize, <P::CounterStore as CounterStore>::Error> {
107        self.mac
108            .borrow_mut()
109            .await
110            .service_counter_persistence()
111            .await
112    }
113
114    /// Registers or refreshes a remote peer in the shared registry.
115    pub async fn add_peer(&self, key: PublicKey) -> Result<PeerId, CapacityError> {
116        self.mac.borrow_mut().await.add_peer(key)
117    }
118
119    /// Adds or updates a shared channel and derives its multicast keys.
120    pub async fn add_channel(&self, key: ChannelKey) -> Result<(), CapacityError> {
121        self.mac.borrow_mut().await.add_channel(key)
122    }
123
124    /// Adds or updates a named channel using the coordinator's channel-key derivation.
125    pub async fn add_named_channel(&self, name: &str) -> Result<(), CapacityError> {
126        self.mac.borrow_mut().await.add_named_channel(name)
127    }
128
129    /// Return whether inbound secure packets carrying a full source key may auto-register peers.
130    pub async fn auto_register_full_key_peers(&self) -> bool {
131        self.mac.borrow_mut().await.auto_register_full_key_peers()
132    }
133
134    /// Enable or disable inbound full-key peer auto-registration.
135    pub async fn set_auto_register_full_key_peers(&self, enabled: bool) {
136        self.mac
137            .borrow_mut()
138            .await
139            .set_auto_register_full_key_peers(enabled);
140    }
141
142    /// Installs pairwise transport keys for one local identity and remote peer.
143    ///
144    /// This is a crate-internal method. External callers should use the
145    /// `unsafe-advanced` feature or go through the node-layer PFS session manager.
146    #[cfg(any(feature = "unsafe-advanced", test))]
147    pub(crate) async fn install_pairwise_keys(
148        &self,
149        identity_id: LocalIdentityId,
150        peer_id: PeerId,
151        pairwise_keys: umsh_crypto::PairwiseKeys,
152    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
153        self.mac
154            .borrow_mut()
155            .await
156            .install_pairwise_keys(identity_id, peer_id, pairwise_keys)
157    }
158
159    /// Installs pairwise transport keys for one local identity and remote peer.
160    ///
161    /// # Safety (logical)
162    /// Installing wrong keys will silently corrupt the session. This method
163    /// is deliberately gated behind the `unsafe-advanced` feature. Prefer
164    /// going through the node-layer PFS session manager instead.
165    #[cfg(feature = "unsafe-advanced")]
166    pub async fn install_pairwise_keys_advanced(
167        &self,
168        identity_id: LocalIdentityId,
169        peer_id: PeerId,
170        pairwise_keys: umsh_crypto::PairwiseKeys,
171    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
172        self.install_pairwise_keys(identity_id, peer_id, pairwise_keys)
173            .await
174    }
175
176    /// Enqueues a broadcast frame for transmission.
177    pub async fn send_broadcast(
178        &self,
179        from: LocalIdentityId,
180        payload: &[u8],
181        options: &SendOptions,
182    ) -> Result<SendReceipt, SendError> {
183        self.mac
184            .borrow_mut()
185            .await
186            .send_broadcast(from, payload, options)
187            .await
188    }
189
190    /// Enqueues a multicast frame for transmission.
191    pub async fn send_multicast(
192        &self,
193        from: LocalIdentityId,
194        channel: &ChannelId,
195        payload: &[u8],
196        options: &SendOptions,
197    ) -> Result<SendReceipt, SendError> {
198        self.mac
199            .borrow_mut()
200            .await
201            .send_multicast(from, channel, payload, options)
202            .await
203    }
204
205    /// Enqueues a unicast frame for transmission.
206    pub async fn send_unicast(
207        &self,
208        from: LocalIdentityId,
209        dst: &PublicKey,
210        payload: &[u8],
211        options: &SendOptions,
212    ) -> Result<Option<SendReceipt>, SendError> {
213        self.mac
214            .borrow_mut()
215            .await
216            .send_unicast(from, dst, payload, options)
217            .await
218    }
219
220    /// Enqueues a blind-unicast frame for transmission.
221    pub async fn send_blind_unicast(
222        &self,
223        from: LocalIdentityId,
224        dst: &PublicKey,
225        channel: &ChannelId,
226        payload: &[u8],
227        options: &SendOptions,
228    ) -> Result<Option<SendReceipt>, SendError> {
229        self.mac
230            .borrow_mut()
231            .await
232            .send_blind_unicast(from, dst, channel, payload, options)
233            .await
234    }
235
236    /// Drive the shared MAC until one wake cycle completes and invoke `on_event` for emitted events.
237    ///
238    /// The exclusive borrow on the shared coordinator is released between
239    /// every internal phase so that other handles (CLI sends, UI queries,
240    /// counter-persistence services) can interleave their own async work
241    /// while this driver is waiting on the radio or a timer.
242    pub async fn next_event(
243        &self,
244        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
245    ) -> Result<(), MacError<<P::Radio as umsh_hal::Radio>::Error>> {
246        loop {
247            // Phase 1: drain any ready transmit work.
248            self.mac.borrow_mut().await.drain_tx_queue(&mut on_event).await?;
249
250            // Phase 2: wait for a radio frame or timer deadline. Acquire the
251            // borrow briefly each poll so concurrent tasks can obtain it too.
252            let mut buf = [0u8; FRAME];
253            let cond = self.mac.cond();
254            let mut cond_ticket = cond.ticket();
255            let reason = poll_fn(|cx| {
256                // Always register on the cell's wake condition first so we
257                // re-poll when another handle mutates coordinator state
258                // (e.g. `cli.send_unicast` enqueues a frame and drops its
259                // borrow). Without this, we'd only wake on radio I/O or
260                // timer deadlines — TX queued by concurrent handles would
261                // sit until the next unrelated radio event.
262                if cond.is_ticket_triggered(&cond_ticket) {
263                    cond.forget_ticket(&mut cond_ticket);
264                    cond_ticket = cond.ticket();
265                }
266                let _ = cond.poll_wait(cx, &mut cond_ticket);
267
268                if let Some(mut mac) = self.mac.try_borrow_mut() {
269                    // Got the borrow: also register radio/timer wakers and
270                    // check for readiness in one shot.
271                    mac.poll_wait_for_wake(cx, &mut buf)
272                } else {
273                    // Another caller currently holds the cell. The cond
274                    // registration above will wake us when they release.
275                    Poll::Pending
276                }
277            })
278            .await
279            .map_err(MacError::Radio)?;
280            cond.forget_ticket(&mut cond_ticket);
281
282            // Phases 3-5: re-acquire the borrow and finish the cycle.
283            self.mac
284                .borrow_mut()
285                .await
286                .process_wake_reason(reason, &mut buf, &mut on_event)
287                .await?;
288
289            // If new transmit work appeared during processing (e.g. a
290            // retransmit was enqueued), loop back to drain it before
291            // waiting again.
292            let tx_empty = self.mac.borrow().await.tx_queue().is_empty();
293            if !tx_empty {
294                continue;
295            }
296            return Ok(());
297        }
298    }
299
300    /// Drive the shared MAC forever, invoking `on_event` for delivered events.
301    ///
302    /// This is the preferred long-lived driver API for standalone MAC-backed tasks.
303    pub async fn run(
304        &self,
305        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
306    ) -> Result<(), MacError<<P::Radio as umsh_hal::Radio>::Error>> {
307        loop {
308            self.next_event(&mut on_event).await?;
309        }
310    }
311
312    /// Drive the shared MAC forever while ignoring emitted events.
313    pub async fn run_quiet(&self) -> Result<(), MacError<<P::Radio as umsh_hal::Radio>::Error>> {
314        self.run(|_, _| {}).await
315    }
316
317    /// Fills a caller-provided buffer with random bytes from the shared coordinator RNG.
318    pub async fn fill_random(&self, dest: &mut [u8]) {
319        self.mac.borrow_mut().await.rng_mut().fill_bytes(dest);
320    }
321
322    /// Returns the current coordinator clock time in milliseconds.
323    pub async fn now_ms(&self) -> u64 {
324        self.mac.borrow_mut().await.clock().now_ms()
325    }
326
327    #[cfg(feature = "software-crypto")]
328    /// Registers an ephemeral software identity with the shared coordinator.
329    pub async fn register_ephemeral(
330        &self,
331        parent: LocalIdentityId,
332        identity: umsh_crypto::software::SoftwareIdentity,
333    ) -> Result<LocalIdentityId, CapacityError> {
334        self.mac
335            .borrow_mut()
336            .await
337            .register_ephemeral(parent, identity)
338    }
339
340    #[cfg(feature = "software-crypto")]
341    /// Removes a previously registered ephemeral identity.
342    pub async fn remove_ephemeral(&self, id: LocalIdentityId) -> bool {
343        self.mac.borrow_mut().await.remove_ephemeral(id)
344    }
345
346    /// Cancel a pending ACK-requested send, stopping retransmissions.
347    ///
348    /// Returns `true` if the pending ACK was found and removed.
349    pub async fn cancel_pending_ack(
350        &self,
351        identity_id: LocalIdentityId,
352        receipt: SendReceipt,
353    ) -> bool {
354        self.mac
355            .borrow_mut()
356            .await
357            .cancel_pending_ack(identity_id, receipt)
358    }
359}