1use alloc::boxed::Box;
2use alloc::rc::Rc;
3#[cfg(feature = "software-crypto")]
4use alloc::vec::Vec;
5use core::cell::RefCell;
6use core::num::NonZeroU32;
7
8use umsh_core::NodeHint;
9use umsh_core::PublicKey;
10use umsh_mac::{LocalIdentityId, SendOptions};
11
12#[cfg(feature = "software-crypto")]
13use crate::channel::Channel;
14use crate::dispatch::EventDispatcher;
15use crate::mac::MacBackend;
16use crate::peer::PeerConnection;
17#[cfg(feature = "software-crypto")]
18use crate::pfs::{PfsSessionManager, PfsState};
19use crate::receive::ReceivedPacketRef;
20use crate::ticket::{SendProgressTicket, SendToken};
21use crate::transport::Transport;
22use crate::{AppEncodeError, OwnedMacCommand};
23
24pub(crate) struct NodeMembership {
27 #[cfg(feature = "software-crypto")]
28 pub channels: Vec<ChannelMembershipEntry>,
29}
30
31#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
32pub(crate) struct SubscriptionHandle(NonZeroU32);
33
34pub struct Subscription {
38 cancel: Rc<RefCell<Option<Box<dyn FnMut() -> bool>>>>,
39}
40
41impl Subscription {
42 pub(crate) fn new(cancel: impl FnMut() -> bool + 'static) -> Self {
43 Self {
44 cancel: Rc::new(RefCell::new(Some(Box::new(cancel)))),
45 }
46 }
47
48 pub fn unsubscribe(self) -> bool {
50 Self::run_cancel(&self.cancel)
51 }
52
53 fn run_cancel(cancel: &Rc<RefCell<Option<Box<dyn FnMut() -> bool>>>>) -> bool {
54 let Some(mut cancel) = cancel.borrow_mut().take() else {
55 return false;
56 };
57 cancel()
58 }
59}
60
61impl Drop for Subscription {
62 fn drop(&mut self) {
63 let _ = Self::run_cancel(&self.cancel);
64 }
65}
66
67pub(crate) struct HandlerTable<T> {
68 slots: Vec<Option<T>>,
69}
70
71impl<T> Default for HandlerTable<T> {
72 fn default() -> Self {
73 Self { slots: Vec::new() }
74 }
75}
76
77impl<T> HandlerTable<T> {
78 pub(crate) fn insert(&mut self, handler: T) -> SubscriptionHandle {
79 if let Some((index, slot)) = self
80 .slots
81 .iter_mut()
82 .enumerate()
83 .find(|(_, slot)| slot.is_none())
84 {
85 *slot = Some(handler);
86 return SubscriptionHandle(NonZeroU32::new((index + 1) as u32).unwrap());
87 }
88 self.slots.push(Some(handler));
89 SubscriptionHandle(NonZeroU32::new(self.slots.len() as u32).unwrap())
90 }
91
92 pub(crate) fn remove(&mut self, handle: SubscriptionHandle) -> bool {
93 let index = handle.0.get() as usize - 1;
94 let Some(slot) = self.slots.get_mut(index) else {
95 return false;
96 };
97 slot.take().is_some()
98 }
99
100 fn any_mut(&mut self, mut f: impl FnMut(&mut T) -> bool) -> bool {
101 for slot in &mut self.slots {
102 let Some(handler) = slot.as_mut() else {
103 continue;
104 };
105 if f(handler) {
106 return true;
107 }
108 }
109 false
110 }
111
112 fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
113 for slot in &mut self.slots {
114 let Some(handler) = slot.as_mut() else {
115 continue;
116 };
117 f(handler);
118 }
119 }
120}
121
122pub(crate) struct PeerSubscriptions {
123 peer: PublicKey,
124 pub(crate) receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
125 pub(crate) ack_received_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
126 pub(crate) ack_timeout_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
127 pub(crate) pfs_established_handlers: HandlerTable<Box<dyn FnMut()>>,
128 pub(crate) pfs_ended_handlers: HandlerTable<Box<dyn FnMut()>>,
129}
130
131impl PeerSubscriptions {
132 fn new(peer: PublicKey) -> Self {
133 Self {
134 peer,
135 receive_handlers: HandlerTable::default(),
136 ack_received_handlers: HandlerTable::default(),
137 ack_timeout_handlers: HandlerTable::default(),
138 pfs_established_handlers: HandlerTable::default(),
139 pfs_ended_handlers: HandlerTable::default(),
140 }
141 }
142}
143
144pub(crate) struct LocalNodeState {
145 receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
146 node_discovered_handlers: HandlerTable<Box<dyn FnMut(PublicKey, Option<&str>)>>,
147 beacon_handlers: HandlerTable<Box<dyn FnMut(NodeHint, Option<PublicKey>)>>,
148 mac_command_handlers: HandlerTable<Box<dyn FnMut(PublicKey, &OwnedMacCommand)>>,
149 ack_received_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
150 ack_timeout_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
151 pfs_established_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
152 pfs_ended_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
153 peer_subscriptions: Vec<PeerSubscriptions>,
154 #[cfg(feature = "software-crypto")]
155 pfs: PfsSessionManager,
156}
157
158impl LocalNodeState {
159 pub(crate) fn new() -> Self {
160 Self {
161 receive_handlers: HandlerTable::default(),
162 node_discovered_handlers: HandlerTable::default(),
163 beacon_handlers: HandlerTable::default(),
164 mac_command_handlers: HandlerTable::default(),
165 ack_received_handlers: HandlerTable::default(),
166 ack_timeout_handlers: HandlerTable::default(),
167 pfs_established_handlers: HandlerTable::default(),
168 pfs_ended_handlers: HandlerTable::default(),
169 peer_subscriptions: Vec::new(),
170 #[cfg(feature = "software-crypto")]
171 pfs: PfsSessionManager::new(),
172 }
173 }
174
175 pub(crate) fn peer_subscriptions_mut(&mut self, peer: PublicKey) -> &mut PeerSubscriptions {
176 if let Some(index) = self
177 .peer_subscriptions
178 .iter()
179 .position(|entry| entry.peer == peer)
180 {
181 return &mut self.peer_subscriptions[index];
182 }
183 self.peer_subscriptions.push(PeerSubscriptions::new(peer));
184 self.peer_subscriptions
185 .last_mut()
186 .expect("peer subscriptions just inserted")
187 }
188
189 pub(crate) fn find_peer_subscriptions_mut(
190 &mut self,
191 peer: PublicKey,
192 ) -> Option<&mut PeerSubscriptions> {
193 self.peer_subscriptions
194 .iter_mut()
195 .find(|entry| entry.peer == peer)
196 }
197}
198
199#[derive(Clone, Copy, Debug, PartialEq, Eq)]
200#[cfg(feature = "software-crypto")]
201pub enum PfsStatus {
202 Inactive,
203 Requested,
204 Active {
205 local_ephemeral_id: LocalIdentityId,
206 peer_ephemeral: PublicKey,
207 expires_ms: u64,
208 },
209}
210
211#[derive(Clone, Copy, Debug, PartialEq, Eq)]
212pub(crate) enum PfsLifecycle {
213 Established(PublicKey),
214 Ended(PublicKey),
215}
216
217#[cfg(feature = "software-crypto")]
218pub(crate) struct ChannelMembershipEntry {
219 pub channel: Channel,
220 pub generation: u64,
223 pub active: bool,
225}
226
227impl NodeMembership {
228 pub fn new() -> Self {
229 Self {
230 #[cfg(feature = "software-crypto")]
231 channels: Vec::new(),
232 }
233 }
234}
235
236#[derive(Clone, PartialEq, Eq)]
238pub enum NodeError<M: MacBackend> {
239 Mac(crate::mac::MacBackendError<M::SendError, M::CapacityError>),
241 ChannelLeft,
243 PeerMissing,
245 AppEncode(AppEncodeError),
247 #[cfg(feature = "software-crypto")]
249 PfsSessionMissing,
250 #[cfg(feature = "software-crypto")]
252 PfsSessionTableFull,
253 #[cfg(feature = "software-crypto")]
255 Crypto(umsh_crypto::CryptoError),
256}
257
258impl<M> core::fmt::Debug for NodeError<M>
259where
260 M: MacBackend,
261 M::SendError: core::fmt::Debug,
262 M::CapacityError: core::fmt::Debug,
263{
264 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265 match self {
266 Self::Mac(e) => f.debug_tuple("Mac").field(e).finish(),
267 Self::ChannelLeft => f.write_str("ChannelLeft"),
268 Self::PeerMissing => f.write_str("PeerMissing"),
269 Self::AppEncode(e) => f.debug_tuple("AppEncode").field(e).finish(),
270 #[cfg(feature = "software-crypto")]
271 Self::PfsSessionMissing => f.write_str("PfsSessionMissing"),
272 #[cfg(feature = "software-crypto")]
273 Self::PfsSessionTableFull => f.write_str("PfsSessionTableFull"),
274 #[cfg(feature = "software-crypto")]
275 Self::Crypto(e) => f.debug_tuple("Crypto").field(e).finish(),
276 }
277 }
278}
279
280impl<M: MacBackend> From<crate::mac::MacBackendError<M::SendError, M::CapacityError>>
281 for NodeError<M>
282{
283 fn from(e: crate::mac::MacBackendError<M::SendError, M::CapacityError>) -> Self {
284 Self::Mac(e)
285 }
286}
287
288impl<M: MacBackend> From<AppEncodeError> for NodeError<M> {
289 fn from(e: AppEncodeError) -> Self {
290 Self::AppEncode(e)
291 }
292}
293
294#[cfg(feature = "software-crypto")]
295impl<M: MacBackend> From<umsh_crypto::CryptoError> for NodeError<M> {
296 fn from(e: umsh_crypto::CryptoError) -> Self {
297 Self::Crypto(e)
298 }
299}
300
301#[derive(Clone)]
307pub struct LocalNode<M: MacBackend> {
308 identity_id: LocalIdentityId,
309 mac: M,
310 dispatcher: Rc<RefCell<EventDispatcher>>,
311 #[allow(dead_code)] membership: Rc<RefCell<NodeMembership>>,
313 state: Rc<RefCell<LocalNodeState>>,
314}
315
316impl<M: MacBackend> LocalNode<M> {
317 pub(crate) fn new(
319 identity_id: LocalIdentityId,
320 mac: M,
321 dispatcher: Rc<RefCell<EventDispatcher>>,
322 membership: Rc<RefCell<NodeMembership>>,
323 state: Rc<RefCell<LocalNodeState>>,
324 ) -> Self {
325 Self {
326 identity_id,
327 mac,
328 dispatcher,
329 membership,
330 state,
331 }
332 }
333
334 pub fn identity_id(&self) -> LocalIdentityId {
336 self.identity_id
337 }
338
339 pub async fn peer(&self, key: PublicKey) -> Result<PeerConnection<Self>, NodeError<M>> {
341 self.mac.add_peer(key).await?;
342 Ok(PeerConnection::new(self.clone(), key))
343 }
344
345 fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
346 where
347 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
348 {
349 self.state
350 .borrow_mut()
351 .receive_handlers
352 .insert(Box::new(handler))
353 }
354
355 pub fn on_receive<F>(&self, handler: F) -> Subscription
356 where
357 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
358 {
359 let handle = self.add_receive_handler(handler);
360 let state = self.state.clone();
361 Subscription::new(move || state.borrow_mut().receive_handlers.remove(handle))
362 }
363
364 fn add_node_discovered_handler<F>(&self, handler: F) -> SubscriptionHandle
365 where
366 F: FnMut(PublicKey, Option<&str>) + 'static,
367 {
368 self.state
369 .borrow_mut()
370 .node_discovered_handlers
371 .insert(Box::new(handler))
372 }
373
374 pub fn on_node_discovered<F>(&self, handler: F) -> Subscription
375 where
376 F: FnMut(PublicKey, Option<&str>) + 'static,
377 {
378 let handle = self.add_node_discovered_handler(handler);
379 let state = self.state.clone();
380 Subscription::new(move || state.borrow_mut().node_discovered_handlers.remove(handle))
381 }
382
383 fn add_beacon_handler<F>(&self, handler: F) -> SubscriptionHandle
384 where
385 F: FnMut(NodeHint, Option<PublicKey>) + 'static,
386 {
387 self.state
388 .borrow_mut()
389 .beacon_handlers
390 .insert(Box::new(handler))
391 }
392
393 pub fn on_beacon<F>(&self, handler: F) -> Subscription
394 where
395 F: FnMut(NodeHint, Option<PublicKey>) + 'static,
396 {
397 let handle = self.add_beacon_handler(handler);
398 let state = self.state.clone();
399 Subscription::new(move || state.borrow_mut().beacon_handlers.remove(handle))
400 }
401
402 fn add_mac_command_handler<F>(&self, handler: F) -> SubscriptionHandle
403 where
404 F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
405 {
406 self.state
407 .borrow_mut()
408 .mac_command_handlers
409 .insert(Box::new(handler))
410 }
411
412 pub fn on_mac_command<F>(&self, handler: F) -> Subscription
413 where
414 F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
415 {
416 let handle = self.add_mac_command_handler(handler);
417 let state = self.state.clone();
418 Subscription::new(move || state.borrow_mut().mac_command_handlers.remove(handle))
419 }
420
421 fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
422 where
423 F: FnMut(PublicKey, SendToken) + 'static,
424 {
425 self.state
426 .borrow_mut()
427 .ack_received_handlers
428 .insert(Box::new(handler))
429 }
430
431 pub fn on_ack_received<F>(&self, handler: F) -> Subscription
432 where
433 F: FnMut(PublicKey, SendToken) + 'static,
434 {
435 let handle = self.add_ack_received_handler(handler);
436 let state = self.state.clone();
437 Subscription::new(move || state.borrow_mut().ack_received_handlers.remove(handle))
438 }
439
440 fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
441 where
442 F: FnMut(PublicKey, SendToken) + 'static,
443 {
444 self.state
445 .borrow_mut()
446 .ack_timeout_handlers
447 .insert(Box::new(handler))
448 }
449
450 pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
451 where
452 F: FnMut(PublicKey, SendToken) + 'static,
453 {
454 let handle = self.add_ack_timeout_handler(handler);
455 let state = self.state.clone();
456 Subscription::new(move || state.borrow_mut().ack_timeout_handlers.remove(handle))
457 }
458
459 fn add_pfs_established_handler<F>(&self, handler: F) -> SubscriptionHandle
460 where
461 F: FnMut(PublicKey) + 'static,
462 {
463 self.state
464 .borrow_mut()
465 .pfs_established_handlers
466 .insert(Box::new(handler))
467 }
468
469 pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
470 where
471 F: FnMut(PublicKey) + 'static,
472 {
473 let handle = self.add_pfs_established_handler(handler);
474 let state = self.state.clone();
475 Subscription::new(move || state.borrow_mut().pfs_established_handlers.remove(handle))
476 }
477
478 fn add_pfs_ended_handler<F>(&self, handler: F) -> SubscriptionHandle
479 where
480 F: FnMut(PublicKey) + 'static,
481 {
482 self.state
483 .borrow_mut()
484 .pfs_ended_handlers
485 .insert(Box::new(handler))
486 }
487
488 pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
489 where
490 F: FnMut(PublicKey) + 'static,
491 {
492 let handle = self.add_pfs_ended_handler(handler);
493 let state = self.state.clone();
494 Subscription::new(move || state.borrow_mut().pfs_ended_handlers.remove(handle))
495 }
496
497 #[cfg(feature = "software-crypto")]
498 pub async fn request_pfs(
499 &self,
500 peer: &PublicKey,
501 duration_minutes: u16,
502 options: &SendOptions,
503 ) -> Result<SendProgressTicket, NodeError<M>> {
504 let receipt = self
505 .state
506 .borrow_mut()
507 .pfs
508 .request_session(&self.mac, self.identity_id, peer, duration_minutes, options)
509 .await?;
510 Ok(self.register_ack_send(self.identity_id, receipt))
511 }
512
513 #[cfg(feature = "software-crypto")]
514 pub async fn end_pfs(
515 &self,
516 peer: &PublicKey,
517 options: &SendOptions,
518 ) -> Result<(), NodeError<M>> {
519 let _ = self
520 .state
521 .borrow_mut()
522 .pfs
523 .end_session(&self.mac, self.identity_id, peer, true, options)
524 .await?;
525 Ok(())
526 }
527
528 #[cfg(feature = "software-crypto")]
529 pub async fn pfs_status(&self, peer: &PublicKey) -> Result<PfsStatus, NodeError<M>> {
530 let now_ms = self.mac.now_ms().await;
531 let state = self.state.borrow();
532 if let Some(session) = state
533 .pfs
534 .sessions()
535 .iter()
536 .find(|session| session.peer_long_term == *peer)
537 {
538 return Ok(match session.state {
539 PfsState::Requested => PfsStatus::Requested,
540 PfsState::Active => PfsStatus::Active {
541 local_ephemeral_id: session.local_ephemeral_id,
542 peer_ephemeral: session.peer_ephemeral,
543 expires_ms: session.expires_ms,
544 },
545 });
546 }
547 if state.pfs.active_route(peer, now_ms).is_some() {
548 return Ok(PfsStatus::Requested);
550 }
551 Ok(PfsStatus::Inactive)
552 }
553
554 #[cfg(feature = "software-crypto")]
557 pub async fn join(&self, channel: &Channel) -> Result<BoundChannel<M>, NodeError<M>> {
558 let mut membership = self.membership.borrow_mut();
559
560 if let Some(entry) = membership
562 .channels
563 .iter_mut()
564 .find(|e| e.channel == *channel)
565 {
566 if entry.active {
567 return Ok(BoundChannel {
568 node: self.clone(),
569 channel: entry.channel.clone(),
570 join_generation: entry.generation,
571 });
572 }
573 entry.active = true;
575 entry.generation = entry.generation.wrapping_add(1);
576 return Ok(BoundChannel {
577 node: self.clone(),
578 channel: entry.channel.clone(),
579 join_generation: entry.generation,
580 });
581 }
582
583 drop(membership);
585 self.mac.add_private_channel(channel.key().clone()).await?;
586 let mut membership = self.membership.borrow_mut();
587
588 let generation = 0;
589 membership.channels.push(ChannelMembershipEntry {
590 channel: channel.clone(),
591 generation,
592 active: true,
593 });
594
595 Ok(BoundChannel {
596 node: self.clone(),
597 channel: channel.clone(),
598 join_generation: generation,
599 })
600 }
601
602 #[cfg(feature = "software-crypto")]
605 pub fn leave(&self, channel: &Channel) -> Result<(), NodeError<M>> {
606 let mut membership = self.membership.borrow_mut();
607 if let Some(entry) = membership
608 .channels
609 .iter_mut()
610 .find(|e| e.channel == *channel && e.active)
611 {
612 entry.active = false;
613 entry.generation = entry.generation.wrapping_add(1);
614 }
615 Ok(())
616 }
617
618 #[cfg(feature = "software-crypto")]
620 pub fn bound_channel(&self, channel: &Channel) -> Option<BoundChannel<M>> {
621 let membership = self.membership.borrow();
622 membership
623 .channels
624 .iter()
625 .find(|e| e.channel == *channel && e.active)
626 .map(|entry| BoundChannel {
627 node: self.clone(),
628 channel: entry.channel.clone(),
629 join_generation: entry.generation,
630 })
631 }
632
633 #[cfg(feature = "software-crypto")]
635 pub fn bound_channels(&self) -> Vec<BoundChannel<M>> {
636 let membership = self.membership.borrow();
637 membership
638 .channels
639 .iter()
640 .filter(|e| e.active)
641 .map(|entry| BoundChannel {
642 node: self.clone(),
643 channel: entry.channel.clone(),
644 join_generation: entry.generation,
645 })
646 .collect()
647 }
648
649 fn register_ack_send(
651 &self,
652 send_identity_id: LocalIdentityId,
653 receipt: Option<umsh_mac::SendReceipt>,
654 ) -> SendProgressTicket {
655 match receipt {
656 Some(receipt) => {
657 let token = SendToken::new(send_identity_id, receipt);
658 let state = self.dispatcher.borrow_mut().register_ticket(token, false);
659 SendProgressTicket::new(token, state)
660 }
661 None => SendProgressTicket::fire_and_forget(),
663 }
664 }
665
666 fn register_non_ack_send(
671 &self,
672 send_identity_id: LocalIdentityId,
673 receipt: umsh_mac::SendReceipt,
674 ) -> SendProgressTicket {
675 let token = SendToken::new(send_identity_id, receipt);
676 let state = self.dispatcher.borrow_mut().register_ticket(token, true);
677 SendProgressTicket::new(token, state)
678 }
679
680 pub(crate) fn state(&self) -> &Rc<RefCell<LocalNodeState>> {
681 &self.state
682 }
683
684 #[cfg(feature = "software-crypto")]
685 pub(crate) fn owns_ephemeral_identity(&self, identity_id: LocalIdentityId) -> bool {
686 self.state
687 .borrow()
688 .pfs
689 .sessions()
690 .iter()
691 .any(|session| session.local_ephemeral_id == identity_id)
692 }
693
694 #[cfg(feature = "software-crypto")]
695 pub(crate) async fn handle_pfs_command(
696 &self,
697 from: &PublicKey,
698 command: &OwnedMacCommand,
699 options: &SendOptions,
700 ) -> Result<Option<PfsLifecycle>, NodeError<M>> {
701 match *command {
702 OwnedMacCommand::PfsSessionRequest {
703 ephemeral_key,
704 duration_minutes,
705 } => {
706 self.state
707 .borrow_mut()
708 .pfs
709 .accept_request(
710 &self.mac,
711 self.identity_id,
712 *from,
713 ephemeral_key,
714 duration_minutes,
715 options,
716 )
717 .await?;
718 Ok(Some(PfsLifecycle::Established(*from)))
719 }
720 OwnedMacCommand::PfsSessionResponse {
721 ephemeral_key,
722 duration_minutes,
723 } => {
724 if self
725 .state
726 .borrow_mut()
727 .pfs
728 .accept_response(
729 &self.mac,
730 self.identity_id,
731 *from,
732 ephemeral_key,
733 duration_minutes,
734 )
735 .await?
736 {
737 Ok(Some(PfsLifecycle::Established(*from)))
738 } else {
739 Ok(None)
740 }
741 }
742 OwnedMacCommand::EndPfsSession => {
743 let _ = self
744 .state
745 .borrow_mut()
746 .pfs
747 .end_session(&self.mac, self.identity_id, from, false, options)
748 .await?;
749 Ok(Some(PfsLifecycle::Ended(*from)))
750 }
751 _ => Ok(None),
752 }
753 }
754
755 pub(crate) fn dispatch_received_packet(&self, packet: &ReceivedPacketRef<'_>) -> bool {
756 let peer = packet.from_key();
757 let mut state = self.state.borrow_mut();
758
759 if let Some(peer) = peer.map(|peer| canonical_peer(&state, peer)) {
760 if let Some(entry) = state
761 .peer_subscriptions
762 .iter_mut()
763 .find(|entry| entry.peer == peer)
764 {
765 if entry.receive_handlers.any_mut(|handler| handler(packet)) {
766 return true;
767 }
768 }
769 }
770
771 state.receive_handlers.any_mut(|handler| handler(packet))
772 }
773
774 pub(crate) fn dispatch_node_discovered(&self, key: PublicKey, name: Option<&str>) {
775 self.state
776 .borrow_mut()
777 .node_discovered_handlers
778 .for_each_mut(|handler| handler(key, name));
779 }
780
781 pub(crate) fn dispatch_beacon(&self, from_hint: NodeHint, from_key: Option<PublicKey>) {
782 self.state
783 .borrow_mut()
784 .beacon_handlers
785 .for_each_mut(|handler| handler(from_hint, from_key));
786 }
787
788 pub(crate) fn dispatch_mac_command(&self, from: PublicKey, command: &OwnedMacCommand) {
789 self.state
790 .borrow_mut()
791 .mac_command_handlers
792 .for_each_mut(|handler| handler(from, command));
793 }
794
795 pub(crate) fn dispatch_ack_received(&self, peer: PublicKey, token: SendToken) {
796 let mut state = self.state.borrow_mut();
797 let peer = canonical_peer(&state, peer);
798 if let Some(entry) = state
799 .peer_subscriptions
800 .iter_mut()
801 .find(|entry| entry.peer == peer)
802 {
803 entry
804 .ack_received_handlers
805 .for_each_mut(|handler| handler(token));
806 }
807 state
808 .ack_received_handlers
809 .for_each_mut(|handler| handler(peer, token));
810 }
811
812 pub(crate) fn dispatch_ack_timeout(&self, peer: PublicKey, token: SendToken) {
813 let mut state = self.state.borrow_mut();
814 let peer = canonical_peer(&state, peer);
815 if let Some(entry) = state
816 .peer_subscriptions
817 .iter_mut()
818 .find(|entry| entry.peer == peer)
819 {
820 entry
821 .ack_timeout_handlers
822 .for_each_mut(|handler| handler(token));
823 }
824 state
825 .ack_timeout_handlers
826 .for_each_mut(|handler| handler(peer, token));
827 }
828
829 pub(crate) fn dispatch_pfs_established(&self, peer: PublicKey) {
830 let mut state = self.state.borrow_mut();
831 let peer = canonical_peer(&state, peer);
832 if let Some(entry) = state
833 .peer_subscriptions
834 .iter_mut()
835 .find(|entry| entry.peer == peer)
836 {
837 entry
838 .pfs_established_handlers
839 .for_each_mut(|handler| handler());
840 }
841 state
842 .pfs_established_handlers
843 .for_each_mut(|handler| handler(peer));
844 }
845
846 pub(crate) fn dispatch_pfs_ended(&self, peer: PublicKey) {
847 let mut state = self.state.borrow_mut();
848 let peer = canonical_peer(&state, peer);
849 if let Some(entry) = state
850 .peer_subscriptions
851 .iter_mut()
852 .find(|entry| entry.peer == peer)
853 {
854 entry.pfs_ended_handlers.for_each_mut(|handler| handler());
855 }
856 state
857 .pfs_ended_handlers
858 .for_each_mut(|handler| handler(peer));
859 }
860
861 pub(crate) async fn expire_pfs_sessions(&self) -> Result<Vec<PublicKey>, NodeError<M>> {
862 #[cfg(feature = "software-crypto")]
863 {
864 let now_ms = self.mac.now_ms().await;
865 return self
866 .state
867 .borrow_mut()
868 .pfs
869 .expire_sessions(&self.mac, now_ms)
870 .await;
871 }
872 #[cfg(not(feature = "software-crypto"))]
873 {
874 Ok(Vec::new())
875 }
876 }
877}
878
879fn canonical_peer(state: &LocalNodeState, peer: PublicKey) -> PublicKey {
880 #[cfg(feature = "software-crypto")]
881 {
882 if let Some(session) = state
883 .pfs
884 .sessions()
885 .iter()
886 .find(|session| session.state == PfsState::Active && session.peer_ephemeral == peer)
887 {
888 return session.peer_long_term;
889 }
890 }
891 peer
892}
893
894impl<M: MacBackend> Transport for LocalNode<M> {
895 type Error = NodeError<M>;
896
897 async fn send(
898 &self,
899 to: &PublicKey,
900 payload: &[u8],
901 options: &SendOptions,
902 ) -> Result<SendProgressTicket, Self::Error> {
903 #[cfg(feature = "software-crypto")]
904 let (send_identity_id, receipt) = {
905 let now_ms = self.mac.now_ms().await;
906 if let Some((local_id, peer_ephemeral)) =
907 self.state.borrow().pfs.active_route(to, now_ms)
908 {
909 let receipt = self
910 .mac
911 .send_unicast(local_id, &peer_ephemeral, payload, options)
912 .await?;
913 (local_id, receipt)
914 } else {
915 let receipt = self
916 .mac
917 .send_unicast(self.identity_id, to, payload, options)
918 .await?;
919 (self.identity_id, receipt)
920 }
921 };
922 #[cfg(not(feature = "software-crypto"))]
923 let (send_identity_id, receipt) = (
924 self.identity_id,
925 self.mac
926 .send_unicast(self.identity_id, to, payload, options)
927 .await?,
928 );
929 Ok(self.register_ack_send(send_identity_id, receipt))
930 }
931
932 async fn send_all(
933 &self,
934 payload: &[u8],
935 options: &SendOptions,
936 ) -> Result<SendProgressTicket, Self::Error> {
937 let receipt = self
938 .mac
939 .send_broadcast(self.identity_id, payload, options)
940 .await?;
941 Ok(self.register_non_ack_send(self.identity_id, receipt))
942 }
943}
944
945#[cfg(feature = "software-crypto")]
951#[derive(Clone)]
952pub struct BoundChannel<M: MacBackend> {
953 node: LocalNode<M>,
954 channel: Channel,
955 join_generation: u64,
956}
957
958#[cfg(feature = "software-crypto")]
959impl<M: MacBackend> BoundChannel<M> {
960 pub fn channel(&self) -> &Channel {
962 &self.channel
963 }
964
965 pub fn is_active(&self) -> bool {
967 let membership = self.node.membership.borrow();
968 membership
969 .channels
970 .iter()
971 .any(|e| e.channel == self.channel && e.active && e.generation == self.join_generation)
972 }
973
974 pub fn peer(&self, key: PublicKey) -> PeerConnection<Self> {
976 PeerConnection::new(self.clone(), key)
977 }
978
979 fn check_active(&self) -> Result<(), NodeError<M>> {
981 if self.is_active() {
982 Ok(())
983 } else {
984 Err(NodeError::ChannelLeft)
985 }
986 }
987
988 pub fn node(&self) -> &LocalNode<M> {
990 &self.node
991 }
992}
993
994#[cfg(feature = "software-crypto")]
995impl<M: MacBackend> Transport for BoundChannel<M> {
996 type Error = NodeError<M>;
997
998 async fn send(
999 &self,
1000 to: &PublicKey,
1001 payload: &[u8],
1002 options: &SendOptions,
1003 ) -> Result<SendProgressTicket, Self::Error> {
1004 self.check_active()?;
1005 let receipt = self
1006 .node
1007 .mac
1008 .send_blind_unicast(
1009 self.node.identity_id,
1010 to,
1011 self.channel.channel_id(),
1012 payload,
1013 options,
1014 )
1015 .await?;
1016 Ok(self.node.register_ack_send(self.node.identity_id, receipt))
1017 }
1018
1019 async fn send_all(
1020 &self,
1021 payload: &[u8],
1022 options: &SendOptions,
1023 ) -> Result<SendProgressTicket, Self::Error> {
1024 self.check_active()?;
1025 let receipt = self
1026 .node
1027 .mac
1028 .send_multicast(
1029 self.node.identity_id,
1030 self.channel.channel_id(),
1031 payload,
1032 options,
1033 )
1034 .await?;
1035 Ok(self
1036 .node
1037 .register_non_ack_send(self.node.identity_id, receipt))
1038 }
1039}