umsh_node/
node.rs

1use alloc::boxed::Box;
2use alloc::rc::Rc;
3#[cfg(feature = "software-crypto")]
4use alloc::vec::Vec;
5use core::cell::RefCell;
6use core::num::NonZeroU32;
7
8use umsh_core::NodeHint;
9use umsh_core::PublicKey;
10use umsh_mac::{LocalIdentityId, SendOptions};
11
12#[cfg(feature = "software-crypto")]
13use crate::channel::Channel;
14use crate::dispatch::EventDispatcher;
15use crate::mac::MacBackend;
16use crate::peer::PeerConnection;
17#[cfg(feature = "software-crypto")]
18use crate::pfs::{PfsSessionManager, PfsState};
19use crate::receive::ReceivedPacketRef;
20use crate::ticket::{SendProgressTicket, SendToken};
21use crate::transport::Transport;
22use crate::{AppEncodeError, OwnedMacCommand};
23
/// Per-node shared membership state.
///
/// All cloned `LocalNode` handles and their `BoundChannel`s share the same
/// instance via `Rc<RefCell<...>>`.
pub(crate) struct NodeMembership {
    /// Membership entries for this node. `LocalNode::join` appends entries
    /// and `LocalNode::leave` only marks them inactive, so the list also
    /// contains channels the node has since left.
    #[cfg(feature = "software-crypto")]
    pub channels: Vec<ChannelMembershipEntry>,
}
30
/// Identifier of one handler slot inside a `HandlerTable`.
///
/// Holds the one-based slot index, which is why the value is never zero.
/// Slots are reused after removal, so a handle value can be issued again
/// once its handler has been removed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct SubscriptionHandle(NonZeroU32);
33
/// Owned subscription guard.
///
/// Dropping the value automatically unregisters the callback, so the guard
/// has to be kept alive for as long as the handler should stay registered.
/// Discarding the return value of an `on_*` registration therefore removes
/// the handler immediately, which is why the type is marked `#[must_use]`.
#[must_use = "dropping a `Subscription` immediately unregisters its handler"]
pub struct Subscription {
    /// Cancellation callback; replaced by `None` once it has been run.
    cancel: Rc<RefCell<Option<Box<dyn FnMut() -> bool>>>>,
}

impl Subscription {
    /// Wrap `cancel` in a guard.
    ///
    /// `cancel` is invoked at most once: either by [`Subscription::unsubscribe`]
    /// or when the guard is dropped, whichever happens first.
    pub(crate) fn new(cancel: impl FnMut() -> bool + 'static) -> Self {
        Self {
            cancel: Rc::new(RefCell::new(Some(Box::new(cancel)))),
        }
    }

    /// Unregister immediately instead of waiting for drop.
    ///
    /// Returns whatever the cancellation callback reports. The subsequent
    /// drop of `self` finds the callback already taken and does nothing.
    pub fn unsubscribe(self) -> bool {
        Self::run_cancel(&self.cancel)
    }

    /// Take the callback out of the cell and run it.
    ///
    /// Returns `false` when the callback has already been consumed.
    fn run_cancel(cancel: &Rc<RefCell<Option<Box<dyn FnMut() -> bool>>>>) -> bool {
        // Take the callback in its own statement so the `RefCell` borrow is
        // released before the callback runs.
        let callback = cancel.borrow_mut().take();
        match callback {
            Some(mut run) => run(),
            None => false,
        }
    }
}

