umsh_node/
node.rs

1use alloc::boxed::Box;
2use alloc::rc::Rc;
3#[cfg(feature = "software-crypto")]
4use alloc::vec::Vec;
5use core::cell::RefCell;
6use core::num::NonZeroU32;
7
8use umsh_core::NodeHint;
9use umsh_core::PublicKey;
10use umsh_mac::{LocalIdentityId, SendOptions};
11
12#[cfg(feature = "software-crypto")]
13use crate::channel::Channel;
14use crate::dispatch::EventDispatcher;
15use crate::mac::MacBackend;
16use crate::peer::PeerConnection;
17#[cfg(feature = "software-crypto")]
18use crate::pfs::{PfsSessionManager, PfsState};
19use crate::receive::ReceivedPacketRef;
20use crate::ticket::{SendProgressTicket, SendToken};
21use crate::transport::Transport;
22use crate::{AppEncodeError, OwnedMacCommand};
23
/// Channel membership shared by every handle of one node.
///
/// Cloned `LocalNode` handles and the `BoundChannel`s created from them all
/// point at the same instance through an `Rc<RefCell<...>>`.
pub(crate) struct NodeMembership {
    /// One entry per channel this node has joined. Entries stay in the list
    /// after a leave and are only flagged inactive.
    #[cfg(feature = "software-crypto")]
    pub channels: Vec<ChannelMembershipEntry>,
}
30
/// Identifies one slot of a `HandlerTable`.
///
/// The wrapped value is the slot index plus one, which keeps it non-zero.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct SubscriptionHandle(NonZeroU32);
33
/// Owned subscription guard.
///
/// Dropping the value automatically unregisters the callback, so the guard
/// has to be kept alive for as long as the callback should keep firing.
#[must_use = "dropping a `Subscription` immediately unregisters its callback"]
pub struct Subscription {
    /// Pending unregister action; `None` once it has been run.
    cancel: Option<Box<dyn FnMut() -> bool>>,
}

impl Subscription {
    /// Wrap `cancel`, the action that unregisters the callback.
    ///
    /// The action is invoked at most once: either by
    /// [`Subscription::unsubscribe`] or when the guard is dropped.
    pub(crate) fn new(cancel: impl FnMut() -> bool + 'static) -> Self {
        Self {
            cancel: Some(Box::new(cancel)),
        }
    }

    /// Unregister immediately instead of waiting for drop.
    ///
    /// Returns the value reported by the unregister action.
    pub fn unsubscribe(mut self) -> bool {
        self.run_cancel()
    }

    /// Run the unregister action if it is still pending.
    ///
    /// Returns `false` when the action has already been consumed.
    fn run_cancel(&mut self) -> bool {
        // Taking the action out first guarantees it cannot run a second time
        // when `Drop` fires after an explicit `unsubscribe`.
        match self.cancel.take() {
            Some(mut cancel) => cancel(),
            None => false,
        }
    }
}

