use alloc::boxed::Box;
use alloc::rc::Rc;
// Imported unconditionally: `HandlerTable`, `LocalNodeState` and
// `expire_pfs_sessions` use `Vec` in every feature configuration.
use alloc::vec::Vec;
use core::cell::RefCell;
use core::num::NonZeroU32;

use umsh_core::NodeHint;
use umsh_core::PublicKey;
use umsh_mac::{LocalIdentityId, SendOptions};

#[cfg(feature = "software-crypto")]
use crate::channel::Channel;
use crate::dispatch::EventDispatcher;
use crate::mac::MacBackend;
use crate::peer::PeerConnection;
#[cfg(feature = "software-crypto")]
use crate::pfs::{PfsSessionManager, PfsState};
use crate::receive::ReceivedPacketRef;
use crate::ticket::{SendProgressTicket, SendToken};
use crate::transport::Transport;
use crate::{AppEncodeError, OwnedMacCommand};
23
/// Channel membership bookkeeping shared by all handles of one node.
///
/// Without the `software-crypto` feature the struct has no fields.
pub(crate) struct NodeMembership {
    /// Entries for the channels this node has joined; leaving a channel marks
    /// its entry inactive instead of removing it.
    #[cfg(feature = "software-crypto")]
    pub channels: Vec<ChannelMembershipEntry>,
}
30
/// Identifier of one handler stored in a `HandlerTable`.
///
/// Holds the handler's one-based slot number, so `Option<SubscriptionHandle>`
/// needs no extra space.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct SubscriptionHandle(NonZeroU32);
33
/// Guard for a registered event handler.
///
/// The wrapped cancel callback runs at most once: either through
/// [`Subscription::unsubscribe`] or, failing that, when the guard is dropped.
/// Keep the guard alive for as long as the handler should stay registered.
#[must_use = "dropping a `Subscription` immediately unregisters its handler"]
pub struct Subscription {
    /// Pending cancel callback; `None` once it has been taken and run.
    cancel: Rc<RefCell<Option<Box<dyn FnMut() -> bool>>>>,
}

impl Subscription {
    /// Wraps `cancel`, the callback that unregisters the handler and reports
    /// whether something was actually removed.
    pub(crate) fn new(cancel: impl FnMut() -> bool + 'static) -> Self {
        Self {
            cancel: Rc::new(RefCell::new(Some(Box::new(cancel)))),
        }
    }

    /// Cancels the subscription now and returns the cancel callback's result.
    ///
    /// The drop that follows finds the callback already taken and does nothing.
    pub fn unsubscribe(self) -> bool {
        Self::run_cancel(&self.cancel)
    }

    /// Takes the callback out of the cell and runs it.
    ///
    /// Returns `false` when the callback was already taken.
    fn run_cancel(cancel: &RefCell<Option<Box<dyn FnMut() -> bool>>>) -> bool {
        // Release the cell borrow before invoking the callback.
        let taken = cancel.borrow_mut().take();
        match taken {
            Some(mut callback) => callback(),
            None => false,
        }
    }
}