impl Drop for Subscription {
    /// Runs the cancellation callback if `unsubscribe` has not already done so.
    fn drop(&mut self) {
        let _ = Self::run_cancel(&self.cancel);
    }
}
66
67pub(crate) struct HandlerTable<T> {
68    slots: Vec<Option<T>>,
69}
70
71impl<T> Default for HandlerTable<T> {
72    fn default() -> Self {
73        Self { slots: Vec::new() }
74    }
75}
76
77impl<T> HandlerTable<T> {
78    pub(crate) fn insert(&mut self, handler: T) -> SubscriptionHandle {
79        if let Some((index, slot)) = self
80            .slots
81            .iter_mut()
82            .enumerate()
83            .find(|(_, slot)| slot.is_none())
84        {
85            *slot = Some(handler);
86            return SubscriptionHandle(NonZeroU32::new((index + 1) as u32).unwrap());
87        }
88        self.slots.push(Some(handler));
89        SubscriptionHandle(NonZeroU32::new(self.slots.len() as u32).unwrap())
90    }
91
92    pub(crate) fn remove(&mut self, handle: SubscriptionHandle) -> bool {
93        let index = handle.0.get() as usize - 1;
94        let Some(slot) = self.slots.get_mut(index) else {
95            return false;
96        };
97        slot.take().is_some()
98    }
99
100    fn any_mut(&mut self, mut f: impl FnMut(&mut T) -> bool) -> bool {
101        for slot in &mut self.slots {
102            let Some(handler) = slot.as_mut() else {
103                continue;
104            };
105            if f(handler) {
106                return true;
107            }
108        }
109        false
110    }
111
112    fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
113        for slot in &mut self.slots {
114            let Some(handler) = slot.as_mut() else {
115                continue;
116            };
117            f(handler);
118        }
119    }
120}
121
122pub(crate) struct PeerSubscriptions {
123    peer: PublicKey,
124    pub(crate) receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
125    pub(crate) ack_received_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
126    pub(crate) ack_timeout_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
127    pub(crate) pfs_established_handlers: HandlerTable<Box<dyn FnMut()>>,
128    pub(crate) pfs_ended_handlers: HandlerTable<Box<dyn FnMut()>>,
129}
130
131impl PeerSubscriptions {
132    fn new(peer: PublicKey) -> Self {
133        Self {
134            peer,
135            receive_handlers: HandlerTable::default(),
136            ack_received_handlers: HandlerTable::default(),
137            ack_timeout_handlers: HandlerTable::default(),
138            pfs_established_handlers: HandlerTable::default(),
139            pfs_ended_handlers: HandlerTable::default(),
140        }
141    }
142}
143
144pub(crate) struct LocalNodeState {
145    receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
146    node_discovered_handlers: HandlerTable<Box<dyn FnMut(PublicKey, Option<&str>)>>,
147    beacon_handlers: HandlerTable<Box<dyn FnMut(NodeHint, Option<PublicKey>)>>,
148    mac_command_handlers: HandlerTable<Box<dyn FnMut(PublicKey, &OwnedMacCommand)>>,
149    ack_received_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
150    ack_timeout_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
151    pfs_established_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
152    pfs_ended_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
153    peer_subscriptions: Vec<PeerSubscriptions>,
154    #[cfg(feature = "software-crypto")]
155    pfs: PfsSessionManager,
156}
157
158impl LocalNodeState {
159    pub(crate) fn new() -> Self {
160        Self {
161            receive_handlers: HandlerTable::default(),
162            node_discovered_handlers: HandlerTable::default(),
163            beacon_handlers: HandlerTable::default(),
164            mac_command_handlers: HandlerTable::default(),
165            ack_received_handlers: HandlerTable::default(),
166            ack_timeout_handlers: HandlerTable::default(),
167            pfs_established_handlers: HandlerTable::default(),
168            pfs_ended_handlers: HandlerTable::default(),
169            peer_subscriptions: Vec::new(),
170            #[cfg(feature = "software-crypto")]
171            pfs: PfsSessionManager::new(),
172        }
173    }
174
175    pub(crate) fn peer_subscriptions_mut(&mut self, peer: PublicKey) -> &mut PeerSubscriptions {
176        if let Some(index) = self
177            .peer_subscriptions
178            .iter()
179            .position(|entry| entry.peer == peer)
180        {
181            return &mut self.peer_subscriptions[index];
182        }
183        self.peer_subscriptions.push(PeerSubscriptions::new(peer));
184        self.peer_subscriptions
185            .last_mut()
186            .expect("peer subscriptions just inserted")
187    }
188
189    pub(crate) fn find_peer_subscriptions_mut(
190        &mut self,
191        peer: PublicKey,
192    ) -> Option<&mut PeerSubscriptions> {
193        self.peer_subscriptions
194            .iter_mut()
195            .find(|entry| entry.peer == peer)
196    }
197}
198
/// PFS session status for one peer, as reported by `LocalNode::pfs_status`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg(feature = "software-crypto")]
pub enum PfsStatus {
    /// No session is recorded for the peer.
    Inactive,
    /// A session has been requested but is not active yet.
    Requested,
    /// A session is active; the fields are copied from the session record.
    Active {
        /// Local identity slot holding this node's ephemeral identity.
        local_ephemeral_id: LocalIdentityId,
        /// The peer's ephemeral public key.
        peer_ephemeral: PublicKey,
        /// Expiry timestamp — presumably milliseconds on the same clock as
        /// the MAC's `now_ms`; TODO confirm.
        expires_ms: u64,
    },
}
210
/// PFS state change produced by `LocalNode::handle_pfs_command`.
///
/// The carried key is the key of the peer the command came from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum PfsLifecycle {
    /// A session with the peer was established.
    Established(PublicKey),
    /// The session with the peer ended.
    Ended(PublicKey),
}
216
/// Membership record for one (node, channel) pair.
#[cfg(feature = "software-crypto")]
pub(crate) struct ChannelMembershipEntry {
    /// The channel this record describes.
    pub channel: Channel,
    /// Generation counter for this (node, channel) pair.
    /// Bumped (wrapping) on leave and again when an inactive entry is
    /// re-joined; BoundChannel snapshots this at creation.
    pub generation: u64,
    /// False after leave(); entry kept until re-joined or GC'd.
    pub active: bool,
}
226
227impl NodeMembership {
228    pub fn new() -> Self {
229        Self {
230            #[cfg(feature = "software-crypto")]
231            channels: Vec::new(),
232        }
233    }
234}
235
236/// Errors produced by node-layer operations.
237#[derive(Clone, PartialEq, Eq)]
238pub enum NodeError<M: MacBackend> {
239    /// Underlying MAC-layer failure.
240    Mac(crate::mac::MacBackendError<M::SendError, M::CapacityError>),
241    /// The node has left this channel since the handle was created.
242    ChannelLeft,
243    /// The peer is not registered.
244    PeerMissing,
245    /// Control-payload encode failure.
246    AppEncode(AppEncodeError),
247    /// The referenced PFS session was missing.
248    #[cfg(feature = "software-crypto")]
249    PfsSessionMissing,
250    /// The PFS session table is full.
251    #[cfg(feature = "software-crypto")]
252    PfsSessionTableFull,
253    /// Crypto failure during PFS processing.
254    #[cfg(feature = "software-crypto")]
255    Crypto(umsh_crypto::CryptoError),
256}
257
258impl<M> core::fmt::Debug for NodeError<M>
259where
260    M: MacBackend,
261    M::SendError: core::fmt::Debug,
262    M::CapacityError: core::fmt::Debug,
263{
264    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265        match self {
266            Self::Mac(e) => f.debug_tuple("Mac").field(e).finish(),
267            Self::ChannelLeft => f.write_str("ChannelLeft"),
268            Self::PeerMissing => f.write_str("PeerMissing"),
269            Self::AppEncode(e) => f.debug_tuple("AppEncode").field(e).finish(),
270            #[cfg(feature = "software-crypto")]
271            Self::PfsSessionMissing => f.write_str("PfsSessionMissing"),
272            #[cfg(feature = "software-crypto")]
273            Self::PfsSessionTableFull => f.write_str("PfsSessionTableFull"),
274            #[cfg(feature = "software-crypto")]
275            Self::Crypto(e) => f.debug_tuple("Crypto").field(e).finish(),
276        }
277    }
278}
279
280impl<M: MacBackend> From<crate::mac::MacBackendError<M::SendError, M::CapacityError>>
281    for NodeError<M>
282{
283    fn from(e: crate::mac::MacBackendError<M::SendError, M::CapacityError>) -> Self {
284        Self::Mac(e)
285    }
286}
287
288impl<M: MacBackend> From<AppEncodeError> for NodeError<M> {
289    fn from(e: AppEncodeError) -> Self {
290        Self::AppEncode(e)
291    }
292}
293
294#[cfg(feature = "software-crypto")]
295impl<M: MacBackend> From<umsh_crypto::CryptoError> for NodeError<M> {
296    fn from(e: umsh_crypto::CryptoError) -> Self {
297        Self::Crypto(e)
298    }
299}
300
/// Per-identity application handle.
///
/// `LocalNode` owns its channel membership set via shared interior state.
/// Different `LocalNode` instances (for different identities) may join
/// different channel sets.
///
/// Cloning copies the `Rc` handles, so clones share the same dispatcher,
/// membership and handler state.
#[derive(Clone)]
pub struct LocalNode<M: MacBackend> {
    /// Identity slot this node sends as.
    identity_id: LocalIdentityId,
    /// MAC backend used for all sends and registrations.
    mac: M,
    /// Dispatcher that tracks send tickets.
    dispatcher: Rc<RefCell<EventDispatcher>>,
    /// Channel membership shared with every `BoundChannel` of this node.
    #[allow(dead_code)] // Used by channel methods (software-crypto feature)
    membership: Rc<RefCell<NodeMembership>>,
    /// Handler tables and PFS session state.
    state: Rc<RefCell<LocalNodeState>>,
}
315
316impl<M: MacBackend> LocalNode<M> {
317    /// Create a new local node.
318    pub(crate) fn new(
319        identity_id: LocalIdentityId,
320        mac: M,
321        dispatcher: Rc<RefCell<EventDispatcher>>,
322        membership: Rc<RefCell<NodeMembership>>,
323        state: Rc<RefCell<LocalNodeState>>,
324    ) -> Self {
325        Self {
326            identity_id,
327            mac,
328            dispatcher,
329            membership,
330            state,
331        }
332    }
333
    /// The identity slot this node operates on.
    ///
    /// This is the identity used for sends that do not go through a PFS
    /// session.
    pub fn identity_id(&self) -> LocalIdentityId {
        self.identity_id
    }
338
339    /// Create a peer connection (registers peer in MAC if new).
340    pub async fn peer(&self, key: PublicKey) -> Result<PeerConnection<Self>, NodeError<M>> {
341        self.mac.add_peer(key).await?;
342        Ok(PeerConnection::new(self.clone(), key))
343    }
344
345    fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
346    where
347        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
348    {
349        self.state
350            .borrow_mut()
351            .receive_handlers
352            .insert(Box::new(handler))
353    }
354
355    pub fn on_receive<F>(&self, handler: F) -> Subscription
356    where
357        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
358    {
359        let handle = self.add_receive_handler(handler);
360        let state = self.state.clone();
361        Subscription::new(move || state.borrow_mut().receive_handlers.remove(handle))
362    }
363
364    fn add_node_discovered_handler<F>(&self, handler: F) -> SubscriptionHandle
365    where
366        F: FnMut(PublicKey, Option<&str>) + 'static,
367    {
368        self.state
369            .borrow_mut()
370            .node_discovered_handlers
371            .insert(Box::new(handler))
372    }
373
374    pub fn on_node_discovered<F>(&self, handler: F) -> Subscription
375    where
376        F: FnMut(PublicKey, Option<&str>) + 'static,
377    {
378        let handle = self.add_node_discovered_handler(handler);
379        let state = self.state.clone();
380        Subscription::new(move || state.borrow_mut().node_discovered_handlers.remove(handle))
381    }
382
383    fn add_beacon_handler<F>(&self, handler: F) -> SubscriptionHandle
384    where
385        F: FnMut(NodeHint, Option<PublicKey>) + 'static,
386    {
387        self.state
388            .borrow_mut()
389            .beacon_handlers
390            .insert(Box::new(handler))
391    }
392
393    pub fn on_beacon<F>(&self, handler: F) -> Subscription
394    where
395        F: FnMut(NodeHint, Option<PublicKey>) + 'static,
396    {
397        let handle = self.add_beacon_handler(handler);
398        let state = self.state.clone();
399        Subscription::new(move || state.borrow_mut().beacon_handlers.remove(handle))
400    }
401
402    fn add_mac_command_handler<F>(&self, handler: F) -> SubscriptionHandle
403    where
404        F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
405    {
406        self.state
407            .borrow_mut()
408            .mac_command_handlers
409            .insert(Box::new(handler))
410    }
411
412    pub fn on_mac_command<F>(&self, handler: F) -> Subscription
413    where
414        F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
415    {
416        let handle = self.add_mac_command_handler(handler);
417        let state = self.state.clone();
418        Subscription::new(move || state.borrow_mut().mac_command_handlers.remove(handle))
419    }
420
421    fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
422    where
423        F: FnMut(PublicKey, SendToken) + 'static,
424    {
425        self.state
426            .borrow_mut()
427            .ack_received_handlers
428            .insert(Box::new(handler))
429    }
430
431    pub fn on_ack_received<F>(&self, handler: F) -> Subscription
432    where
433        F: FnMut(PublicKey, SendToken) + 'static,
434    {
435        let handle = self.add_ack_received_handler(handler);
436        let state = self.state.clone();
437        Subscription::new(move || state.borrow_mut().ack_received_handlers.remove(handle))
438    }
439
440    fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
441    where
442        F: FnMut(PublicKey, SendToken) + 'static,
443    {
444        self.state
445            .borrow_mut()
446            .ack_timeout_handlers
447            .insert(Box::new(handler))
448    }
449
450    pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
451    where
452        F: FnMut(PublicKey, SendToken) + 'static,
453    {
454        let handle = self.add_ack_timeout_handler(handler);
455        let state = self.state.clone();
456        Subscription::new(move || state.borrow_mut().ack_timeout_handlers.remove(handle))
457    }
458
459    fn add_pfs_established_handler<F>(&self, handler: F) -> SubscriptionHandle
460    where
461        F: FnMut(PublicKey) + 'static,
462    {
463        self.state
464            .borrow_mut()
465            .pfs_established_handlers
466            .insert(Box::new(handler))
467    }
468
469    pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
470    where
471        F: FnMut(PublicKey) + 'static,
472    {
473        let handle = self.add_pfs_established_handler(handler);
474        let state = self.state.clone();
475        Subscription::new(move || state.borrow_mut().pfs_established_handlers.remove(handle))
476    }
477
478    fn add_pfs_ended_handler<F>(&self, handler: F) -> SubscriptionHandle
479    where
480        F: FnMut(PublicKey) + 'static,
481    {
482        self.state
483            .borrow_mut()
484            .pfs_ended_handlers
485            .insert(Box::new(handler))
486    }
487
488    pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
489    where
490        F: FnMut(PublicKey) + 'static,
491    {
492        let handle = self.add_pfs_ended_handler(handler);
493        let state = self.state.clone();
494        Subscription::new(move || state.borrow_mut().pfs_ended_handlers.remove(handle))
495    }
496
497    #[cfg(feature = "software-crypto")]
498    pub async fn request_pfs(
499        &self,
500        peer: &PublicKey,
501        duration_minutes: u16,
502        options: &SendOptions,
503    ) -> Result<SendProgressTicket, NodeError<M>> {
504        let receipt = self
505            .state
506            .borrow_mut()
507            .pfs
508            .request_session(&self.mac, self.identity_id, peer, duration_minutes, options)
509            .await?;
510        Ok(self.register_ack_send(self.identity_id, receipt))
511    }
512
513    #[cfg(feature = "software-crypto")]
514    pub async fn end_pfs(
515        &self,
516        peer: &PublicKey,
517        options: &SendOptions,
518    ) -> Result<(), NodeError<M>> {
519        let _ = self
520            .state
521            .borrow_mut()
522            .pfs
523            .end_session(&self.mac, self.identity_id, peer, true, options)
524            .await?;
525        Ok(())
526    }
527
528    #[cfg(feature = "software-crypto")]
529    pub async fn pfs_status(&self, peer: &PublicKey) -> Result<PfsStatus, NodeError<M>> {
530        let now_ms = self.mac.now_ms().await;
531        let state = self.state.borrow();
532        if let Some(session) = state
533            .pfs
534            .sessions()
535            .iter()
536            .find(|session| session.peer_long_term == *peer)
537        {
538            return Ok(match session.state {
539                PfsState::Requested => PfsStatus::Requested,
540                PfsState::Active => PfsStatus::Active {
541                    local_ephemeral_id: session.local_ephemeral_id,
542                    peer_ephemeral: session.peer_ephemeral,
543                    expires_ms: session.expires_ms,
544                },
545            });
546        }
547        if state.pfs.active_route(peer, now_ms).is_some() {
548            // Defensive fallback for any future session bookkeeping changes.
549            return Ok(PfsStatus::Requested);
550        }
551        Ok(PfsStatus::Inactive)
552    }
553
554    /// Join a channel. Registers the channel key in the MAC if this is
555    /// the first node to join it. Returns the bound channel handle.
556    #[cfg(feature = "software-crypto")]
557    pub async fn join(&self, channel: &Channel) -> Result<BoundChannel<M>, NodeError<M>> {
558        let mut membership = self.membership.borrow_mut();
559
560        // Check if already joined.
561        if let Some(entry) = membership
562            .channels
563            .iter_mut()
564            .find(|e| e.channel == *channel)
565        {
566            if entry.active {
567                return Ok(BoundChannel {
568                    node: self.clone(),
569                    channel: entry.channel.clone(),
570                    join_generation: entry.generation,
571                });
572            }
573            // Re-joining after leave — mark active again, bump generation.
574            entry.active = true;
575            entry.generation = entry.generation.wrapping_add(1);
576            return Ok(BoundChannel {
577                node: self.clone(),
578                channel: entry.channel.clone(),
579                join_generation: entry.generation,
580            });
581        }
582
583        // New channel — register with MAC.
584        drop(membership);
585        self.mac.add_private_channel(channel.key().clone()).await?;
586        let mut membership = self.membership.borrow_mut();
587
588        let generation = 0;
589        membership.channels.push(ChannelMembershipEntry {
590            channel: channel.clone(),
591            generation,
592            active: true,
593        });
594
595        Ok(BoundChannel {
596            node: self.clone(),
597            channel: channel.clone(),
598            join_generation: generation,
599        })
600    }
601
602    /// Leave a channel. Marks the membership entry inactive and bumps
603    /// that entry's generation counter.
604    #[cfg(feature = "software-crypto")]
605    pub fn leave(&self, channel: &Channel) -> Result<(), NodeError<M>> {
606        let mut membership = self.membership.borrow_mut();
607        if let Some(entry) = membership
608            .channels
609            .iter_mut()
610            .find(|e| e.channel == *channel && e.active)
611        {
612            entry.active = false;
613            entry.generation = entry.generation.wrapping_add(1);
614        }
615        Ok(())
616    }
617
618    /// Get a handle to an already-joined channel.
619    #[cfg(feature = "software-crypto")]
620    pub fn bound_channel(&self, channel: &Channel) -> Option<BoundChannel<M>> {
621        let membership = self.membership.borrow();
622        membership
623            .channels
624            .iter()
625            .find(|e| e.channel == *channel && e.active)
626            .map(|entry| BoundChannel {
627                node: self.clone(),
628                channel: entry.channel.clone(),
629                join_generation: entry.generation,
630            })
631    }
632
633    /// List all joined channels.
634    #[cfg(feature = "software-crypto")]
635    pub fn bound_channels(&self) -> Vec<BoundChannel<M>> {
636        let membership = self.membership.borrow();
637        membership
638            .channels
639            .iter()
640            .filter(|e| e.active)
641            .map(|entry| BoundChannel {
642                node: self.clone(),
643                channel: entry.channel.clone(),
644                join_generation: entry.generation,
645            })
646            .collect()
647    }
648
649    /// Register an ACK-tracked send with the dispatcher and return a progress ticket.
650    fn register_ack_send(
651        &self,
652        send_identity_id: LocalIdentityId,
653        receipt: Option<umsh_mac::SendReceipt>,
654    ) -> SendProgressTicket {
655        match receipt {
656            Some(receipt) => {
657                let token = SendToken::new(send_identity_id, receipt);
658                let state = self.dispatcher.borrow_mut().register_ticket(token, false);
659                SendProgressTicket::new(token, state)
660            }
661            // Unicast/blind-unicast without ACK requested — no tracking.
662            None => SendProgressTicket::fire_and_forget(),
663        }
664    }
665
666    /// Register a non-ACK send (broadcast/multicast) with the dispatcher.
667    ///
668    /// The ticket starts unfinished. The dispatcher marks it transmitted and
669    /// finished when the MAC fires the `Transmitted` event with this receipt.
670    fn register_non_ack_send(
671        &self,
672        send_identity_id: LocalIdentityId,
673        receipt: umsh_mac::SendReceipt,
674    ) -> SendProgressTicket {
675        let token = SendToken::new(send_identity_id, receipt);
676        let state = self.dispatcher.borrow_mut().register_ticket(token, true);
677        SendProgressTicket::new(token, state)
678    }
679
    /// Shared handle to this node's handler tables and PFS state.
    ///
    /// Every clone of the node refers to the same cell.
    pub(crate) fn state(&self) -> &Rc<RefCell<LocalNodeState>> {
        &self.state
    }
683
684    #[cfg(feature = "software-crypto")]
685    pub(crate) fn owns_ephemeral_identity(&self, identity_id: LocalIdentityId) -> bool {
686        self.state
687            .borrow()
688            .pfs
689            .sessions()
690            .iter()
691            .any(|session| session.local_ephemeral_id == identity_id)
692    }
693
694    #[cfg(feature = "software-crypto")]
695    pub(crate) async fn handle_pfs_command(
696        &self,
697        from: &PublicKey,
698        command: &OwnedMacCommand,
699        options: &SendOptions,
700    ) -> Result<Option<PfsLifecycle>, NodeError<M>> {
701        match *command {
702            OwnedMacCommand::PfsSessionRequest {
703                ephemeral_key,
704                duration_minutes,
705            } => {
706                self.state
707                    .borrow_mut()
708                    .pfs
709                    .accept_request(
710                        &self.mac,
711                        self.identity_id,
712                        *from,
713                        ephemeral_key,
714                        duration_minutes,
715                        options,
716                    )
717                    .await?;
718                Ok(Some(PfsLifecycle::Established(*from)))
719            }
720            OwnedMacCommand::PfsSessionResponse {
721                ephemeral_key,
722                duration_minutes,
723            } => {
724                if self
725                    .state
726                    .borrow_mut()
727                    .pfs
728                    .accept_response(
729                        &self.mac,
730                        self.identity_id,
731                        *from,
732                        ephemeral_key,
733                        duration_minutes,
734                    )
735                    .await?
736                {
737                    Ok(Some(PfsLifecycle::Established(*from)))
738                } else {
739                    Ok(None)
740                }
741            }
742            OwnedMacCommand::EndPfsSession => {
743                let _ = self
744                    .state
745                    .borrow_mut()
746                    .pfs
747                    .end_session(&self.mac, self.identity_id, from, false, options)
748                    .await?;
749                Ok(Some(PfsLifecycle::Ended(*from)))
750            }
751            _ => Ok(None),
752        }
753    }
754
755    pub(crate) fn dispatch_received_packet(&self, packet: &ReceivedPacketRef<'_>) -> bool {
756        let peer = packet.from_key();
757        let mut state = self.state.borrow_mut();
758
759        if let Some(peer) = peer.map(|peer| canonical_peer(&state, peer)) {
760            if let Some(entry) = state
761                .peer_subscriptions
762                .iter_mut()
763                .find(|entry| entry.peer == peer)
764            {
765                if entry.receive_handlers.any_mut(|handler| handler(packet)) {
766                    return true;
767                }
768            }
769        }
770
771        state.receive_handlers.any_mut(|handler| handler(packet))
772    }
773
774    pub(crate) fn dispatch_node_discovered(&self, key: PublicKey, name: Option<&str>) {
775        self.state
776            .borrow_mut()
777            .node_discovered_handlers
778            .for_each_mut(|handler| handler(key, name));
779    }
780
781    pub(crate) fn dispatch_beacon(&self, from_hint: NodeHint, from_key: Option<PublicKey>) {
782        self.state
783            .borrow_mut()
784            .beacon_handlers
785            .for_each_mut(|handler| handler(from_hint, from_key));
786    }
787
788    pub(crate) fn dispatch_mac_command(&self, from: PublicKey, command: &OwnedMacCommand) {
789        self.state
790            .borrow_mut()
791            .mac_command_handlers
792            .for_each_mut(|handler| handler(from, command));
793    }
794
795    pub(crate) fn dispatch_ack_received(&self, peer: PublicKey, token: SendToken) {
796        let mut state = self.state.borrow_mut();
797        let peer = canonical_peer(&state, peer);
798        if let Some(entry) = state
799            .peer_subscriptions
800            .iter_mut()
801            .find(|entry| entry.peer == peer)
802        {
803            entry
804                .ack_received_handlers
805                .for_each_mut(|handler| handler(token));
806        }
807        state
808            .ack_received_handlers
809            .for_each_mut(|handler| handler(peer, token));
810    }
811
812    pub(crate) fn dispatch_ack_timeout(&self, peer: PublicKey, token: SendToken) {
813        let mut state = self.state.borrow_mut();
814        let peer = canonical_peer(&state, peer);
815        if let Some(entry) = state
816            .peer_subscriptions
817            .iter_mut()
818            .find(|entry| entry.peer == peer)
819        {
820            entry
821                .ack_timeout_handlers
822                .for_each_mut(|handler| handler(token));
823        }
824        state
825            .ack_timeout_handlers
826            .for_each_mut(|handler| handler(peer, token));
827    }
828
829    pub(crate) fn dispatch_pfs_established(&self, peer: PublicKey) {
830        let mut state = self.state.borrow_mut();
831        let peer = canonical_peer(&state, peer);
832        if let Some(entry) = state
833            .peer_subscriptions
834            .iter_mut()
835            .find(|entry| entry.peer == peer)
836        {
837            entry
838                .pfs_established_handlers
839                .for_each_mut(|handler| handler());
840        }
841        state
842            .pfs_established_handlers
843            .for_each_mut(|handler| handler(peer));
844    }
845
846    pub(crate) fn dispatch_pfs_ended(&self, peer: PublicKey) {
847        let mut state = self.state.borrow_mut();
848        let peer = canonical_peer(&state, peer);
849        if let Some(entry) = state
850            .peer_subscriptions
851            .iter_mut()
852            .find(|entry| entry.peer == peer)
853        {
854            entry.pfs_ended_handlers.for_each_mut(|handler| handler());
855        }
856        state
857            .pfs_ended_handlers
858            .for_each_mut(|handler| handler(peer));
859    }
860
861    pub(crate) async fn expire_pfs_sessions(&self) -> Result<Vec<PublicKey>, NodeError<M>> {
862        #[cfg(feature = "software-crypto")]
863        {
864            let now_ms = self.mac.now_ms().await;
865            return self
866                .state
867                .borrow_mut()
868                .pfs
869                .expire_sessions(&self.mac, now_ms)
870                .await;
871        }
872        #[cfg(not(feature = "software-crypto"))]
873        {
874            Ok(Vec::new())
875        }
876    }
877}
878
879fn canonical_peer(state: &LocalNodeState, peer: PublicKey) -> PublicKey {
880    #[cfg(feature = "software-crypto")]
881    {
882        if let Some(session) = state
883            .pfs
884            .sessions()
885            .iter()
886            .find(|session| session.state == PfsState::Active && session.peer_ephemeral == peer)
887        {
888            return session.peer_long_term;
889        }
890    }
891    peer
892}
893
894impl<M: MacBackend> Transport for LocalNode<M> {
895    type Error = NodeError<M>;
896
897    async fn send(
898        &self,
899        to: &PublicKey,
900        payload: &[u8],
901        options: &SendOptions,
902    ) -> Result<SendProgressTicket, Self::Error> {
903        #[cfg(feature = "software-crypto")]
904        let (send_identity_id, receipt) = {
905            let now_ms = self.mac.now_ms().await;
906            if let Some((local_id, peer_ephemeral)) =
907                self.state.borrow().pfs.active_route(to, now_ms)
908            {
909                let receipt = self
910                    .mac
911                    .send_unicast(local_id, &peer_ephemeral, payload, options)
912                    .await?;
913                (local_id, receipt)
914            } else {
915                let receipt = self
916                    .mac
917                    .send_unicast(self.identity_id, to, payload, options)
918                    .await?;
919                (self.identity_id, receipt)
920            }
921        };
922        #[cfg(not(feature = "software-crypto"))]
923        let (send_identity_id, receipt) = (
924            self.identity_id,
925            self.mac
926                .send_unicast(self.identity_id, to, payload, options)
927                .await?,
928        );
929        Ok(self.register_ack_send(send_identity_id, receipt))
930    }
931
932    async fn send_all(
933        &self,
934        payload: &[u8],
935        options: &SendOptions,
936    ) -> Result<SendProgressTicket, Self::Error> {
937        let receipt = self
938            .mac
939            .send_broadcast(self.identity_id, payload, options)
940            .await?;
941        Ok(self.register_non_ack_send(self.identity_id, receipt))
942    }
943}
944
/// A channel bound to a specific `LocalNode`. Implements `Transport`.
///
/// Holds a snapshot of the per-channel membership generation at creation
/// time. If the node leaves this channel, operations return
/// `NodeError::ChannelLeft`.
#[cfg(feature = "software-crypto")]
#[derive(Clone)]
pub struct BoundChannel<M: MacBackend> {
    /// The local node that owns this binding; sends go through its MAC backend
    /// and identity.
    node: LocalNode<M>,
    /// The channel descriptor this handle is bound to.
    channel: Channel,
    /// Generation value compared against the node's membership entry for
    /// `channel` to decide whether this handle is still active.
    join_generation: u64,
}
957
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> BoundChannel<M> {
    /// Borrow the channel descriptor this handle is bound to.
    pub fn channel(&self) -> &Channel {
        &self.channel
    }

    /// Report whether this handle still refers to a live membership.
    ///
    /// Returns `true` when the node's membership list holds an entry for the
    /// same channel that is marked active and whose generation equals the one
    /// stored in this handle.
    pub fn is_active(&self) -> bool {
        let membership = self.node.membership.borrow();
        for entry in membership.channels.iter() {
            let same_membership = entry.channel == self.channel
                && entry.active
                && entry.generation == self.join_generation;
            if same_membership {
                return true;
            }
        }
        false
    }

    /// Build a peer connection to `key` that sends through a clone of this
    /// bound channel.
    pub fn peer(&self, key: PublicKey) -> PeerConnection<Self> {
        let transport = self.clone();
        PeerConnection::new(transport, key)
    }

    /// Fail with `NodeError::ChannelLeft` unless `is_active` holds.
    fn check_active(&self) -> Result<(), NodeError<M>> {
        if !self.is_active() {
            return Err(NodeError::ChannelLeft);
        }
        Ok(())
    }

    /// Borrow the local node that owns this bound channel.
    pub fn node(&self) -> &LocalNode<M> {
        &self.node
    }
}
993
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> Transport for BoundChannel<M> {
    type Error = NodeError<M>;

    /// Send `payload` to `to` as a blind unicast on this channel.
    ///
    /// The membership check runs before anything is handed to the MAC backend;
    /// the receipt is then registered through the node's `register_ack_send`.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::ChannelLeft` when the handle is no longer active, or
    /// the converted error of the MAC backend's `send_blind_unicast`.
    async fn send(
        &self,
        to: &PublicKey,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;

        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_blind_unicast(sender, to, self.channel.channel_id(), payload, options)
            .await?;

        Ok(node.register_ack_send(sender, receipt))
    }

    /// Send `payload` to the channel as a multicast.
    ///
    /// The membership check runs before anything is handed to the MAC backend;
    /// the receipt is then registered through the node's
    /// `register_non_ack_send`.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::ChannelLeft` when the handle is no longer active, or
    /// the converted error of the MAC backend's `send_multicast`.
    async fn send_all(
        &self,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;

        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_multicast(sender, self.channel.channel_id(), payload, options)
            .await?;

        Ok(node.register_non_ack_send(sender, receipt))
    }
}