impl Drop for Subscription {
    fn drop(&mut self) {
        let _ = self.run_cancel();
    }
}
66
/// Slot table holding registered handlers of type `T`.
///
/// A slot is `None` while it is free; freed slots are reused by later
/// inserts.
pub(crate) struct HandlerTable<T> {
    slots: Vec<Option<T>>,
}
70
71impl<T> Default for HandlerTable<T> {
72    fn default() -> Self {
73        Self { slots: Vec::new() }
74    }
75}
76
77impl<T> HandlerTable<T> {
78    pub(crate) fn insert(&mut self, handler: T) -> SubscriptionHandle {
79        if let Some((index, slot)) = self
80            .slots
81            .iter_mut()
82            .enumerate()
83            .find(|(_, slot)| slot.is_none())
84        {
85            *slot = Some(handler);
86            return SubscriptionHandle(NonZeroU32::new((index + 1) as u32).unwrap());
87        }
88        self.slots.push(Some(handler));
89        SubscriptionHandle(NonZeroU32::new(self.slots.len() as u32).unwrap())
90    }
91
92    pub(crate) fn remove(&mut self, handle: SubscriptionHandle) -> bool {
93        let index = handle.0.get() as usize - 1;
94        let Some(slot) = self.slots.get_mut(index) else {
95            return false;
96        };
97        slot.take().is_some()
98    }
99
100    fn any_mut(&mut self, mut f: impl FnMut(&mut T) -> bool) -> bool {
101        for slot in &mut self.slots {
102            let Some(handler) = slot.as_mut() else {
103                continue;
104            };
105            if f(handler) {
106                return true;
107            }
108        }
109        false
110    }
111
112    fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
113        for slot in &mut self.slots {
114            let Some(handler) = slot.as_mut() else {
115                continue;
116            };
117            f(handler);
118        }
119    }
120}
121
122pub(crate) struct PeerSubscriptions {
123    peer: PublicKey,
124    pub(crate) receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
125    pub(crate) ack_received_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
126    pub(crate) ack_timeout_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
127    pub(crate) pfs_established_handlers: HandlerTable<Box<dyn FnMut()>>,
128    pub(crate) pfs_ended_handlers: HandlerTable<Box<dyn FnMut()>>,
129}
130
131impl PeerSubscriptions {
132    fn new(peer: PublicKey) -> Self {
133        Self {
134            peer,
135            receive_handlers: HandlerTable::default(),
136            ack_received_handlers: HandlerTable::default(),
137            ack_timeout_handlers: HandlerTable::default(),
138            pfs_established_handlers: HandlerTable::default(),
139            pfs_ended_handlers: HandlerTable::default(),
140        }
141    }
142}
143
144pub(crate) struct LocalNodeState {
145    receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
146    node_discovered_handlers: HandlerTable<Box<dyn FnMut(PublicKey, Option<&str>)>>,
147    beacon_handlers: HandlerTable<Box<dyn FnMut(NodeHint, Option<PublicKey>)>>,
148    mac_command_handlers: HandlerTable<Box<dyn FnMut(PublicKey, &OwnedMacCommand)>>,
149    ack_received_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
150    ack_timeout_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
151    pfs_established_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
152    pfs_ended_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
153    peer_subscriptions: Vec<PeerSubscriptions>,
154    #[cfg(feature = "software-crypto")]
155    pfs: PfsSessionManager,
156}
157
158impl LocalNodeState {
159    pub(crate) fn new() -> Self {
160        Self {
161            receive_handlers: HandlerTable::default(),
162            node_discovered_handlers: HandlerTable::default(),
163            beacon_handlers: HandlerTable::default(),
164            mac_command_handlers: HandlerTable::default(),
165            ack_received_handlers: HandlerTable::default(),
166            ack_timeout_handlers: HandlerTable::default(),
167            pfs_established_handlers: HandlerTable::default(),
168            pfs_ended_handlers: HandlerTable::default(),
169            peer_subscriptions: Vec::new(),
170            #[cfg(feature = "software-crypto")]
171            pfs: PfsSessionManager::new(),
172        }
173    }
174
175    pub(crate) fn peer_subscriptions_mut(&mut self, peer: PublicKey) -> &mut PeerSubscriptions {
176        if let Some(index) = self
177            .peer_subscriptions
178            .iter()
179            .position(|entry| entry.peer == peer)
180        {
181            return &mut self.peer_subscriptions[index];
182        }
183        self.peer_subscriptions.push(PeerSubscriptions::new(peer));
184        self.peer_subscriptions
185            .last_mut()
186            .expect("peer subscriptions just inserted")
187    }
188
189    pub(crate) fn find_peer_subscriptions_mut(
190        &mut self,
191        peer: PublicKey,
192    ) -> Option<&mut PeerSubscriptions> {
193        self.peer_subscriptions
194            .iter_mut()
195            .find(|entry| entry.peer == peer)
196    }
197}
198
/// PFS session status for one peer, as reported by `LocalNode::pfs_status`.
#[cfg(feature = "software-crypto")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PfsStatus {
    /// No session is tracked for the peer.
    Inactive,
    /// A session is tracked in the requested state.
    Requested,
    /// A session is tracked in the active state.
    Active {
        /// Local ephemeral identity recorded for the session.
        local_ephemeral_id: LocalIdentityId,
        /// Peer ephemeral key recorded for the session.
        peer_ephemeral: PublicKey,
        /// Expiry recorded for the session, in milliseconds.
        expires_ms: u64,
    },
}
210
211#[derive(Clone, Copy, Debug, PartialEq, Eq)]
212pub(crate) enum PfsLifecycle {
213    Established(PublicKey),
214    Ended(PublicKey),
215}
216
/// Membership record for one (node, channel) pair.
#[cfg(feature = "software-crypto")]
pub(crate) struct ChannelMembershipEntry {
    /// The channel this record describes.
    pub channel: Channel,
    /// Generation counter for this (node, channel) pair.
    /// Bumped on leave and again on re-join; `BoundChannel` snapshots it at
    /// creation.
    pub generation: u64,
    /// False after leave(); the entry is kept and reused on re-join.
    pub active: bool,
}
226
227impl NodeMembership {
228    pub fn new() -> Self {
229        Self {
230            #[cfg(feature = "software-crypto")]
231            channels: Vec::new(),
232        }
233    }
234}
235
236/// Errors produced by node-layer operations.
237#[derive(Clone, PartialEq, Eq)]
238pub enum NodeError<M: MacBackend> {
239    /// Underlying MAC-layer failure.
240    Mac(crate::mac::MacBackendError<M::SendError, M::CapacityError>),
241    /// The node has left this channel since the handle was created.
242    ChannelLeft,
243    /// The peer is not registered.
244    PeerMissing,
245    /// Control-payload encode failure.
246    AppEncode(AppEncodeError),
247    /// The referenced PFS session was missing.
248    #[cfg(feature = "software-crypto")]
249    PfsSessionMissing,
250    /// The PFS session table is full.
251    #[cfg(feature = "software-crypto")]
252    PfsSessionTableFull,
253    /// Crypto failure during PFS processing.
254    #[cfg(feature = "software-crypto")]
255    Crypto(umsh_crypto::CryptoError),
256}
257
258impl<M> core::fmt::Debug for NodeError<M>
259where
260    M: MacBackend,
261    M::SendError: core::fmt::Debug,
262    M::CapacityError: core::fmt::Debug,
263{
264    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265        match self {
266            Self::Mac(e) => f.debug_tuple("Mac").field(e).finish(),
267            Self::ChannelLeft => f.write_str("ChannelLeft"),
268            Self::PeerMissing => f.write_str("PeerMissing"),
269            Self::AppEncode(e) => f.debug_tuple("AppEncode").field(e).finish(),
270            #[cfg(feature = "software-crypto")]
271            Self::PfsSessionMissing => f.write_str("PfsSessionMissing"),
272            #[cfg(feature = "software-crypto")]
273            Self::PfsSessionTableFull => f.write_str("PfsSessionTableFull"),
274            #[cfg(feature = "software-crypto")]
275            Self::Crypto(e) => f.debug_tuple("Crypto").field(e).finish(),
276        }
277    }
278}
279
280impl<M: MacBackend> From<crate::mac::MacBackendError<M::SendError, M::CapacityError>>
281    for NodeError<M>
282{
283    fn from(e: crate::mac::MacBackendError<M::SendError, M::CapacityError>) -> Self {
284        Self::Mac(e)
285    }
286}
287
288impl<M: MacBackend> From<AppEncodeError> for NodeError<M> {
289    fn from(e: AppEncodeError) -> Self {
290        Self::AppEncode(e)
291    }
292}
293
/// Lets `?` lift a crypto failure into `NodeError::Crypto`.
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> From<umsh_crypto::CryptoError> for NodeError<M> {
    fn from(crypto_error: umsh_crypto::CryptoError) -> Self {
        NodeError::Crypto(crypto_error)
    }
}
300
301/// Per-identity application handle.
302///
303/// `LocalNode` owns its channel membership set via shared interior state.
304/// Different `LocalNode` instances (for different identities) may join
305/// different channel sets.
306#[derive(Clone)]
307pub struct LocalNode<M: MacBackend> {
308    identity_id: LocalIdentityId,
309    mac: M,
310    dispatcher: Rc<RefCell<EventDispatcher>>,
311    #[allow(dead_code)] // Used by channel methods (software-crypto feature)
312    membership: Rc<RefCell<NodeMembership>>,
313    state: Rc<RefCell<LocalNodeState>>,
314}
315
316impl<M: MacBackend> LocalNode<M> {
317    /// Create a new local node.
318    pub(crate) fn new(
319        identity_id: LocalIdentityId,
320        mac: M,
321        dispatcher: Rc<RefCell<EventDispatcher>>,
322        membership: Rc<RefCell<NodeMembership>>,
323        state: Rc<RefCell<LocalNodeState>>,
324    ) -> Self {
325        Self {
326            identity_id,
327            mac,
328            dispatcher,
329            membership,
330            state,
331        }
332    }
333
334    /// The identity slot this node operates on.
335    pub fn identity_id(&self) -> LocalIdentityId {
336        self.identity_id
337    }
338
339    /// Create a peer connection (registers peer in MAC if new).
340    pub fn peer(&self, key: PublicKey) -> Result<PeerConnection<Self>, NodeError<M>> {
341        self.mac.add_peer(key)?;
342        Ok(PeerConnection::new(self.clone(), key))
343    }
344
345    fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
346    where
347        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
348    {
349        self.state
350            .borrow_mut()
351            .receive_handlers
352            .insert(Box::new(handler))
353    }
354
355    pub fn on_receive<F>(&self, handler: F) -> Subscription
356    where
357        F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
358    {
359        let handle = self.add_receive_handler(handler);
360        let state = self.state.clone();
361        Subscription::new(move || state.borrow_mut().receive_handlers.remove(handle))
362    }
363
364    fn add_node_discovered_handler<F>(&self, handler: F) -> SubscriptionHandle
365    where
366        F: FnMut(PublicKey, Option<&str>) + 'static,
367    {
368        self.state
369            .borrow_mut()
370            .node_discovered_handlers
371            .insert(Box::new(handler))
372    }
373
374    pub fn on_node_discovered<F>(&self, handler: F) -> Subscription
375    where
376        F: FnMut(PublicKey, Option<&str>) + 'static,
377    {
378        let handle = self.add_node_discovered_handler(handler);
379        let state = self.state.clone();
380        Subscription::new(move || state.borrow_mut().node_discovered_handlers.remove(handle))
381    }
382
383    fn add_beacon_handler<F>(&self, handler: F) -> SubscriptionHandle
384    where
385        F: FnMut(NodeHint, Option<PublicKey>) + 'static,
386    {
387        self.state
388            .borrow_mut()
389            .beacon_handlers
390            .insert(Box::new(handler))
391    }
392
393    pub fn on_beacon<F>(&self, handler: F) -> Subscription
394    where
395        F: FnMut(NodeHint, Option<PublicKey>) + 'static,
396    {
397        let handle = self.add_beacon_handler(handler);
398        let state = self.state.clone();
399        Subscription::new(move || state.borrow_mut().beacon_handlers.remove(handle))
400    }
401
402    fn add_mac_command_handler<F>(&self, handler: F) -> SubscriptionHandle
403    where
404        F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
405    {
406        self.state
407            .borrow_mut()
408            .mac_command_handlers
409            .insert(Box::new(handler))
410    }
411
412    pub fn on_mac_command<F>(&self, handler: F) -> Subscription
413    where
414        F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
415    {
416        let handle = self.add_mac_command_handler(handler);
417        let state = self.state.clone();
418        Subscription::new(move || state.borrow_mut().mac_command_handlers.remove(handle))
419    }
420
421    fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
422    where
423        F: FnMut(PublicKey, SendToken) + 'static,
424    {
425        self.state
426            .borrow_mut()
427            .ack_received_handlers
428            .insert(Box::new(handler))
429    }
430
431    pub fn on_ack_received<F>(&self, handler: F) -> Subscription
432    where
433        F: FnMut(PublicKey, SendToken) + 'static,
434    {
435        let handle = self.add_ack_received_handler(handler);
436        let state = self.state.clone();
437        Subscription::new(move || state.borrow_mut().ack_received_handlers.remove(handle))
438    }
439
440    fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
441    where
442        F: FnMut(PublicKey, SendToken) + 'static,
443    {
444        self.state
445            .borrow_mut()
446            .ack_timeout_handlers
447            .insert(Box::new(handler))
448    }
449
450    pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
451    where
452        F: FnMut(PublicKey, SendToken) + 'static,
453    {
454        let handle = self.add_ack_timeout_handler(handler);
455        let state = self.state.clone();
456        Subscription::new(move || state.borrow_mut().ack_timeout_handlers.remove(handle))
457    }
458
459    fn add_pfs_established_handler<F>(&self, handler: F) -> SubscriptionHandle
460    where
461        F: FnMut(PublicKey) + 'static,
462    {
463        self.state
464            .borrow_mut()
465            .pfs_established_handlers
466            .insert(Box::new(handler))
467    }
468
469    pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
470    where
471        F: FnMut(PublicKey) + 'static,
472    {
473        let handle = self.add_pfs_established_handler(handler);
474        let state = self.state.clone();
475        Subscription::new(move || state.borrow_mut().pfs_established_handlers.remove(handle))
476    }
477
478    fn add_pfs_ended_handler<F>(&self, handler: F) -> SubscriptionHandle
479    where
480        F: FnMut(PublicKey) + 'static,
481    {
482        self.state
483            .borrow_mut()
484            .pfs_ended_handlers
485            .insert(Box::new(handler))
486    }
487
488    pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
489    where
490        F: FnMut(PublicKey) + 'static,
491    {
492        let handle = self.add_pfs_ended_handler(handler);
493        let state = self.state.clone();
494        Subscription::new(move || state.borrow_mut().pfs_ended_handlers.remove(handle))
495    }
496
497    #[cfg(feature = "software-crypto")]
498    pub async fn request_pfs(
499        &self,
500        peer: &PublicKey,
501        duration_minutes: u16,
502        options: &SendOptions,
503    ) -> Result<SendProgressTicket, NodeError<M>> {
504        let receipt = self
505            .state
506            .borrow_mut()
507            .pfs
508            .request_session(&self.mac, self.identity_id, peer, duration_minutes, options)
509            .await?;
510        Ok(self.register_ack_send(self.identity_id, receipt))
511    }
512
513    #[cfg(feature = "software-crypto")]
514    pub async fn end_pfs(
515        &self,
516        peer: &PublicKey,
517        options: &SendOptions,
518    ) -> Result<(), NodeError<M>> {
519        let _ = self
520            .state
521            .borrow_mut()
522            .pfs
523            .end_session(&self.mac, self.identity_id, peer, true, options)
524            .await?;
525        Ok(())
526    }
527
528    #[cfg(feature = "software-crypto")]
529    pub fn pfs_status(&self, peer: &PublicKey) -> Result<PfsStatus, NodeError<M>> {
530        let now_ms = self.mac.now_ms()?;
531        let state = self.state.borrow();
532        if let Some(session) = state
533            .pfs
534            .sessions()
535            .iter()
536            .find(|session| session.peer_long_term == *peer)
537        {
538            return Ok(match session.state {
539                PfsState::Requested => PfsStatus::Requested,
540                PfsState::Active => PfsStatus::Active {
541                    local_ephemeral_id: session.local_ephemeral_id,
542                    peer_ephemeral: session.peer_ephemeral,
543                    expires_ms: session.expires_ms,
544                },
545            });
546        }
547        if state.pfs.active_route(peer, now_ms).is_some() {
548            // Defensive fallback for any future session bookkeeping changes.
549            return Ok(PfsStatus::Requested);
550        }
551        Ok(PfsStatus::Inactive)
552    }
553
554    /// Join a channel. Registers the channel key in the MAC if this is
555    /// the first node to join it. Returns the bound channel handle.
556    #[cfg(feature = "software-crypto")]
557    pub fn join(&self, channel: &Channel) -> Result<BoundChannel<M>, NodeError<M>> {
558        let mut membership = self.membership.borrow_mut();
559
560        // Check if already joined.
561        if let Some(entry) = membership
562            .channels
563            .iter_mut()
564            .find(|e| e.channel == *channel)
565        {
566            if entry.active {
567                return Ok(BoundChannel {
568                    node: self.clone(),
569                    channel: entry.channel.clone(),
570                    join_generation: entry.generation,
571                });
572            }
573            // Re-joining after leave — mark active again, bump generation.
574            entry.active = true;
575            entry.generation = entry.generation.wrapping_add(1);
576            return Ok(BoundChannel {
577                node: self.clone(),
578                channel: entry.channel.clone(),
579                join_generation: entry.generation,
580            });
581        }
582
583        // New channel — register with MAC.
584        self.mac.add_private_channel(channel.key().clone())?;
585
586        let generation = 0;
587        membership.channels.push(ChannelMembershipEntry {
588            channel: channel.clone(),
589            generation,
590            active: true,
591        });
592
593        Ok(BoundChannel {
594            node: self.clone(),
595            channel: channel.clone(),
596            join_generation: generation,
597        })
598    }
599
600    /// Leave a channel. Marks the membership entry inactive and bumps
601    /// that entry's generation counter.
602    #[cfg(feature = "software-crypto")]
603    pub fn leave(&self, channel: &Channel) -> Result<(), NodeError<M>> {
604        let mut membership = self.membership.borrow_mut();
605        if let Some(entry) = membership
606            .channels
607            .iter_mut()
608            .find(|e| e.channel == *channel && e.active)
609        {
610            entry.active = false;
611            entry.generation = entry.generation.wrapping_add(1);
612        }
613        Ok(())
614    }
615
616    /// Get a handle to an already-joined channel.
617    #[cfg(feature = "software-crypto")]
618    pub fn bound_channel(&self, channel: &Channel) -> Option<BoundChannel<M>> {
619        let membership = self.membership.borrow();
620        membership
621            .channels
622            .iter()
623            .find(|e| e.channel == *channel && e.active)
624            .map(|entry| BoundChannel {
625                node: self.clone(),
626                channel: entry.channel.clone(),
627                join_generation: entry.generation,
628            })
629    }
630
631    /// List all joined channels.
632    #[cfg(feature = "software-crypto")]
633    pub fn bound_channels(&self) -> Vec<BoundChannel<M>> {
634        let membership = self.membership.borrow();
635        membership
636            .channels
637            .iter()
638            .filter(|e| e.active)
639            .map(|entry| BoundChannel {
640                node: self.clone(),
641                channel: entry.channel.clone(),
642                join_generation: entry.generation,
643            })
644            .collect()
645    }
646
647    /// Register an ACK-tracked send with the dispatcher and return a progress ticket.
648    fn register_ack_send(
649        &self,
650        send_identity_id: LocalIdentityId,
651        receipt: Option<umsh_mac::SendReceipt>,
652    ) -> SendProgressTicket {
653        match receipt {
654            Some(receipt) => {
655                let token = SendToken::new(send_identity_id, receipt);
656                let state = self.dispatcher.borrow_mut().register_ticket(token, false);
657                SendProgressTicket::new(token, state)
658            }
659            // Unicast/blind-unicast without ACK requested — no tracking.
660            None => SendProgressTicket::fire_and_forget(),
661        }
662    }
663
664    /// Register a non-ACK send (broadcast/multicast) with the dispatcher.
665    ///
666    /// The ticket starts unfinished. The dispatcher marks it transmitted and
667    /// finished when the MAC fires the `Transmitted` event with this receipt.
668    fn register_non_ack_send(
669        &self,
670        send_identity_id: LocalIdentityId,
671        receipt: umsh_mac::SendReceipt,
672    ) -> SendProgressTicket {
673        let token = SendToken::new(send_identity_id, receipt);
674        let state = self.dispatcher.borrow_mut().register_ticket(token, true);
675        SendProgressTicket::new(token, state)
676    }
677
678    pub(crate) fn state(&self) -> &Rc<RefCell<LocalNodeState>> {
679        &self.state
680    }
681
682    #[cfg(feature = "software-crypto")]
683    pub(crate) fn owns_ephemeral_identity(&self, identity_id: LocalIdentityId) -> bool {
684        self.state
685            .borrow()
686            .pfs
687            .sessions()
688            .iter()
689            .any(|session| session.local_ephemeral_id == identity_id)
690    }
691
692    #[cfg(feature = "software-crypto")]
693    pub(crate) async fn handle_pfs_command(
694        &self,
695        from: &PublicKey,
696        command: &OwnedMacCommand,
697        options: &SendOptions,
698    ) -> Result<Option<PfsLifecycle>, NodeError<M>> {
699        match *command {
700            OwnedMacCommand::PfsSessionRequest {
701                ephemeral_key,
702                duration_minutes,
703            } => {
704                self.state
705                    .borrow_mut()
706                    .pfs
707                    .accept_request(
708                        &self.mac,
709                        self.identity_id,
710                        *from,
711                        ephemeral_key,
712                        duration_minutes,
713                        options,
714                    )
715                    .await?;
716                Ok(Some(PfsLifecycle::Established(*from)))
717            }
718            OwnedMacCommand::PfsSessionResponse {
719                ephemeral_key,
720                duration_minutes,
721            } => {
722                if self.state.borrow_mut().pfs.accept_response(
723                    &self.mac,
724                    self.identity_id,
725                    *from,
726                    ephemeral_key,
727                    duration_minutes,
728                )? {
729                    Ok(Some(PfsLifecycle::Established(*from)))
730                } else {
731                    Ok(None)
732                }
733            }
734            OwnedMacCommand::EndPfsSession => {
735                let _ = self
736                    .state
737                    .borrow_mut()
738                    .pfs
739                    .end_session(&self.mac, self.identity_id, from, false, options)
740                    .await?;
741                Ok(Some(PfsLifecycle::Ended(*from)))
742            }
743            _ => Ok(None),
744        }
745    }
746
747    pub(crate) fn dispatch_received_packet(&self, packet: &ReceivedPacketRef<'_>) -> bool {
748        let peer = packet.from_key();
749        let mut state = self.state.borrow_mut();
750
751        if let Some(peer) = peer.map(|peer| canonical_peer(&state, peer)) {
752            if let Some(entry) = state
753                .peer_subscriptions
754                .iter_mut()
755                .find(|entry| entry.peer == peer)
756            {
757                if entry.receive_handlers.any_mut(|handler| handler(packet)) {
758                    return true;
759                }
760            }
761        }
762
763        state.receive_handlers.any_mut(|handler| handler(packet))
764    }
765
766    pub(crate) fn dispatch_node_discovered(&self, key: PublicKey, name: Option<&str>) {
767        self.state
768            .borrow_mut()
769            .node_discovered_handlers
770            .for_each_mut(|handler| handler(key, name));
771    }
772
773    pub(crate) fn dispatch_beacon(&self, from_hint: NodeHint, from_key: Option<PublicKey>) {
774        self.state
775            .borrow_mut()
776            .beacon_handlers
777            .for_each_mut(|handler| handler(from_hint, from_key));
778    }
779
780    pub(crate) fn dispatch_mac_command(&self, from: PublicKey, command: &OwnedMacCommand) {
781        self.state
782            .borrow_mut()
783            .mac_command_handlers
784            .for_each_mut(|handler| handler(from, command));
785    }
786
787    pub(crate) fn dispatch_ack_received(&self, peer: PublicKey, token: SendToken) {
788        let mut state = self.state.borrow_mut();
789        let peer = canonical_peer(&state, peer);
790        if let Some(entry) = state
791            .peer_subscriptions
792            .iter_mut()
793            .find(|entry| entry.peer == peer)
794        {
795            entry
796                .ack_received_handlers
797                .for_each_mut(|handler| handler(token));
798        }
799        state
800            .ack_received_handlers
801            .for_each_mut(|handler| handler(peer, token));
802    }
803
804    pub(crate) fn dispatch_ack_timeout(&self, peer: PublicKey, token: SendToken) {
805        let mut state = self.state.borrow_mut();
806        let peer = canonical_peer(&state, peer);
807        if let Some(entry) = state
808            .peer_subscriptions
809            .iter_mut()
810            .find(|entry| entry.peer == peer)
811        {
812            entry
813                .ack_timeout_handlers
814                .for_each_mut(|handler| handler(token));
815        }
816        state
817            .ack_timeout_handlers
818            .for_each_mut(|handler| handler(peer, token));
819    }
820
821    pub(crate) fn dispatch_pfs_established(&self, peer: PublicKey) {
822        let mut state = self.state.borrow_mut();
823        let peer = canonical_peer(&state, peer);
824        if let Some(entry) = state
825            .peer_subscriptions
826            .iter_mut()
827            .find(|entry| entry.peer == peer)
828        {
829            entry
830                .pfs_established_handlers
831                .for_each_mut(|handler| handler());
832        }
833        state
834            .pfs_established_handlers
835            .for_each_mut(|handler| handler(peer));
836    }
837
838    pub(crate) fn dispatch_pfs_ended(&self, peer: PublicKey) {
839        let mut state = self.state.borrow_mut();
840        let peer = canonical_peer(&state, peer);
841        if let Some(entry) = state
842            .peer_subscriptions
843            .iter_mut()
844            .find(|entry| entry.peer == peer)
845        {
846            entry.pfs_ended_handlers.for_each_mut(|handler| handler());
847        }
848        state
849            .pfs_ended_handlers
850            .for_each_mut(|handler| handler(peer));
851    }
852
853    pub(crate) fn expire_pfs_sessions(&self) -> Result<Vec<PublicKey>, NodeError<M>> {
854        #[cfg(feature = "software-crypto")]
855        {
856            let now_ms = self.mac.now_ms()?;
857            return self
858                .state
859                .borrow_mut()
860                .pfs
861                .expire_sessions(&self.mac, now_ms);
862        }
863        #[cfg(not(feature = "software-crypto"))]
864        {
865            Ok(Vec::new())
866        }
867    }
868}
869
870fn canonical_peer(state: &LocalNodeState, peer: PublicKey) -> PublicKey {
871    #[cfg(feature = "software-crypto")]
872    {
873        if let Some(session) = state
874            .pfs
875            .sessions()
876            .iter()
877            .find(|session| session.state == PfsState::Active && session.peer_ephemeral == peer)
878        {
879            return session.peer_long_term;
880        }
881    }
882    peer
883}
884
885impl<M: MacBackend> Transport for LocalNode<M> {
886    type Error = NodeError<M>;
887
888    async fn send(
889        &self,
890        to: &PublicKey,
891        payload: &[u8],
892        options: &SendOptions,
893    ) -> Result<SendProgressTicket, Self::Error> {
894        #[cfg(feature = "software-crypto")]
895        let (send_identity_id, receipt) = {
896            let now_ms = self.mac.now_ms()?;
897            if let Some((local_id, peer_ephemeral)) =
898                self.state.borrow().pfs.active_route(to, now_ms)
899            {
900                let receipt = self
901                    .mac
902                    .send_unicast(local_id, &peer_ephemeral, payload, options)
903                    .await?;
904                (local_id, receipt)
905            } else {
906                let receipt = self
907                    .mac
908                    .send_unicast(self.identity_id, to, payload, options)
909                    .await?;
910                (self.identity_id, receipt)
911            }
912        };
913        #[cfg(not(feature = "software-crypto"))]
914        let (send_identity_id, receipt) = (
915            self.identity_id,
916            self.mac
917                .send_unicast(self.identity_id, to, payload, options)
918                .await?,
919        );
920        Ok(self.register_ack_send(send_identity_id, receipt))
921    }
922
923    async fn send_all(
924        &self,
925        payload: &[u8],
926        options: &SendOptions,
927    ) -> Result<SendProgressTicket, Self::Error> {
928        let receipt = self
929            .mac
930            .send_broadcast(self.identity_id, payload, options)
931            .await?;
932        Ok(self.register_non_ack_send(self.identity_id, receipt))
933    }
934}
935
/// A channel bound to a specific `LocalNode`. Implements `Transport`.
///
/// Holds a snapshot of the per-channel membership generation at creation
/// time. If the node leaves this channel, operations return
/// `NodeError::ChannelLeft`.
#[cfg(feature = "software-crypto")]
#[derive(Clone)]
pub struct BoundChannel<M: MacBackend> {
    /// Node this handle sends through.
    node: LocalNode<M>,
    /// Channel this handle is bound to.
    channel: Channel,
    /// Membership generation captured when the handle was created.
    join_generation: u64,
}
948
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> BoundChannel<M> {
    /// The underlying channel descriptor.
    pub fn channel(&self) -> &Channel {
        &self.channel
    }

    /// True if the node is still a member of this channel.
    ///
    /// The membership entry must be active and still carry the generation
    /// this handle captured, so a handle created before a leave stays
    /// inactive even after a re-join.
    pub fn is_active(&self) -> bool {
        let membership = self.node.membership.borrow();
        for entry in membership.channels.iter() {
            if entry.channel == self.channel
                && entry.active
                && entry.generation == self.join_generation
            {
                return true;
            }
        }
        false
    }

    /// Create a peer connection through this channel.
    pub fn peer(&self, key: PublicKey) -> PeerConnection<Self> {
        let transport = self.clone();
        PeerConnection::new(transport, key)
    }

    /// Check membership is still valid.
    ///
    /// Returns `NodeError::ChannelLeft` when the handle is no longer active.
    fn check_active(&self) -> Result<(), NodeError<M>> {
        if !self.is_active() {
            return Err(NodeError::ChannelLeft);
        }
        Ok(())
    }

    /// Return the owning local node for this bound channel.
    pub fn node(&self) -> &LocalNode<M> {
        &self.node
    }
}
984
/// Channel-scoped sending: blind unicast to one peer, multicast to all.
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> Transport for BoundChannel<M> {
    type Error = NodeError<M>;

    /// Send `payload` to `to` as a blind unicast on this channel.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::ChannelLeft` when the handle is no longer active,
    /// or the MAC error when the send is rejected.
    async fn send(
        &self,
        to: &PublicKey,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;
        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_blind_unicast(sender, to, self.channel.channel_id(), payload, options)
            .await?;
        Ok(node.register_ack_send(sender, receipt))
    }

    /// Multicast `payload` on this channel.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::ChannelLeft` when the handle is no longer active,
    /// or the MAC error when the send is rejected.
    async fn send_all(
        &self,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;
        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_multicast(sender, self.channel.channel_id(), payload, options)
            .await?;
        Ok(node.register_non_ack_send(sender, receipt))
    }
}
1030}