impl Drop for Subscription {
    /// Runs the cancel callback if `unsubscribe` has not already done so.
    fn drop(&mut self) {
        let _ = Self::run_cancel(&self.cancel);
    }
}
66
67pub(crate) struct HandlerTable<T> {
68 slots: Vec<Option<T>>,
69}
70
71impl<T> Default for HandlerTable<T> {
72 fn default() -> Self {
73 Self { slots: Vec::new() }
74 }
75}
76
77impl<T> HandlerTable<T> {
78 pub(crate) fn insert(&mut self, handler: T) -> SubscriptionHandle {
79 if let Some((index, slot)) = self
80 .slots
81 .iter_mut()
82 .enumerate()
83 .find(|(_, slot)| slot.is_none())
84 {
85 *slot = Some(handler);
86 return SubscriptionHandle(NonZeroU32::new((index + 1) as u32).unwrap());
87 }
88 self.slots.push(Some(handler));
89 SubscriptionHandle(NonZeroU32::new(self.slots.len() as u32).unwrap())
90 }
91
92 pub(crate) fn remove(&mut self, handle: SubscriptionHandle) -> bool {
93 let index = handle.0.get() as usize - 1;
94 let Some(slot) = self.slots.get_mut(index) else {
95 return false;
96 };
97 slot.take().is_some()
98 }
99
100 fn any_mut(&mut self, mut f: impl FnMut(&mut T) -> bool) -> bool {
101 for slot in &mut self.slots {
102 let Some(handler) = slot.as_mut() else {
103 continue;
104 };
105 if f(handler) {
106 return true;
107 }
108 }
109 false
110 }
111
112 fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
113 for slot in &mut self.slots {
114 let Some(handler) = slot.as_mut() else {
115 continue;
116 };
117 f(handler);
118 }
119 }
120}
121
122pub(crate) struct PeerSubscriptions {
123 peer: PublicKey,
124 pub(crate) receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
125 pub(crate) ack_received_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
126 pub(crate) ack_timeout_handlers: HandlerTable<Box<dyn FnMut(SendToken)>>,
127 pub(crate) pfs_established_handlers: HandlerTable<Box<dyn FnMut()>>,
128 pub(crate) pfs_ended_handlers: HandlerTable<Box<dyn FnMut()>>,
129}
130
131impl PeerSubscriptions {
132 fn new(peer: PublicKey) -> Self {
133 Self {
134 peer,
135 receive_handlers: HandlerTable::default(),
136 ack_received_handlers: HandlerTable::default(),
137 ack_timeout_handlers: HandlerTable::default(),
138 pfs_established_handlers: HandlerTable::default(),
139 pfs_ended_handlers: HandlerTable::default(),
140 }
141 }
142}
143
144pub(crate) struct LocalNodeState {
145 receive_handlers: HandlerTable<Box<dyn FnMut(&ReceivedPacketRef<'_>) -> bool>>,
146 node_discovered_handlers: HandlerTable<Box<dyn FnMut(PublicKey, Option<&str>)>>,
147 beacon_handlers: HandlerTable<Box<dyn FnMut(NodeHint, Option<PublicKey>)>>,
148 mac_command_handlers: HandlerTable<Box<dyn FnMut(PublicKey, &OwnedMacCommand)>>,
149 ack_received_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
150 ack_timeout_handlers: HandlerTable<Box<dyn FnMut(PublicKey, SendToken)>>,
151 pfs_established_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
152 pfs_ended_handlers: HandlerTable<Box<dyn FnMut(PublicKey)>>,
153 peer_subscriptions: Vec<PeerSubscriptions>,
154 #[cfg(feature = "software-crypto")]
155 pfs: PfsSessionManager,
156}
157
158impl LocalNodeState {
159 pub(crate) fn new() -> Self {
160 Self {
161 receive_handlers: HandlerTable::default(),
162 node_discovered_handlers: HandlerTable::default(),
163 beacon_handlers: HandlerTable::default(),
164 mac_command_handlers: HandlerTable::default(),
165 ack_received_handlers: HandlerTable::default(),
166 ack_timeout_handlers: HandlerTable::default(),
167 pfs_established_handlers: HandlerTable::default(),
168 pfs_ended_handlers: HandlerTable::default(),
169 peer_subscriptions: Vec::new(),
170 #[cfg(feature = "software-crypto")]
171 pfs: PfsSessionManager::new(),
172 }
173 }
174
175 pub(crate) fn peer_subscriptions_mut(&mut self, peer: PublicKey) -> &mut PeerSubscriptions {
176 if let Some(index) = self
177 .peer_subscriptions
178 .iter()
179 .position(|entry| entry.peer == peer)
180 {
181 return &mut self.peer_subscriptions[index];
182 }
183 self.peer_subscriptions.push(PeerSubscriptions::new(peer));
184 self.peer_subscriptions
185 .last_mut()
186 .expect("peer subscriptions just inserted")
187 }
188
189 pub(crate) fn find_peer_subscriptions_mut(
190 &mut self,
191 peer: PublicKey,
192 ) -> Option<&mut PeerSubscriptions> {
193 self.peer_subscriptions
194 .iter_mut()
195 .find(|entry| entry.peer == peer)
196 }
197}
198
/// PFS session status for one peer, as reported by `LocalNode::pfs_status`.
#[cfg(feature = "software-crypto")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PfsStatus {
    /// No session entry and no active route exist for the peer.
    Inactive,
    /// A session has been requested but is not active.
    Requested,
    /// A session is active.
    Active {
        /// Local ephemeral identity recorded in the session.
        local_ephemeral_id: LocalIdentityId,
        /// Peer's ephemeral key recorded in the session.
        peer_ephemeral: PublicKey,
        /// Expiry recorded in the session (presumably milliseconds on the MAC
        /// backend's clock — confirm against the session manager).
        expires_ms: u64,
    },
}
210
211#[derive(Clone, Copy, Debug, PartialEq, Eq)]
212pub(crate) enum PfsLifecycle {
213 Established(PublicKey),
214 Ended(PublicKey),
215}
216
/// Membership record for one channel.
#[cfg(feature = "software-crypto")]
pub(crate) struct ChannelMembershipEntry {
    /// The channel this record describes.
    pub channel: Channel,
    /// Counter bumped on every leave and every re-join, so bound handles
    /// created before such a change no longer match.
    pub generation: u64,
    /// Whether the node is currently a member of the channel.
    pub active: bool,
}
226
227impl NodeMembership {
228 pub fn new() -> Self {
229 Self {
230 #[cfg(feature = "software-crypto")]
231 channels: Vec::new(),
232 }
233 }
234}
235
236#[derive(Clone, PartialEq, Eq)]
238pub enum NodeError<M: MacBackend> {
239 Mac(crate::mac::MacBackendError<M::SendError, M::CapacityError>),
241 ChannelLeft,
243 PeerMissing,
245 AppEncode(AppEncodeError),
247 #[cfg(feature = "software-crypto")]
249 PfsSessionMissing,
250 #[cfg(feature = "software-crypto")]
252 PfsSessionTableFull,
253 #[cfg(feature = "software-crypto")]
255 Crypto(umsh_crypto::CryptoError),
256}
257
258impl<M> core::fmt::Debug for NodeError<M>
259where
260 M: MacBackend,
261 M::SendError: core::fmt::Debug,
262 M::CapacityError: core::fmt::Debug,
263{
264 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265 match self {
266 Self::Mac(e) => f.debug_tuple("Mac").field(e).finish(),
267 Self::ChannelLeft => f.write_str("ChannelLeft"),
268 Self::PeerMissing => f.write_str("PeerMissing"),
269 Self::AppEncode(e) => f.debug_tuple("AppEncode").field(e).finish(),
270 #[cfg(feature = "software-crypto")]
271 Self::PfsSessionMissing => f.write_str("PfsSessionMissing"),
272 #[cfg(feature = "software-crypto")]
273 Self::PfsSessionTableFull => f.write_str("PfsSessionTableFull"),
274 #[cfg(feature = "software-crypto")]
275 Self::Crypto(e) => f.debug_tuple("Crypto").field(e).finish(),
276 }
277 }
278}
279
280impl<M: MacBackend> From<crate::mac::MacBackendError<M::SendError, M::CapacityError>>
281 for NodeError<M>
282{
283 fn from(e: crate::mac::MacBackendError<M::SendError, M::CapacityError>) -> Self {
284 Self::Mac(e)
285 }
286}
287
288impl<M: MacBackend> From<AppEncodeError> for NodeError<M> {
289 fn from(e: AppEncodeError) -> Self {
290 Self::AppEncode(e)
291 }
292}
293
/// Lets `?` turn crypto errors into [`NodeError::Crypto`].
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> From<umsh_crypto::CryptoError> for NodeError<M> {
    fn from(error: umsh_crypto::CryptoError) -> Self {
        NodeError::Crypto(error)
    }
}
300
301#[derive(Clone)]
307pub struct LocalNode<M: MacBackend> {
308 identity_id: LocalIdentityId,
309 mac: M,
310 dispatcher: Rc<RefCell<EventDispatcher>>,
311 #[allow(dead_code)] membership: Rc<RefCell<NodeMembership>>,
313 state: Rc<RefCell<LocalNodeState>>,
314}
315
316impl<M: MacBackend> LocalNode<M> {
317 pub(crate) fn new(
319 identity_id: LocalIdentityId,
320 mac: M,
321 dispatcher: Rc<RefCell<EventDispatcher>>,
322 membership: Rc<RefCell<NodeMembership>>,
323 state: Rc<RefCell<LocalNodeState>>,
324 ) -> Self {
325 Self {
326 identity_id,
327 mac,
328 dispatcher,
329 membership,
330 state,
331 }
332 }
333
334 pub fn identity_id(&self) -> LocalIdentityId {
336 self.identity_id
337 }
338
339 pub fn peer(&self, key: PublicKey) -> Result<PeerConnection<Self>, NodeError<M>> {
341 self.mac.add_peer(key)?;
342 Ok(PeerConnection::new(self.clone(), key))
343 }
344
345 fn add_receive_handler<F>(&self, handler: F) -> SubscriptionHandle
346 where
347 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
348 {
349 self.state
350 .borrow_mut()
351 .receive_handlers
352 .insert(Box::new(handler))
353 }
354
355 pub fn on_receive<F>(&self, handler: F) -> Subscription
356 where
357 F: FnMut(&ReceivedPacketRef<'_>) -> bool + 'static,
358 {
359 let handle = self.add_receive_handler(handler);
360 let state = self.state.clone();
361 Subscription::new(move || state.borrow_mut().receive_handlers.remove(handle))
362 }
363
364 fn add_node_discovered_handler<F>(&self, handler: F) -> SubscriptionHandle
365 where
366 F: FnMut(PublicKey, Option<&str>) + 'static,
367 {
368 self.state
369 .borrow_mut()
370 .node_discovered_handlers
371 .insert(Box::new(handler))
372 }
373
374 pub fn on_node_discovered<F>(&self, handler: F) -> Subscription
375 where
376 F: FnMut(PublicKey, Option<&str>) + 'static,
377 {
378 let handle = self.add_node_discovered_handler(handler);
379 let state = self.state.clone();
380 Subscription::new(move || state.borrow_mut().node_discovered_handlers.remove(handle))
381 }
382
383 fn add_beacon_handler<F>(&self, handler: F) -> SubscriptionHandle
384 where
385 F: FnMut(NodeHint, Option<PublicKey>) + 'static,
386 {
387 self.state
388 .borrow_mut()
389 .beacon_handlers
390 .insert(Box::new(handler))
391 }
392
393 pub fn on_beacon<F>(&self, handler: F) -> Subscription
394 where
395 F: FnMut(NodeHint, Option<PublicKey>) + 'static,
396 {
397 let handle = self.add_beacon_handler(handler);
398 let state = self.state.clone();
399 Subscription::new(move || state.borrow_mut().beacon_handlers.remove(handle))
400 }
401
402 fn add_mac_command_handler<F>(&self, handler: F) -> SubscriptionHandle
403 where
404 F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
405 {
406 self.state
407 .borrow_mut()
408 .mac_command_handlers
409 .insert(Box::new(handler))
410 }
411
412 pub fn on_mac_command<F>(&self, handler: F) -> Subscription
413 where
414 F: FnMut(PublicKey, &OwnedMacCommand) + 'static,
415 {
416 let handle = self.add_mac_command_handler(handler);
417 let state = self.state.clone();
418 Subscription::new(move || state.borrow_mut().mac_command_handlers.remove(handle))
419 }
420
421 fn add_ack_received_handler<F>(&self, handler: F) -> SubscriptionHandle
422 where
423 F: FnMut(PublicKey, SendToken) + 'static,
424 {
425 self.state
426 .borrow_mut()
427 .ack_received_handlers
428 .insert(Box::new(handler))
429 }
430
431 pub fn on_ack_received<F>(&self, handler: F) -> Subscription
432 where
433 F: FnMut(PublicKey, SendToken) + 'static,
434 {
435 let handle = self.add_ack_received_handler(handler);
436 let state = self.state.clone();
437 Subscription::new(move || state.borrow_mut().ack_received_handlers.remove(handle))
438 }
439
440 fn add_ack_timeout_handler<F>(&self, handler: F) -> SubscriptionHandle
441 where
442 F: FnMut(PublicKey, SendToken) + 'static,
443 {
444 self.state
445 .borrow_mut()
446 .ack_timeout_handlers
447 .insert(Box::new(handler))
448 }
449
450 pub fn on_ack_timeout<F>(&self, handler: F) -> Subscription
451 where
452 F: FnMut(PublicKey, SendToken) + 'static,
453 {
454 let handle = self.add_ack_timeout_handler(handler);
455 let state = self.state.clone();
456 Subscription::new(move || state.borrow_mut().ack_timeout_handlers.remove(handle))
457 }
458
459 fn add_pfs_established_handler<F>(&self, handler: F) -> SubscriptionHandle
460 where
461 F: FnMut(PublicKey) + 'static,
462 {
463 self.state
464 .borrow_mut()
465 .pfs_established_handlers
466 .insert(Box::new(handler))
467 }
468
469 pub fn on_pfs_established<F>(&self, handler: F) -> Subscription
470 where
471 F: FnMut(PublicKey) + 'static,
472 {
473 let handle = self.add_pfs_established_handler(handler);
474 let state = self.state.clone();
475 Subscription::new(move || state.borrow_mut().pfs_established_handlers.remove(handle))
476 }
477
478 fn add_pfs_ended_handler<F>(&self, handler: F) -> SubscriptionHandle
479 where
480 F: FnMut(PublicKey) + 'static,
481 {
482 self.state
483 .borrow_mut()
484 .pfs_ended_handlers
485 .insert(Box::new(handler))
486 }
487
488 pub fn on_pfs_ended<F>(&self, handler: F) -> Subscription
489 where
490 F: FnMut(PublicKey) + 'static,
491 {
492 let handle = self.add_pfs_ended_handler(handler);
493 let state = self.state.clone();
494 Subscription::new(move || state.borrow_mut().pfs_ended_handlers.remove(handle))
495 }
496
497 #[cfg(feature = "software-crypto")]
498 pub async fn request_pfs(
499 &self,
500 peer: &PublicKey,
501 duration_minutes: u16,
502 options: &SendOptions,
503 ) -> Result<SendProgressTicket, NodeError<M>> {
504 let receipt = self
505 .state
506 .borrow_mut()
507 .pfs
508 .request_session(&self.mac, self.identity_id, peer, duration_minutes, options)
509 .await?;
510 Ok(self.register_ack_send(self.identity_id, receipt))
511 }
512
513 #[cfg(feature = "software-crypto")]
514 pub async fn end_pfs(
515 &self,
516 peer: &PublicKey,
517 options: &SendOptions,
518 ) -> Result<(), NodeError<M>> {
519 let _ = self
520 .state
521 .borrow_mut()
522 .pfs
523 .end_session(&self.mac, self.identity_id, peer, true, options)
524 .await?;
525 Ok(())
526 }
527
528 #[cfg(feature = "software-crypto")]
529 pub fn pfs_status(&self, peer: &PublicKey) -> Result<PfsStatus, NodeError<M>> {
530 let now_ms = self.mac.now_ms()?;
531 let state = self.state.borrow();
532 if let Some(session) = state
533 .pfs
534 .sessions()
535 .iter()
536 .find(|session| session.peer_long_term == *peer)
537 {
538 return Ok(match session.state {
539 PfsState::Requested => PfsStatus::Requested,
540 PfsState::Active => PfsStatus::Active {
541 local_ephemeral_id: session.local_ephemeral_id,
542 peer_ephemeral: session.peer_ephemeral,
543 expires_ms: session.expires_ms,
544 },
545 });
546 }
547 if state.pfs.active_route(peer, now_ms).is_some() {
548 return Ok(PfsStatus::Requested);
550 }
551 Ok(PfsStatus::Inactive)
552 }
553
554 #[cfg(feature = "software-crypto")]
557 pub fn join(&self, channel: &Channel) -> Result<BoundChannel<M>, NodeError<M>> {
558 let mut membership = self.membership.borrow_mut();
559
560 if let Some(entry) = membership
562 .channels
563 .iter_mut()
564 .find(|e| e.channel == *channel)
565 {
566 if entry.active {
567 return Ok(BoundChannel {
568 node: self.clone(),
569 channel: entry.channel.clone(),
570 join_generation: entry.generation,
571 });
572 }
573 entry.active = true;
575 entry.generation = entry.generation.wrapping_add(1);
576 return Ok(BoundChannel {
577 node: self.clone(),
578 channel: entry.channel.clone(),
579 join_generation: entry.generation,
580 });
581 }
582
583 self.mac.add_private_channel(channel.key().clone())?;
585
586 let generation = 0;
587 membership.channels.push(ChannelMembershipEntry {
588 channel: channel.clone(),
589 generation,
590 active: true,
591 });
592
593 Ok(BoundChannel {
594 node: self.clone(),
595 channel: channel.clone(),
596 join_generation: generation,
597 })
598 }
599
600 #[cfg(feature = "software-crypto")]
603 pub fn leave(&self, channel: &Channel) -> Result<(), NodeError<M>> {
604 let mut membership = self.membership.borrow_mut();
605 if let Some(entry) = membership
606 .channels
607 .iter_mut()
608 .find(|e| e.channel == *channel && e.active)
609 {
610 entry.active = false;
611 entry.generation = entry.generation.wrapping_add(1);
612 }
613 Ok(())
614 }
615
616 #[cfg(feature = "software-crypto")]
618 pub fn bound_channel(&self, channel: &Channel) -> Option<BoundChannel<M>> {
619 let membership = self.membership.borrow();
620 membership
621 .channels
622 .iter()
623 .find(|e| e.channel == *channel && e.active)
624 .map(|entry| BoundChannel {
625 node: self.clone(),
626 channel: entry.channel.clone(),
627 join_generation: entry.generation,
628 })
629 }
630
631 #[cfg(feature = "software-crypto")]
633 pub fn bound_channels(&self) -> Vec<BoundChannel<M>> {
634 let membership = self.membership.borrow();
635 membership
636 .channels
637 .iter()
638 .filter(|e| e.active)
639 .map(|entry| BoundChannel {
640 node: self.clone(),
641 channel: entry.channel.clone(),
642 join_generation: entry.generation,
643 })
644 .collect()
645 }
646
647 fn register_ack_send(
649 &self,
650 send_identity_id: LocalIdentityId,
651 receipt: Option<umsh_mac::SendReceipt>,
652 ) -> SendProgressTicket {
653 match receipt {
654 Some(receipt) => {
655 let token = SendToken::new(send_identity_id, receipt);
656 let state = self.dispatcher.borrow_mut().register_ticket(token, false);
657 SendProgressTicket::new(token, state)
658 }
659 None => SendProgressTicket::fire_and_forget(),
661 }
662 }
663
664 fn register_non_ack_send(
669 &self,
670 send_identity_id: LocalIdentityId,
671 receipt: umsh_mac::SendReceipt,
672 ) -> SendProgressTicket {
673 let token = SendToken::new(send_identity_id, receipt);
674 let state = self.dispatcher.borrow_mut().register_ticket(token, true);
675 SendProgressTicket::new(token, state)
676 }
677
678 pub(crate) fn state(&self) -> &Rc<RefCell<LocalNodeState>> {
679 &self.state
680 }
681
682 #[cfg(feature = "software-crypto")]
683 pub(crate) fn owns_ephemeral_identity(&self, identity_id: LocalIdentityId) -> bool {
684 self.state
685 .borrow()
686 .pfs
687 .sessions()
688 .iter()
689 .any(|session| session.local_ephemeral_id == identity_id)
690 }
691
692 #[cfg(feature = "software-crypto")]
693 pub(crate) async fn handle_pfs_command(
694 &self,
695 from: &PublicKey,
696 command: &OwnedMacCommand,
697 options: &SendOptions,
698 ) -> Result<Option<PfsLifecycle>, NodeError<M>> {
699 match *command {
700 OwnedMacCommand::PfsSessionRequest {
701 ephemeral_key,
702 duration_minutes,
703 } => {
704 self.state
705 .borrow_mut()
706 .pfs
707 .accept_request(
708 &self.mac,
709 self.identity_id,
710 *from,
711 ephemeral_key,
712 duration_minutes,
713 options,
714 )
715 .await?;
716 Ok(Some(PfsLifecycle::Established(*from)))
717 }
718 OwnedMacCommand::PfsSessionResponse {
719 ephemeral_key,
720 duration_minutes,
721 } => {
722 if self.state.borrow_mut().pfs.accept_response(
723 &self.mac,
724 self.identity_id,
725 *from,
726 ephemeral_key,
727 duration_minutes,
728 )? {
729 Ok(Some(PfsLifecycle::Established(*from)))
730 } else {
731 Ok(None)
732 }
733 }
734 OwnedMacCommand::EndPfsSession => {
735 let _ = self
736 .state
737 .borrow_mut()
738 .pfs
739 .end_session(&self.mac, self.identity_id, from, false, options)
740 .await?;
741 Ok(Some(PfsLifecycle::Ended(*from)))
742 }
743 _ => Ok(None),
744 }
745 }
746
747 pub(crate) fn dispatch_received_packet(&self, packet: &ReceivedPacketRef<'_>) -> bool {
748 let peer = packet.from_key();
749 let mut state = self.state.borrow_mut();
750
751 if let Some(peer) = peer.map(|peer| canonical_peer(&state, peer)) {
752 if let Some(entry) = state
753 .peer_subscriptions
754 .iter_mut()
755 .find(|entry| entry.peer == peer)
756 {
757 if entry.receive_handlers.any_mut(|handler| handler(packet)) {
758 return true;
759 }
760 }
761 }
762
763 state.receive_handlers.any_mut(|handler| handler(packet))
764 }
765
766 pub(crate) fn dispatch_node_discovered(&self, key: PublicKey, name: Option<&str>) {
767 self.state
768 .borrow_mut()
769 .node_discovered_handlers
770 .for_each_mut(|handler| handler(key, name));
771 }
772
773 pub(crate) fn dispatch_beacon(&self, from_hint: NodeHint, from_key: Option<PublicKey>) {
774 self.state
775 .borrow_mut()
776 .beacon_handlers
777 .for_each_mut(|handler| handler(from_hint, from_key));
778 }
779
780 pub(crate) fn dispatch_mac_command(&self, from: PublicKey, command: &OwnedMacCommand) {
781 self.state
782 .borrow_mut()
783 .mac_command_handlers
784 .for_each_mut(|handler| handler(from, command));
785 }
786
787 pub(crate) fn dispatch_ack_received(&self, peer: PublicKey, token: SendToken) {
788 let mut state = self.state.borrow_mut();
789 let peer = canonical_peer(&state, peer);
790 if let Some(entry) = state
791 .peer_subscriptions
792 .iter_mut()
793 .find(|entry| entry.peer == peer)
794 {
795 entry
796 .ack_received_handlers
797 .for_each_mut(|handler| handler(token));
798 }
799 state
800 .ack_received_handlers
801 .for_each_mut(|handler| handler(peer, token));
802 }
803
804 pub(crate) fn dispatch_ack_timeout(&self, peer: PublicKey, token: SendToken) {
805 let mut state = self.state.borrow_mut();
806 let peer = canonical_peer(&state, peer);
807 if let Some(entry) = state
808 .peer_subscriptions
809 .iter_mut()
810 .find(|entry| entry.peer == peer)
811 {
812 entry
813 .ack_timeout_handlers
814 .for_each_mut(|handler| handler(token));
815 }
816 state
817 .ack_timeout_handlers
818 .for_each_mut(|handler| handler(peer, token));
819 }
820
821 pub(crate) fn dispatch_pfs_established(&self, peer: PublicKey) {
822 let mut state = self.state.borrow_mut();
823 let peer = canonical_peer(&state, peer);
824 if let Some(entry) = state
825 .peer_subscriptions
826 .iter_mut()
827 .find(|entry| entry.peer == peer)
828 {
829 entry
830 .pfs_established_handlers
831 .for_each_mut(|handler| handler());
832 }
833 state
834 .pfs_established_handlers
835 .for_each_mut(|handler| handler(peer));
836 }
837
838 pub(crate) fn dispatch_pfs_ended(&self, peer: PublicKey) {
839 let mut state = self.state.borrow_mut();
840 let peer = canonical_peer(&state, peer);
841 if let Some(entry) = state
842 .peer_subscriptions
843 .iter_mut()
844 .find(|entry| entry.peer == peer)
845 {
846 entry.pfs_ended_handlers.for_each_mut(|handler| handler());
847 }
848 state
849 .pfs_ended_handlers
850 .for_each_mut(|handler| handler(peer));
851 }
852
853 pub(crate) fn expire_pfs_sessions(&self) -> Result<Vec<PublicKey>, NodeError<M>> {
854 #[cfg(feature = "software-crypto")]
855 {
856 let now_ms = self.mac.now_ms()?;
857 return self
858 .state
859 .borrow_mut()
860 .pfs
861 .expire_sessions(&self.mac, now_ms);
862 }
863 #[cfg(not(feature = "software-crypto"))]
864 {
865 Ok(Vec::new())
866 }
867 }
868}
869
870fn canonical_peer(state: &LocalNodeState, peer: PublicKey) -> PublicKey {
871 #[cfg(feature = "software-crypto")]
872 {
873 if let Some(session) = state
874 .pfs
875 .sessions()
876 .iter()
877 .find(|session| session.state == PfsState::Active && session.peer_ephemeral == peer)
878 {
879 return session.peer_long_term;
880 }
881 }
882 peer
883}
884
885impl<M: MacBackend> Transport for LocalNode<M> {
886 type Error = NodeError<M>;
887
888 async fn send(
889 &self,
890 to: &PublicKey,
891 payload: &[u8],
892 options: &SendOptions,
893 ) -> Result<SendProgressTicket, Self::Error> {
894 #[cfg(feature = "software-crypto")]
895 let (send_identity_id, receipt) = {
896 let now_ms = self.mac.now_ms()?;
897 if let Some((local_id, peer_ephemeral)) =
898 self.state.borrow().pfs.active_route(to, now_ms)
899 {
900 let receipt = self
901 .mac
902 .send_unicast(local_id, &peer_ephemeral, payload, options)
903 .await?;
904 (local_id, receipt)
905 } else {
906 let receipt = self
907 .mac
908 .send_unicast(self.identity_id, to, payload, options)
909 .await?;
910 (self.identity_id, receipt)
911 }
912 };
913 #[cfg(not(feature = "software-crypto"))]
914 let (send_identity_id, receipt) = (
915 self.identity_id,
916 self.mac
917 .send_unicast(self.identity_id, to, payload, options)
918 .await?,
919 );
920 Ok(self.register_ack_send(send_identity_id, receipt))
921 }
922
923 async fn send_all(
924 &self,
925 payload: &[u8],
926 options: &SendOptions,
927 ) -> Result<SendProgressTicket, Self::Error> {
928 let receipt = self
929 .mac
930 .send_broadcast(self.identity_id, payload, options)
931 .await?;
932 Ok(self.register_non_ack_send(self.identity_id, receipt))
933 }
934}
935
/// Handle tying a [`LocalNode`] to one channel membership.
///
/// The handle is active only while the membership entry is active and still
/// at the generation captured when the handle was created.
#[cfg(feature = "software-crypto")]
#[derive(Clone)]
pub struct BoundChannel<M: MacBackend> {
    /// Node the channel traffic is sent through.
    node: LocalNode<M>,
    /// Channel this handle is bound to.
    channel: Channel,
    /// Membership generation captured when the handle was created.
    join_generation: u64,
}
948
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> BoundChannel<M> {
    /// Returns the channel this handle is bound to.
    pub fn channel(&self) -> &Channel {
        let Self { channel, .. } = self;
        channel
    }

    /// Returns whether the membership this handle was created from is still
    /// in effect: same channel, still active, same generation.
    pub fn is_active(&self) -> bool {
        let membership = self.node.membership.borrow();
        for entry in membership.channels.iter() {
            if entry.channel == self.channel
                && entry.active
                && entry.generation == self.join_generation
            {
                return true;
            }
        }
        false
    }

    /// Returns a connection to `key` that sends through this channel handle.
    pub fn peer(&self, key: PublicKey) -> PeerConnection<Self> {
        let transport = self.clone();
        PeerConnection::new(transport, key)
    }

    /// Returns `Err(NodeError::ChannelLeft)` unless the handle is still active.
    fn check_active(&self) -> Result<(), NodeError<M>> {
        self.is_active()
            .then_some(())
            .ok_or(NodeError::ChannelLeft)
    }

    /// Returns the node this handle sends through.
    pub fn node(&self) -> &LocalNode<M> {
        let Self { node, .. } = self;
        node
    }
}
984
/// Channel-scoped transport: blind unicast to a peer and multicast to the
/// channel. Both fail with `NodeError::ChannelLeft` once the handle is no
/// longer active.
#[cfg(feature = "software-crypto")]
impl<M: MacBackend> Transport for BoundChannel<M> {
    type Error = NodeError<M>;

    /// Sends `payload` to `to` as a blind unicast on this channel.
    async fn send(
        &self,
        to: &PublicKey,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;
        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_blind_unicast(sender, to, self.channel.channel_id(), payload, options)
            .await?;
        Ok(node.register_ack_send(sender, receipt))
    }

    /// Sends `payload` to the whole channel as a multicast.
    async fn send_all(
        &self,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendProgressTicket, Self::Error> {
        self.check_active()?;
        let node = &self.node;
        let sender = node.identity_id;
        let receipt = node
            .mac
            .send_multicast(sender, self.channel.channel_id(), payload, options)
            .await?;
        Ok(node.register_non_ack_send(sender, receipt))
    }
}