1use core::num::NonZeroU8;
2use core::{future::poll_fn, task::Poll};
3
4use hamaddr::HamAddr;
5use heapless::{LinearMap, Vec};
6use rand::{Rng, RngExt as _};
7use umsh_core::{
8 BuildError, ChannelId, ChannelKey, FloodHops, NodeHint, OptionNumber, PacketBuilder,
9 PacketHeader, PacketType, ParseError, ParsedOptions, PayloadType, PublicKey, RouterHint,
10 SourceAddrRef, UnsealedPacket, feed_aad, options::OptionEncoder,
11};
12use umsh_crypto::{
13 CmacState, CryptoEngine, CryptoError, DerivedChannelKeys, NodeIdentity, PairwiseKeys,
14};
15use umsh_hal::{Clock, CounterStore, Radio, RxInfo, Snr, TxError, TxOptions};
16
17use crate::{
18 CapacityError, DEFAULT_ACKS, DEFAULT_CHANNELS, DEFAULT_DUP, DEFAULT_IDENTITIES, DEFAULT_PEERS,
19 DEFAULT_TX, MAX_CAD_ATTEMPTS, MAX_FORWARD_RETRIES, MAX_RESEND_FRAME_LEN, MAX_SOURCE_ROUTE_HOPS,
20 Platform, ReplayVerdict, ReplayWindow,
21 cache::{DupCacheKey, DuplicateCache},
22 peers::CachedRoute,
23 peers::{ChannelTable, PeerCryptoMap, PeerId, PeerRegistry},
24 send::{
25 PendingAck, PendingAckError, ResendRecord, SendOptions, SendReceipt, TxPriority, TxQueue,
26 },
27};
28
/// Reason a wait on the MAC completed.
///
/// NOTE(review): the code that produces these variants is outside this chunk.
pub enum WakeReason {
    /// Carries the [`RxInfo`] for the wake-up (presumably a received frame — confirm).
    Received(RxInfo),
    /// No payload; presumably a timer deadline elapsed — confirm against the producer.
    TimerExpired,
}
42
/// Number of frame-counter values in one persistence block.
const COUNTER_PERSIST_BLOCK_SIZE: u32 = 128;
/// Mask selecting a counter's offset inside its block (`BLOCK_SIZE - 1`, so the size must stay
/// a power of two).
const COUNTER_PERSIST_BLOCK_MASK: u32 = COUNTER_PERSIST_BLOCK_SIZE - 1;
/// Offset inside a block at which `schedule_counter_persist_if_needed` schedules the next save.
const COUNTER_PERSIST_SCHEDULE_OFFSET: u32 = 100;
/// Identifier of the MAC echo-request command — NOTE(review): usage is outside this chunk.
const MAC_COMMAND_ECHO_REQUEST_ID: u8 = 4;
/// Identifier of the MAC echo-response command — NOTE(review): usage is outside this chunk.
const MAC_COMMAND_ECHO_RESPONSE_ID: u8 = 5;
/// Length in bytes of a counter-resync nonce (matches the `u32` nonce in `PendingCounterResync`).
const COUNTER_RESYNC_NONCE_LEN: usize = 4;
/// Presumably the minimum interval, in milliseconds, between counter-resync requests — TODO confirm.
const COUNTER_RESYNC_REQUEST_RETRY_MS: u64 = 5_000;
50
/// Bookkeeping for one outstanding counter-resync exchange, keyed by peer in `IdentitySlot`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PendingCounterResync {
    /// Nonce associated with the request (presumably echoed back by the peer — confirm).
    nonce: u32,
    /// Millisecond timestamp associated with the request (presumably when it was issued).
    requested_ms: u64,
}
56
/// A buffered frame together with its receive metadata, held in
/// `Mac::deferred_counter_resync_frame`.
///
/// NOTE(review): presumably replayed once a counter resync completes — the consumer is not
/// visible in this chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeferredCounterResyncFrame<const FRAME: usize> {
    /// Local identity the frame is associated with.
    local_id: LocalIdentityId,
    /// Peer the frame is associated with.
    peer_id: PeerId,
    /// Raw frame bytes, at most `FRAME` long.
    frame: Vec<u8, FRAME>,
    /// RSSI value stored with the frame.
    rssi: i16,
    /// SNR value stored with the frame.
    snr: Snr,
    /// Optional non-zero link-quality indicator stored with the frame.
    lqi: Option<NonZeroU8>,
    /// Millisecond timestamp stored with the frame (presumably the reception time).
    received_at_ms: u64,
}
67
/// What is known about the sender of a multicast frame; every field is optional.
///
/// NOTE(review): the resolution logic that fills this in is outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ResolvedMulticastSource {
    /// Registry id of the sender, when available.
    peer_id: Option<PeerId>,
    /// Full public key of the sender, when available.
    public_key: Option<PublicKey>,
    /// Node hint of the sender, when available.
    hint: Option<NodeHint>,
}
74
/// State of the listen window tracked in `Mac::post_tx_listen`.
///
/// While this is set, `transmit_next` re-queues anything that is not an immediate ACK.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostTxListen {
    /// Local identity associated with the transmission.
    identity_id: LocalIdentityId,
    /// Receipt of the transmission being listened for.
    receipt: SendReceipt,
    /// Duplicate-cache key (presumably used to recognise the confirming frame — confirm).
    confirm_key: DupCacheKey,
    /// Millisecond deadline of the window.
    deadline_ms: u64,
}
82
/// Handle for a local identity: the wrapped `u8` is the index of its slot in `Mac::identities`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LocalIdentityId(pub u8);
97
/// A local identity owned by the MAC.
pub enum LocalIdentity<I: NodeIdentity> {
    /// Platform-provided identity; `is_ephemeral` reports `false` for it.
    LongTerm(I),
    /// Software identity, only available with the `software-crypto` feature; `is_ephemeral`
    /// reports `true` for it.
    #[cfg(feature = "software-crypto")]
    Ephemeral(umsh_crypto::software::SoftwareIdentity),
}
124
125impl<I: NodeIdentity> LocalIdentity<I> {
126 pub fn public_key(&self) -> &PublicKey {
128 match self {
129 Self::LongTerm(identity) => identity.public_key(),
130 #[cfg(feature = "software-crypto")]
131 Self::Ephemeral(identity) => identity.public_key(),
132 }
133 }
134
135 pub fn hint(&self) -> umsh_core::NodeHint {
137 self.public_key().hint()
138 }
139
140 pub fn is_ephemeral(&self) -> bool {
142 match self {
143 Self::LongTerm(_) => false,
144 #[cfg(feature = "software-crypto")]
145 Self::Ephemeral(_) => true,
146 }
147 }
148}
149
150impl<I: NodeIdentity> From<I> for LocalIdentity<I> {
151 fn from(value: I) -> Self {
152 Self::LongTerm(value)
153 }
154}
155
/// Per-identity MAC state: the identity itself, per-peer crypto state, the transmit frame
/// counter with its persistence bookkeeping, and the pending-ACK table.
pub struct IdentitySlot<
    I: NodeIdentity,
    const PEERS: usize,
    const ACKS: usize,
    const FRAME: usize = MAX_RESEND_FRAME_LEN,
> {
    /// The identity stored in this slot.
    identity: LocalIdentity<I>,
    /// Crypto state per peer, keyed by `PeerId`.
    peer_crypto: PeerCryptoMap<PEERS>,
    /// Next frame counter value handed out by `advance_frame_counter`.
    frame_counter: u32,
    /// Counter value last loaded from, or marked as written to, the counter store.
    persisted_counter: u32,
    /// Counter value waiting to be written by `Mac::service_counter_persistence`, if any.
    pending_persist_target: Option<u32>,
    /// Set once a persist has been scheduled; reset by `load_persisted_counter`.
    save_scheduled_since_boot: bool,
    /// `false` for ephemeral identities, which never persist their counter.
    counter_persistence_enabled: bool,
    /// Pending ACK state keyed by send receipt.
    pending_acks: LinearMap<SendReceipt, PendingAck<FRAME>, ACKS>,
    /// Raw value of the next `SendReceipt`; wraps on overflow.
    next_receipt: u32,
    /// Parent identity supplied at construction (`Some` for identities registered through
    /// `register_ephemeral`).
    pfs_parent: Option<LocalIdentityId>,
    /// Outstanding counter-resync bookkeeping per peer.
    pending_counter_resync: LinearMap<PeerId, PendingCounterResync, PEERS>,
}
209
210impl<I: NodeIdentity, const PEERS: usize, const ACKS: usize, const FRAME: usize>
211 IdentitySlot<I, PEERS, ACKS, FRAME>
212{
213 pub fn new(
215 identity: LocalIdentity<I>,
216 frame_counter: u32,
217 pfs_parent: Option<LocalIdentityId>,
218 ) -> Self {
219 let counter_persistence_enabled = !identity.is_ephemeral();
220 Self {
221 identity,
222 peer_crypto: PeerCryptoMap::new(),
223 frame_counter,
224 persisted_counter: frame_counter,
225 pending_persist_target: None,
226 save_scheduled_since_boot: false,
227 counter_persistence_enabled,
228 pending_acks: LinearMap::new(),
229 next_receipt: 0,
230 pfs_parent,
231 pending_counter_resync: LinearMap::new(),
232 }
233 }
234
235 pub fn identity(&self) -> &LocalIdentity<I> {
237 &self.identity
238 }
239 pub fn peer_crypto(&self) -> &PeerCryptoMap<PEERS> {
241 &self.peer_crypto
242 }
243 pub fn peer_crypto_mut(&mut self) -> &mut PeerCryptoMap<PEERS> {
245 &mut self.peer_crypto
246 }
247 pub fn frame_counter(&self) -> u32 {
249 self.frame_counter
250 }
251 pub fn persisted_counter(&self) -> u32 {
253 self.persisted_counter
254 }
255 #[cfg(test)]
260 pub(crate) fn set_frame_counter(&mut self, value: u32) {
261 self.frame_counter = value;
262 }
263 pub fn pending_persist_target(&self) -> Option<u32> {
265 self.pending_persist_target
266 }
267
268 pub fn counter_persistence_enabled(&self) -> bool {
270 self.counter_persistence_enabled
271 }
272
273 pub(crate) fn advance_frame_counter(&mut self) -> u32 {
275 let current = self.frame_counter;
276 self.frame_counter = self.frame_counter.wrapping_add(1);
277 current
278 }
279
280 pub fn load_persisted_counter(&mut self, value: u32) {
282 let aligned = align_counter_boundary(value);
283 self.frame_counter = aligned;
284 self.persisted_counter = aligned;
285 self.pending_persist_target = None;
286 self.save_scheduled_since_boot = false;
287 }
288
289 fn schedule_counter_persist_if_needed(&mut self) {
290 if !self.counter_persistence_enabled {
291 return;
292 }
293
294 let should_schedule = !self.save_scheduled_since_boot
295 || (self.frame_counter & COUNTER_PERSIST_BLOCK_MASK) == COUNTER_PERSIST_SCHEDULE_OFFSET;
296 if !should_schedule {
297 return;
298 }
299
300 let target = next_counter_persist_target(self.frame_counter);
301 self.pending_persist_target = Some(
302 self.pending_persist_target
303 .map(|existing| existing.max(target))
304 .unwrap_or(target),
305 );
306 self.save_scheduled_since_boot = true;
307 }
308
309 fn mark_counter_persisted(&mut self, value: u32) {
310 let aligned = align_counter_boundary(value);
311 self.persisted_counter = aligned;
312 if self.pending_persist_target == Some(aligned) {
313 self.pending_persist_target = None;
314 }
315 }
316
317 fn counter_window_exhausted(&self) -> bool {
318 if !self.counter_persistence_enabled {
319 return false;
320 }
321
322 let ahead = self.persisted_counter.wrapping_sub(self.frame_counter);
323 if ahead > 0 && ahead <= COUNTER_PERSIST_BLOCK_SIZE {
324 return false;
325 }
326
327 if ahead == 0 {
328 return self.save_scheduled_since_boot;
329 }
330
331 self.frame_counter.wrapping_sub(self.persisted_counter) >= COUNTER_PERSIST_BLOCK_SIZE
332 }
333
334 pub fn next_receipt(&mut self) -> SendReceipt {
336 let receipt = SendReceipt(self.next_receipt);
337 self.next_receipt = self.next_receipt.wrapping_add(1);
338 receipt
339 }
340
341 #[cfg(test)]
343 pub(crate) fn set_next_receipt_for_test(&mut self, value: u32) {
344 self.next_receipt = value;
345 }
346
347 pub fn try_insert_pending_ack(
349 &mut self,
350 receipt: SendReceipt,
351 pending: PendingAck<FRAME>,
352 ) -> Result<Option<PendingAck<FRAME>>, PendingAckError> {
353 self.pending_acks
354 .insert(receipt, pending)
355 .map_err(|_| PendingAckError::TableFull)
356 }
357
358 pub fn pending_ack(&self, receipt: &SendReceipt) -> Option<&PendingAck<FRAME>> {
360 self.pending_acks.get(receipt)
361 }
362
363 pub fn pending_ack_mut(&mut self, receipt: &SendReceipt) -> Option<&mut PendingAck<FRAME>> {
365 self.pending_acks.get_mut(receipt)
366 }
367
368 pub fn remove_pending_ack(&mut self, receipt: &SendReceipt) -> Option<PendingAck<FRAME>> {
370 self.pending_acks.remove(receipt)
371 }
372
373 pub fn pfs_parent(&self) -> Option<LocalIdentityId> {
375 self.pfs_parent
376 }
377
378 fn pending_counter_resync(&self) -> &LinearMap<PeerId, PendingCounterResync, PEERS> {
380 &self.pending_counter_resync
381 }
382
383 fn pending_counter_resync_mut(
385 &mut self,
386 ) -> &mut LinearMap<PeerId, PendingCounterResync, PEERS> {
387 &mut self.pending_counter_resync
388 }
389}
390
/// Policy attached to one channel.
///
/// NOTE(review): enforcement happens in `enforce_send_policy`, which is outside this chunk; the
/// field descriptions below follow the field names and should be confirmed there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelPolicy {
    /// Channel this policy applies to.
    pub channel_id: ChannelId,
    /// Presumably requires traffic on the channel to be sent unencrypted.
    pub require_unencrypted: bool,
    /// Presumably requires the full source key rather than a hint.
    pub require_full_source: bool,
    /// Presumably an upper bound on the flood hop count, when set.
    pub max_flood_hops: Option<u8>,
}
421
/// Amateur-radio operating mode; `Unlicensed` is the default in both `OperatingPolicy` and
/// `RepeaterConfig`.
///
/// NOTE(review): the exact rules each mode implies are enforced outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AmateurRadioMode {
    /// Default mode.
    Unlicensed,
    /// Presumably restricts operation to licensed amateur traffic — confirm.
    LicensedOnly,
    /// Presumably mixes unlicensed and licensed operation — confirm.
    Hybrid,
}
462
/// Authority under which a transmission is made — NOTE(review): producers and consumers are
/// outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransmitAuthority {
    /// Transmission made without amateur-radio authority.
    Unlicensed,
    /// Transmission made under amateur-radio authority.
    Amateur,
}
468
/// Station-related action recorded in a `ForwardPlan` — NOTE(review): presumably what to do with
/// a station callsign option when forwarding; confirm in the forwarding code.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ForwardStationAction {
    /// Remove variant.
    Remove,
    /// Replace variant.
    Replace,
}
474
/// Description of how a frame should be forwarded.
///
/// NOTE(review): the forwarding code that builds and applies this plan is outside this chunk;
/// field notes follow the names only.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ForwardPlan {
    /// Router hint associated with the forward.
    router_hint: RouterHint,
    /// Whether the forward consumes a source-route entry.
    consume_source_route: bool,
    /// Whether the forward decrements the flood hop count.
    decrement_flood_hops: bool,
    /// Two-byte region code to insert, if any.
    insert_region_code: Option<[u8; 2]>,
    /// Delay in milliseconds associated with the forward.
    delay_ms: u64,
    /// Station action to apply.
    station_action: ForwardStationAction,
}
484
/// Local transmit policy held by the MAC.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OperatingPolicy {
    /// Amateur-radio operating mode.
    pub amateur_radio_mode: AmateurRadioMode,
    /// When set, the queue functions add it to outgoing frames as an `OperatorCallsign` option.
    pub operator_callsign: Option<HamAddr>,
    /// Per-channel policies, at most four.
    pub channel_policies: Vec<ChannelPolicy, 4>,
}
512
513impl Default for OperatingPolicy {
514 fn default() -> Self {
515 Self {
516 amateur_radio_mode: AmateurRadioMode::Unlicensed,
517 operator_callsign: None,
518 channel_policies: Vec::new(),
519 }
520 }
521}
522
/// Repeater configuration held by the MAC.
///
/// NOTE(review): the repeater logic that reads these fields is outside this chunk; the notes
/// below follow the field names and the defaults in `Default for RepeaterConfig`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepeaterConfig {
    /// Whether repeating is enabled (default `false`).
    pub enabled: bool,
    /// Two-byte region codes, at most eight (default empty).
    pub regions: Vec<[u8; 2], 8>,
    /// Optional minimum RSSI threshold (default none).
    pub min_rssi: Option<i16>,
    /// Optional minimum SNR threshold (default none).
    pub min_snr: Option<i8>,
    /// Lower SNR bound, in dB, for flood contention (default -6).
    pub flood_contention_snr_low_db: i8,
    /// Upper SNR bound, in dB, for flood contention (default 15).
    pub flood_contention_snr_high_db: i8,
    /// Minimum contention window as a percentage (default 20).
    pub flood_contention_min_window_percent: u8,
    /// Maximum contention window in frames (default 2).
    pub flood_contention_max_window_frames: u8,
    /// Maximum number of contention deferrals (default 3).
    pub flood_contention_max_deferrals: u8,
    /// Amateur-radio operating mode for repeating (default `Unlicensed`).
    pub amateur_radio_mode: AmateurRadioMode,
    /// Optional station callsign (default none).
    pub station_callsign: Option<HamAddr>,
}
576
577impl Default for RepeaterConfig {
578 fn default() -> Self {
579 Self {
580 enabled: false,
581 regions: Vec::new(),
582 min_rssi: None,
583 min_snr: None,
584 flood_contention_snr_low_db: -6,
585 flood_contention_snr_high_db: 15,
586 flood_contention_min_window_percent: 20,
587 flood_contention_max_window_frames: 2,
588 flood_contention_max_deferrals: 3,
589 amateur_radio_mode: AmateurRadioMode::Unlicensed,
590 station_callsign: None,
591 }
592 }
593}
594
/// Errors returned by the queue/send entry points of the MAC.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SendError {
    /// The given `LocalIdentityId` does not refer to an occupied identity slot.
    IdentityMissing,
    /// The destination public key is not in the peer registry.
    PeerMissing,
    /// No pairwise keys are installed for the (identity, peer) pair.
    PairwiseKeysMissing,
    /// NOTE(review): presumably key agreement with the peer failed — raised outside this chunk.
    IdentityAgreementFailed,
    /// The channel id is not in the channel table.
    ChannelMissing,
    /// NOTE(review): presumably raised by `enforce_send_policy` — not visible in this chunk.
    PolicyViolation,
    /// Packet construction failed; also used with `BufferTooSmall` when a built frame exceeds
    /// the radio's maximum frame size.
    Build(BuildError),
    /// Packet parsing failed.
    Parse(ParseError),
    /// A cryptographic operation failed.
    Crypto(CryptoError),
    /// The transmit queue rejected the frame (`install_pairwise_keys` also uses this when the
    /// peer-crypto map rejects an insertion).
    QueueFull,
    /// NOTE(review): presumably the pending-ACK table is full — raised outside this chunk.
    PendingAckFull,
    /// NOTE(review): presumably the frame counter outran its persisted window — raised outside
    /// this chunk.
    CounterPersistenceLag,
}
640
641impl From<BuildError> for SendError {
642 fn from(value: BuildError) -> Self {
643 Self::Build(value)
644 }
645}
646
647impl From<ParseError> for SendError {
648 fn from(value: ParseError) -> Self {
649 Self::Parse(value)
650 }
651}
652
653impl From<CryptoError> for SendError {
654 fn from(value: CryptoError) -> Self {
655 Self::Crypto(value)
656 }
657}
658
/// Errors surfaced by MAC operations that drive the radio.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MacError<RadioError> {
    /// Error reported by the radio driver.
    Radio(RadioError),
    /// Error reported by a transmit attempt.
    Transmit(TxError<RadioError>),
    /// A frame could not be put back on the transmit queue (see `transmit_next`).
    QueueFull,
}
678
/// Errors returned by `Mac::load_persisted_counter`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CounterPersistenceError<StoreError> {
    /// The given `LocalIdentityId` does not refer to an occupied identity slot.
    IdentityMissing,
    /// The counter store reported an error.
    Store(StoreError),
}
693
694impl<RadioError> From<RadioError> for MacError<RadioError> {
695 fn from(value: RadioError) -> Self {
696 Self::Radio(value)
697 }
698}
699
700impl<RadioError> From<TxError<RadioError>> for MacError<RadioError> {
701 fn from(value: TxError<RadioError>) -> Self {
702 Self::Transmit(value)
703 }
704}
705
/// The MAC layer: owns the platform resources (radio, crypto, clock, RNG, counter store) and all
/// protocol state (identities, peers, channels, duplicate caches, transmit queue).
///
/// The const parameters bound the capacity of the fixed-size tables.
pub struct Mac<
    P: Platform,
    const IDENTITIES: usize = DEFAULT_IDENTITIES,
    const PEERS: usize = DEFAULT_PEERS,
    const CHANNELS: usize = DEFAULT_CHANNELS,
    const ACKS: usize = DEFAULT_ACKS,
    const TX: usize = DEFAULT_TX,
    const FRAME: usize = MAX_RESEND_FRAME_LEN,
    const DUP: usize = DEFAULT_DUP,
> {
    /// Radio driver.
    radio: P::Radio,
    /// Crypto engine built on the platform AES and SHA implementations.
    crypto: CryptoEngine<P::Aes, P::Sha>,
    /// Millisecond clock.
    clock: P::Clock,
    /// Random number generator.
    rng: P::Rng,
    /// Persistent store for frame counters, keyed by public-key bytes.
    counter_store: P::CounterStore,
    /// Identity slots; a `LocalIdentityId` is an index into this vector, `None` marks a freed slot.
    identities: Vec<Option<IdentitySlot<P::Identity, PEERS, ACKS, FRAME>>, IDENTITIES>,
    /// Known peers.
    peer_registry: PeerRegistry<PEERS>,
    /// Registered channels with their derived keys.
    channels: ChannelTable<CHANNELS>,
    /// Duplicate-suppression cache.
    dup_cache: DuplicateCache<DUP>,
    /// Second duplicate cache (presumably for multicast from unknown senders — confirm).
    multicast_unknown_dup_cache: DuplicateCache<DUP>,
    /// Outgoing frame queue.
    tx_queue: TxQueue<TX, FRAME>,
    /// Active post-transmit listen window, if any.
    post_tx_listen: Option<PostTxListen>,
    /// Repeater configuration.
    repeater: RepeaterConfig,
    /// Local operating policy.
    operating_policy: OperatingPolicy,
    /// Flag exposed through `auto_register_full_key_peers`; starts `false`.
    auto_register_full_key_peers: bool,
    /// At most one deferred frame awaiting counter resync.
    deferred_counter_resync_frame: Option<DeferredCounterResyncFrame<FRAME>>,
}
794
795impl<
796 P: Platform,
797 const IDENTITIES: usize,
798 const PEERS: usize,
799 const CHANNELS: usize,
800 const ACKS: usize,
801 const TX: usize,
802 const FRAME: usize,
803 const DUP: usize,
804> Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
805{
    /// Creates a MAC from its platform resources and configuration.
    ///
    /// All tables, caches and the transmit queue start empty, no listen window or deferred frame
    /// is active, and `auto_register_full_key_peers` is off.
    pub fn new(
        radio: P::Radio,
        crypto: CryptoEngine<P::Aes, P::Sha>,
        clock: P::Clock,
        rng: P::Rng,
        counter_store: P::CounterStore,
        repeater: RepeaterConfig,
        operating_policy: OperatingPolicy,
    ) -> Self {
        Self {
            radio,
            crypto,
            clock,
            rng,
            counter_store,
            identities: Vec::new(),
            peer_registry: PeerRegistry::new(),
            channels: ChannelTable::new(),
            dup_cache: DuplicateCache::new(),
            multicast_unknown_dup_cache: DuplicateCache::new(),
            tx_queue: TxQueue::new(),
            post_tx_listen: None,
            repeater,
            operating_policy,
            auto_register_full_key_peers: false,
            deferred_counter_resync_frame: None,
        }
    }
835
    /// Borrows the radio driver.
    pub fn radio(&self) -> &P::Radio {
        &self.radio
    }
840
    /// Mutably borrows the radio driver.
    pub fn radio_mut(&mut self) -> &mut P::Radio {
        &mut self.radio
    }
845
    /// Borrows the crypto engine.
    pub fn crypto(&self) -> &CryptoEngine<P::Aes, P::Sha> {
        &self.crypto
    }
850
    /// Borrows the clock.
    pub fn clock(&self) -> &P::Clock {
        &self.clock
    }
855
    /// Borrows the random number generator.
    pub fn rng(&self) -> &P::Rng {
        &self.rng
    }
860
    /// Mutably borrows the random number generator.
    pub fn rng_mut(&mut self) -> &mut P::Rng {
        &mut self.rng
    }
865
    /// Borrows the persistent counter store.
    pub fn counter_store(&self) -> &P::CounterStore {
        &self.counter_store
    }
    /// Borrows the transmit queue.
    pub fn tx_queue(&self) -> &TxQueue<TX, FRAME> {
        &self.tx_queue
    }
    /// Mutably borrows the transmit queue.
    pub fn tx_queue_mut(&mut self) -> &mut TxQueue<TX, FRAME> {
        &mut self.tx_queue
    }
    /// Borrows the duplicate-suppression cache.
    pub fn dup_cache(&self) -> &DuplicateCache<DUP> {
        &self.dup_cache
    }
    /// Borrows the peer registry.
    pub fn peer_registry(&self) -> &PeerRegistry<PEERS> {
        &self.peer_registry
    }
    /// Mutably borrows the peer registry.
    pub fn peer_registry_mut(&mut self) -> &mut PeerRegistry<PEERS> {
        &mut self.peer_registry
    }
    /// Borrows the channel table.
    pub fn channels(&self) -> &ChannelTable<CHANNELS> {
        &self.channels
    }
    /// Mutably borrows the channel table.
    pub fn channels_mut(&mut self) -> &mut ChannelTable<CHANNELS> {
        &mut self.channels
    }
    /// Borrows the repeater configuration.
    pub fn repeater_config(&self) -> &RepeaterConfig {
        &self.repeater
    }
    /// Mutably borrows the repeater configuration.
    pub fn repeater_config_mut(&mut self) -> &mut RepeaterConfig {
        &mut self.repeater
    }
    /// Borrows the operating policy.
    pub fn operating_policy(&self) -> &OperatingPolicy {
        &self.operating_policy
    }
    /// Mutably borrows the operating policy.
    pub fn operating_policy_mut(&mut self) -> &mut OperatingPolicy {
        &mut self.operating_policy
    }
914
    /// Returns the current value of the `auto_register_full_key_peers` flag.
    pub fn auto_register_full_key_peers(&self) -> bool {
        self.auto_register_full_key_peers
    }
919
    /// Sets the `auto_register_full_key_peers` flag.
    pub fn set_auto_register_full_key_peers(&mut self, enabled: bool) {
        self.auto_register_full_key_peers = enabled;
    }
924
925 pub fn add_identity(
927 &mut self,
928 identity: P::Identity,
929 ) -> Result<LocalIdentityId, CapacityError> {
930 self.insert_identity(LocalIdentity::LongTerm(identity), None)
931 }
932
933 pub async fn load_persisted_counter(
935 &mut self,
936 id: LocalIdentityId,
937 ) -> Result<u32, CounterPersistenceError<<P::CounterStore as CounterStore>::Error>> {
938 let context = {
939 let slot = self
940 .identity(id)
941 .ok_or(CounterPersistenceError::IdentityMissing)?;
942 if !slot.counter_persistence_enabled() {
943 return Ok(slot.frame_counter());
944 }
945 *slot.identity().public_key()
946 };
947 let loaded = self
948 .counter_store
949 .load(&context.0)
950 .await
951 .map_err(CounterPersistenceError::Store)?;
952 let aligned = align_counter_boundary(loaded);
953 let slot = self
954 .identity_mut(id)
955 .ok_or(CounterPersistenceError::IdentityMissing)?;
956 slot.load_persisted_counter(aligned);
957 Ok(aligned)
958 }
959
960 pub async fn service_counter_persistence(
962 &mut self,
963 ) -> Result<usize, <P::CounterStore as CounterStore>::Error> {
964 let mut pending = Vec::<(LocalIdentityId, [u8; 32], u32), IDENTITIES>::new();
965 for (index, slot) in self.identities.iter().enumerate() {
966 let Some(slot) = slot.as_ref() else {
967 continue;
968 };
969 let Some(target) = slot.pending_persist_target() else {
970 continue;
971 };
972 if !slot.counter_persistence_enabled() {
973 continue;
974 }
975 pending
976 .push((
977 LocalIdentityId(index as u8),
978 slot.identity().public_key().0,
979 target,
980 ))
981 .expect("identity enumeration must fit configured identity capacity");
982 }
983
984 let mut wrote = 0usize;
985 for (_, context, target) in pending.iter() {
986 self.counter_store
987 .store(context, align_counter_boundary(*target))
988 .await?;
989 wrote += 1;
990 }
991 if wrote > 0 {
992 self.counter_store.flush().await?;
993 for (id, _, target) in pending {
994 if let Some(slot) = self.identity_mut(id) {
995 slot.mark_counter_persisted(target);
996 }
997 }
998 }
999 Ok(wrote)
1000 }
1001
1002 #[cfg(feature = "software-crypto")]
1003 pub fn register_ephemeral(
1005 &mut self,
1006 parent: LocalIdentityId,
1007 identity: umsh_crypto::software::SoftwareIdentity,
1008 ) -> Result<LocalIdentityId, CapacityError> {
1009 self.insert_identity(LocalIdentity::Ephemeral(identity), Some(parent))
1010 }
1011
1012 #[cfg(feature = "software-crypto")]
1013 pub fn remove_ephemeral(&mut self, id: LocalIdentityId) -> bool {
1015 if let Some(slot) = self.identities.get_mut(id.0 as usize) {
1016 let should_remove = slot
1017 .as_ref()
1018 .map(|identity_slot| identity_slot.identity().is_ephemeral())
1019 .unwrap_or(false);
1020 if should_remove {
1021 *slot = None;
1022 return true;
1023 }
1024 }
1025 false
1026 }
1027
1028 pub fn identity(
1030 &self,
1031 id: LocalIdentityId,
1032 ) -> Option<&IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1033 self.identities.get(id.0 as usize)?.as_ref()
1034 }
1035
1036 pub fn identity_mut(
1038 &mut self,
1039 id: LocalIdentityId,
1040 ) -> Option<&mut IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1041 self.identities.get_mut(id.0 as usize)?.as_mut()
1042 }
1043
    /// Registers `key` in the peer registry via `try_insert_or_update` and returns its id.
    ///
    /// # Errors
    /// Propagates the registry's `CapacityError`.
    pub fn add_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
        self.peer_registry.try_insert_or_update(key)
    }
1048
    /// Derives the channel keys for `key` and adds the channel to the table.
    ///
    /// # Errors
    /// Propagates the table's `CapacityError`.
    pub fn add_channel(&mut self, key: ChannelKey) -> Result<(), CapacityError> {
        // Derive before `key` is moved into the table.
        let derived = self.crypto.derive_channel_keys(&key);
        self.channels.try_add(key, derived)
    }
1054
    /// Derives a channel key from `name` and registers it through [`Self::add_channel`].
    ///
    /// # Errors
    /// Propagates the `CapacityError` from `add_channel`.
    pub fn add_named_channel(&mut self, name: &str) -> Result<(), CapacityError> {
        let key = self.crypto.derive_named_channel_key(name);
        self.add_channel(key)
    }
1060
1061 pub fn identity_count(&self) -> usize {
1063 self.identities.iter().filter(|slot| slot.is_some()).count()
1064 }
1065
1066 #[cfg(any(feature = "unsafe-advanced", test))]
1073 pub(crate) fn install_pairwise_keys(
1074 &mut self,
1075 identity_id: LocalIdentityId,
1076 peer_id: PeerId,
1077 pairwise_keys: PairwiseKeys,
1078 ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1079 let slot = self
1080 .identity_mut(identity_id)
1081 .ok_or(SendError::IdentityMissing)?;
1082 slot.peer_crypto_mut()
1083 .insert(
1084 peer_id,
1085 crate::peers::PeerCryptoState {
1086 pairwise_keys,
1087 replay_window: ReplayWindow::new(),
1088 },
1089 )
1090 .map_err(|_| SendError::QueueFull)
1091 }
1092
    /// Public entry point, available only with the `unsafe-advanced` feature, that forwards to
    /// `install_pairwise_keys` unchanged.
    #[cfg(feature = "unsafe-advanced")]
    pub fn install_pairwise_keys_advanced(
        &mut self,
        identity_id: LocalIdentityId,
        peer_id: PeerId,
        pairwise_keys: PairwiseKeys,
    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
        self.install_pairwise_keys(identity_id, peer_id, pairwise_keys)
    }
1108
    /// Builds a broadcast frame from identity `from` and places it on the transmit queue.
    ///
    /// The caller's `encrypted`, `ack_requested` and `salt` options are forced off before the
    /// send policy is checked. The frame is not sealed and the frame counter is not advanced.
    ///
    /// # Errors
    /// Policy errors from `enforce_send_policy`, `IdentityMissing`, `Build` (including
    /// `BufferTooSmall` when the frame exceeds the radio's maximum frame size) and `QueueFull`.
    pub fn queue_broadcast(
        &mut self,
        from: LocalIdentityId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        // Work on a private copy so the overrides never leak back to the caller.
        let mut options = options.clone();
        options.encrypted = false;
        options.ack_requested = false;
        options.salt = false;
        let options = &options;
        self.enforce_send_policy(None, options, false)?;

        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
        let source_key = *slot.identity().public_key();
        let receipt = slot.next_receipt();
        let mut buf = [0u8; FRAME];
        let builder = PacketBuilder::new(&mut buf).broadcast();
        let mut builder = if options.full_source {
            builder.source_full(&source_key)
        } else {
            builder.source_hint(source_key.hint())
        };
        // NOTE(review): the order of the builder calls below is kept as-is; it may be significant
        // for option encoding.
        if let Some(hops) = options.flood_hops {
            builder = builder.flood_hops(hops);
        }
        if options.trace_route {
            builder = builder.trace_route();
        }
        if let Some(route) = options.source_route.as_ref() {
            builder = builder.source_route(route.as_slice());
        }
        if let Some(region_code) = options.region_code {
            builder = builder.region_code(region_code);
        }
        if let Some(callsign) = self.operating_policy.operator_callsign {
            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
        }
        let frame = builder.payload(payload).build()?;
        // The build buffer may be larger than what the radio can actually carry.
        if frame.len() > self.radio.max_frame_size() {
            return Err(SendError::Build(BuildError::BufferTooSmall));
        }
        self.tx_queue
            .enqueue(TxPriority::Application, frame, Some(receipt), Some(from))
            .map_err(|_| SendError::QueueFull)?;
        Ok(receipt)
    }
1162
    /// Async wrapper around [`Self::queue_broadcast`]; it performs no awaiting of its own.
    pub async fn send_broadcast(
        &mut self,
        from: LocalIdentityId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.queue_broadcast(from, payload, options)
    }
1172
    /// Builds a multicast frame for `channel_id`, seals it with the channel's derived keys and
    /// places it on the transmit queue.
    ///
    /// A receipt is allocated and the frame counter is advanced before the packet is built, so
    /// both are consumed even when a later step fails.
    ///
    /// # Errors
    /// Policy errors from `enforce_send_policy`, `ChannelMissing`, `IdentityMissing`, `Build`,
    /// `Crypto`, and whatever `identity_and_advance` / `enqueue_packet` return.
    pub fn queue_multicast(
        &mut self,
        from: LocalIdentityId,
        channel_id: &ChannelId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.enforce_send_policy(Some(*channel_id), options, false)?;
        // Only the first channel matching the id is used.
        let derived = self
            .channels
            .lookup_by_id(channel_id)
            .next()
            .ok_or(SendError::ChannelMissing)?
            .derived
            .clone();
        // The channel's derived keys take the place of pairwise keys when sealing.
        let keys = PairwiseKeys {
            k_enc: derived.k_enc,
            k_mic: derived.k_mic,
        };
        let receipt = self
            .identity_mut(from)
            .ok_or(SendError::IdentityMissing)?
            .next_receipt();
        let (source_key, frame_counter) = self.identity_and_advance(from)?;
        let salt = self.take_salt(options);
        let mut buf = [0u8; FRAME];
        let builder = PacketBuilder::new(&mut buf).multicast(*channel_id);
        let builder = if options.full_source {
            builder.source_full(&source_key)
        } else {
            builder.source_hint(source_key.hint())
        };
        let mut builder = builder.frame_counter(frame_counter);
        // NOTE(review): the order of the builder calls below is kept as-is; it may be significant
        // for option encoding.
        if options.encrypted {
            builder = builder.encrypted();
        }
        builder = builder.mic_size(options.mic_size);
        if let Some(salt) = salt {
            builder = builder.salt(salt);
        }
        if let Some(hops) = options.flood_hops {
            builder = builder.flood_hops(hops);
        }
        if options.trace_route {
            builder = builder.trace_route();
        }
        if let Some(route) = options.source_route.as_ref() {
            builder = builder.source_route(route.as_slice());
        }
        if let Some(region_code) = options.region_code {
            builder = builder.region_code(region_code);
        }
        if let Some(callsign) = self.operating_policy.operator_callsign {
            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
        }
        let mut packet = builder.payload(payload).build()?;
        self.crypto.seal_packet(&mut packet, &keys)?;
        self.enqueue_packet(packet, Some(receipt), Some(from))?;
        Ok(receipt)
    }
1237
    /// Async wrapper around [`Self::queue_multicast`]; it performs no awaiting of its own.
    pub async fn send_multicast(
        &mut self,
        from: LocalIdentityId,
        channel_id: &ChannelId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.queue_multicast(from, channel_id, payload, options)
    }
1248
1249 pub fn queue_mac_ack_for_peer(
1251 &mut self,
1252 peer_id: PeerId,
1253 dst: NodeHint,
1254 ack_tag: [u8; 8],
1255 ) -> Result<(), SendError> {
1256 let mut buf = [0u8; FRAME];
1257 let mut builder = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag);
1258 if let Some(peer) = self.peer_registry.get(peer_id) {
1259 match peer.route.as_ref() {
1260 Some(CachedRoute::Direct) | None => {}
1261 Some(CachedRoute::Source(route)) => {
1262 builder = builder.source_route(route.as_slice());
1263 }
1264 Some(CachedRoute::Flood { hops, regions }) => {
1265 builder = builder.flood_hops((*hops).clamp(1, 15));
1266 for region in regions {
1267 builder = builder.region_code(*region);
1268 }
1269 }
1270 }
1271 }
1272 let frame = builder.build()?;
1273 if frame.len() > self.radio.max_frame_size() {
1274 return Err(SendError::Build(BuildError::BufferTooSmall));
1275 }
1276 self.tx_queue
1277 .enqueue(TxPriority::ImmediateAck, frame, None, None)
1278 .map_err(|_| SendError::QueueFull)?;
1279 Ok(())
1280 }
1281
1282 pub fn queue_mac_ack(&mut self, dst: NodeHint, ack_tag: [u8; 8]) -> Result<(), SendError> {
1284 let mut buf = [0u8; FRAME];
1285 let frame = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag).build()?;
1286 if frame.len() > self.radio.max_frame_size() {
1287 return Err(SendError::Build(BuildError::BufferTooSmall));
1288 }
1289 self.tx_queue
1290 .enqueue(TxPriority::ImmediateAck, frame, None, None)
1291 .map_err(|_| SendError::QueueFull)?;
1292 Ok(())
1293 }
1294
1295 pub fn queue_unicast(
1297 &mut self,
1298 from: LocalIdentityId,
1299 peer: &PublicKey,
1300 payload: &[u8],
1301 options: &SendOptions,
1302 ) -> Result<Option<SendReceipt>, SendError> {
1303 self.enforce_send_policy(None, options, false)?;
1304 let (peer_id, _) = self
1305 .peer_registry
1306 .lookup_by_key(peer)
1307 .ok_or(SendError::PeerMissing)?;
1308 let pairwise_keys = self
1309 .identity(from)
1310 .ok_or(SendError::IdentityMissing)?
1311 .peer_crypto()
1312 .get(&peer_id)
1313 .ok_or(SendError::PairwiseKeysMissing)?
1314 .pairwise_keys
1315 .clone();
1316 let effective_source_route = self.effective_source_route(peer_id, options);
1317
1318 let (source_key, frame_counter) = self.identity_and_advance(from)?;
1319 let salt = self.take_salt(options);
1320 let mut buf = [0u8; FRAME];
1321 let builder = PacketBuilder::new(&mut buf).unicast(peer.hint());
1322 let builder = if options.full_source {
1323 builder.source_full(&source_key)
1324 } else {
1325 builder.source_hint(source_key.hint())
1326 };
1327 let mut builder = builder.frame_counter(frame_counter);
1328 if options.ack_requested {
1329 builder = builder.ack_requested();
1330 }
1331 if options.encrypted {
1332 builder = builder.encrypted();
1333 }
1334 builder = builder.mic_size(options.mic_size);
1335 if let Some(salt) = salt {
1336 builder = builder.salt(salt);
1337 }
1338 if let Some(hops) = options.flood_hops {
1339 builder = builder.flood_hops(hops);
1340 }
1341 if options.trace_route {
1342 builder = builder.trace_route();
1343 }
1344 if let Some(route) = effective_source_route.as_ref() {
1345 builder = builder.source_route(route.as_slice());
1346 }
1347 if let Some(region_code) = options.region_code {
1348 builder = builder.region_code(region_code);
1349 }
1350 if let Some(callsign) = self.operating_policy.operator_callsign {
1351 builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1352 }
1353 let mut packet = builder.payload(payload).build()?;
1354
1355 let receipt = if options.ack_requested {
1356 Some(self.prepare_pending_ack(from, *peer, &packet, &pairwise_keys, options)?)
1357 } else {
1358 None
1359 };
1360
1361 self.crypto.seal_packet(&mut packet, &pairwise_keys)?;
1362 if let Some(receipt) = receipt {
1363 self.refresh_pending_resend(
1364 from,
1365 receipt,
1366 packet.as_bytes(),
1367 effective_source_route
1368 .as_ref()
1369 .map(|route| route.as_slice()),
1370 )?;
1371 }
1372 if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1373 if let Some(receipt) = receipt {
1374 let _ = self
1375 .identity_mut(from)
1376 .and_then(|slot| slot.remove_pending_ack(&receipt));
1377 }
1378 return Err(err);
1379 }
1380 Ok(receipt)
1381 }
1382
1383 pub async fn send_unicast(
1385 &mut self,
1386 from: LocalIdentityId,
1387 peer: &PublicKey,
1388 payload: &[u8],
1389 options: &SendOptions,
1390 ) -> Result<Option<SendReceipt>, SendError> {
1391 let (peer_id, _) = self
1392 .peer_registry
1393 .lookup_by_key(peer)
1394 .ok_or(SendError::PeerMissing)?;
1395 let _ = self.ensure_peer_crypto(from, peer_id).await?;
1396 self.queue_unicast(from, peer, payload, options)
1397 }
1398
1399 pub fn queue_blind_unicast(
1401 &mut self,
1402 from: LocalIdentityId,
1403 peer: &PublicKey,
1404 channel_id: &ChannelId,
1405 payload: &[u8],
1406 options: &SendOptions,
1407 ) -> Result<Option<SendReceipt>, SendError> {
1408 self.enforce_send_policy(Some(*channel_id), options, true)?;
1409 let (peer_id, _) = self
1410 .peer_registry
1411 .lookup_by_key(peer)
1412 .ok_or(SendError::PeerMissing)?;
1413 let pairwise_keys = self
1414 .identity(from)
1415 .ok_or(SendError::IdentityMissing)?
1416 .peer_crypto()
1417 .get(&peer_id)
1418 .ok_or(SendError::PairwiseKeysMissing)?
1419 .pairwise_keys
1420 .clone();
1421 let channel_keys = self
1422 .channels
1423 .lookup_by_id(channel_id)
1424 .next()
1425 .ok_or(SendError::ChannelMissing)?
1426 .derived
1427 .clone();
1428 let blind_keys = self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
1429 let effective_source_route = self.effective_source_route(peer_id, options);
1430
1431 let (source_key, frame_counter) = self.identity_and_advance(from)?;
1432 let salt = self.take_salt(options);
1433 let mut buf = [0u8; FRAME];
1434 let builder = PacketBuilder::new(&mut buf).blind_unicast(*channel_id, peer.hint());
1435 let builder = if options.full_source {
1436 builder.source_full(&source_key)
1437 } else {
1438 builder.source_hint(source_key.hint())
1439 };
1440 let mut builder = builder.frame_counter(frame_counter);
1441 if options.ack_requested {
1442 builder = builder.ack_requested();
1443 }
1444 if !options.encrypted {
1445 builder = builder.unencrypted();
1446 }
1447 builder = builder.mic_size(options.mic_size);
1448 if let Some(salt) = salt {
1449 builder = builder.salt(salt);
1450 }
1451 if let Some(hops) = options.flood_hops {
1452 builder = builder.flood_hops(hops);
1453 }
1454 if options.trace_route {
1455 builder = builder.trace_route();
1456 }
1457 if let Some(route) = effective_source_route.as_ref() {
1458 builder = builder.source_route(route.as_slice());
1459 }
1460 if let Some(region_code) = options.region_code {
1461 builder = builder.region_code(region_code);
1462 }
1463 if let Some(callsign) = self.operating_policy.operator_callsign {
1464 builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1465 }
1466 let mut packet = builder.payload(payload).build()?;
1467
1468 let receipt = if options.ack_requested {
1469 Some(self.prepare_pending_ack(from, *peer, &packet, &blind_keys, options)?)
1470 } else {
1471 None
1472 };
1473
1474 self.crypto
1475 .seal_blind_packet(&mut packet, &blind_keys, &channel_keys)
1476 .map_err(SendError::Crypto)?;
1477 if let Some(receipt) = receipt {
1478 self.refresh_pending_resend(
1479 from,
1480 receipt,
1481 packet.as_bytes(),
1482 effective_source_route
1483 .as_ref()
1484 .map(|route| route.as_slice()),
1485 )?;
1486 }
1487 if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1488 if let Some(receipt) = receipt {
1489 let _ = self
1490 .identity_mut(from)
1491 .and_then(|slot| slot.remove_pending_ack(&receipt));
1492 }
1493 return Err(err);
1494 }
1495 Ok(receipt)
1496 }
1497
1498 pub async fn send_blind_unicast(
1500 &mut self,
1501 from: LocalIdentityId,
1502 peer: &PublicKey,
1503 channel_id: &ChannelId,
1504 payload: &[u8],
1505 options: &SendOptions,
1506 ) -> Result<Option<SendReceipt>, SendError> {
1507 let (peer_id, _) = self
1508 .peer_registry
1509 .lookup_by_key(peer)
1510 .ok_or(SendError::PeerMissing)?;
1511 let _ = self.ensure_peer_crypto(from, peer_id).await?;
1512 self.queue_blind_unicast(from, peer, channel_id, payload, options)
1513 }
1514
1515 pub async fn transmit_next(
1523 &mut self,
1524 on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1525 ) -> Result<Option<SendReceipt>, MacError<<P::Radio as Radio>::Error>> {
1526 self.expire_post_tx_listen_if_needed();
1527 let Some(queued) = self.tx_queue.pop_next() else {
1528 return Ok(None);
1529 };
1530 let now_ms = self.clock.now_ms();
1531
1532 if queued.not_before_ms > now_ms {
1533 self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1534 return Ok(None);
1535 }
1536
1537 if self.post_tx_listen.is_some() && queued.priority != TxPriority::ImmediateAck {
1538 self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1539 return Ok(None);
1540 }
1541
1542 let receipt = queued.receipt;
1543 let identity_id = queued.identity_id;
1544 let tx_options = if queued.priority == TxPriority::ImmediateAck {
1545 TxOptions::default()
1546 } else {
1547 TxOptions {
1548 cad_timeout_ms: Some(0),
1549 }
1550 };
1551 match self
1552 .radio
1553 .transmit(queued.frame.as_slice(), tx_options)
1554 .await
1555 {
1556 Ok(()) => {}
1557 Err(TxError::CadTimeout) => {
1558 let next_attempt = queued.cad_attempts.saturating_add(1);
1559 if next_attempt >= MAX_CAD_ATTEMPTS {
1560 return Ok(None);
1561 }
1562 let backoff_ms = u64::from(
1563 self.rng
1564 .random_range(..self.radio.t_frame_ms().saturating_add(1)),
1565 );
1566 self.tx_queue
1567 .enqueue_with_state(
1568 queued.priority,
1569 queued.frame.as_slice(),
1570 queued.receipt,
1571 queued.identity_id,
1572 now_ms.saturating_add(backoff_ms),
1573 next_attempt,
1574 queued.forward_deferrals,
1575 )
1576 .map_err(|_| MacError::QueueFull)?;
1577 return Ok(None);
1578 }
1579 Err(error) => return Err(MacError::Transmit(error)),
1580 }
1581 if let Some(identity_id) = identity_id {
1582 on_event(
1583 identity_id,
1584 crate::MacEventRef::Transmitted {
1585 identity_id,
1586 receipt,
1587 },
1588 );
1589 }
1590 if let Some(receipt) = receipt {
1591 self.note_transmitted_ack_requested(receipt, queued.frame.as_slice());
1592 }
1593 Ok(receipt)
1594 }
1595
1596 pub async fn drain_tx_queue(
1601 &mut self,
1602 on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1603 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1604 while !self.tx_queue.is_empty() {
1605 let queue_len = self.tx_queue.len();
1606 let _ = self.transmit_next(on_event).await?;
1607 if self.tx_queue.len() >= queue_len {
1608 break;
1609 }
1610 }
1611 Ok(())
1612 }
1613
1614 pub async fn poll_cycle(
1627 &mut self,
1628 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1629 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1630 self.drain_tx_queue(&mut on_event).await?;
1631 if self.post_tx_listen.is_some() {
1632 self.service_post_tx_listen(&mut on_event).await?;
1633 } else {
1634 let _ = self.receive_one(&mut on_event).await?;
1635 }
1636 self.drain_tx_queue(&mut on_event).await?;
1637 self.service_pending_ack_timeouts(&mut on_event)
1638 .map_err(|_| MacError::QueueFull)?;
1639 Ok(())
1640 }
1641
1642 pub fn earliest_deadline_ms(&self) -> Option<u64> {
1649 let mut earliest: Option<u64> = None;
1650
1651 if let Some(listen) = &self.post_tx_listen {
1652 earliest =
1653 Some(earliest.map_or(listen.deadline_ms, |e: u64| e.min(listen.deadline_ms)));
1654 }
1655
1656 for slot in self.identities.iter().filter_map(|s| s.as_ref()) {
1657 for (_, pending) in slot.pending_acks.iter() {
1658 if !matches!(pending.state, crate::AckState::Queued { .. }) {
1659 earliest = Some(earliest.map_or(pending.ack_deadline_ms, |e: u64| {
1660 e.min(pending.ack_deadline_ms)
1661 }));
1662 }
1663 if let crate::AckState::AwaitingForward {
1664 confirm_deadline_ms,
1665 } = pending.state
1666 {
1667 earliest = Some(
1668 earliest.map_or(confirm_deadline_ms, |e: u64| e.min(confirm_deadline_ms)),
1669 );
1670 }
1671 }
1672 }
1673
1674 if let Some(nb) = self.tx_queue.earliest_not_before_ms() {
1675 earliest = Some(earliest.map_or(nb, |e: u64| e.min(nb)));
1676 }
1677
1678 earliest
1679 }
1680
1681 pub async fn next_event(
1700 &mut self,
1701 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1702 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1703 loop {
1704 self.drain_tx_queue(&mut on_event).await?;
1706
1707 let mut buf = [0u8; FRAME];
1709 let reason = poll_fn(|cx| self.poll_wait_for_wake(cx, &mut buf))
1710 .await
1711 .map_err(MacError::Radio)?;
1712
1713 self.process_wake_reason(reason, &mut buf, &mut on_event)
1715 .await?;
1716
1717 if !self.tx_queue.is_empty() {
1720 continue;
1721 }
1722
1723 return Ok(());
1724 }
1725 }
1726
1727 pub fn poll_wait_for_wake(
1735 &mut self,
1736 cx: &mut core::task::Context<'_>,
1737 buf: &mut [u8; FRAME],
1738 ) -> Poll<Result<WakeReason, <P::Radio as Radio>::Error>> {
1739 match self.radio.poll_receive(cx, buf) {
1740 Poll::Ready(Ok(rx)) => return Poll::Ready(Ok(WakeReason::Received(rx))),
1741 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
1742 Poll::Pending => {}
1743 }
1744
1745 let now_ms = self.clock.now_ms();
1746 if let Some(deadline) = self.earliest_deadline_ms() {
1747 if now_ms >= deadline {
1748 return Poll::Ready(Ok(WakeReason::TimerExpired));
1749 }
1750 let _ = self.clock.poll_delay_until(cx, deadline);
1751 }
1752
1753 if self.tx_queue.has_ready(now_ms) {
1754 return Poll::Ready(Ok(WakeReason::TimerExpired));
1755 }
1756
1757 Poll::Pending
1758 }
1759
1760 pub async fn process_wake_reason(
1764 &mut self,
1765 reason: WakeReason,
1766 buf: &mut [u8; FRAME],
1767 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1768 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1769 match reason {
1770 WakeReason::Received(rx) => {
1771 let frame_len = rx.len.min(buf.len());
1772 let _ = self
1773 .process_received_frame(buf, frame_len, &rx, &mut on_event)
1774 .await;
1775 }
1776 WakeReason::TimerExpired => {}
1777 }
1778
1779 self.drain_tx_queue(&mut on_event).await?;
1780
1781 self.service_pending_ack_timeouts(&mut on_event)
1782 .map_err(|_| MacError::QueueFull)?;
1783
1784 Ok(())
1785 }
1786
1787 pub async fn run(
1795 &mut self,
1796 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1797 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1798 loop {
1799 self.next_event(&mut on_event).await?;
1800 }
1801 }
1802
1803 pub async fn run_quiet(&mut self) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1809 self.run(|_, _| {}).await
1810 }
1811
1812 pub async fn process_received_frame(
1818 &mut self,
1819 buf: &mut [u8; FRAME],
1820 frame_len: usize,
1821 rx: &RxInfo,
1822 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1823 ) -> bool {
1824 let received_at_ms = self.clock.now_ms();
1825 let mut current_len = frame_len;
1826 let mut current_rx = RxInfo {
1827 len: frame_len,
1828 rssi: rx.rssi,
1829 snr: rx.snr,
1830 lqi: rx.lqi,
1831 };
1832 let mut current_received_at_ms = received_at_ms;
1833 let mut handled_any = false;
1834
1835 loop {
1836 let Ok(header) = PacketHeader::parse(&buf[..current_len]) else {
1837 return handled_any;
1838 };
1839 let forwarding_confirmed = if let Some((identity_id, receipt)) =
1840 self.observe_forwarding_confirmation(&buf[..current_len])
1841 {
1842 let hint = match header.source {
1843 SourceAddrRef::Hint(h) => Some(RouterHint([h.0[0], h.0[1]])),
1844 SourceAddrRef::FullKeyAt { offset } => {
1845 let mut key_bytes = [0u8; 32];
1846 key_bytes.copy_from_slice(&buf[offset..offset + 32]);
1847 let h = PublicKey(key_bytes).hint();
1848 Some(RouterHint([h.0[0], h.0[1]]))
1849 }
1850 _ => None,
1851 };
1852 on_event(
1853 identity_id,
1854 crate::MacEventRef::Forwarded {
1855 identity_id,
1856 receipt,
1857 hint,
1858 },
1859 );
1860 true
1861 } else {
1862 false
1863 };
1864
1865 let (handled, replay_target) = match header.packet_type() {
1866 PacketType::Broadcast => (
1867 self.process_broadcast(
1868 buf,
1869 current_len,
1870 &header,
1871 ¤t_rx,
1872 current_received_at_ms,
1873 &mut on_event,
1874 ),
1875 None,
1876 ),
1877 PacketType::MacAck => (
1878 self.process_mac_ack(
1879 buf,
1880 current_len,
1881 &header,
1882 ¤t_rx,
1883 forwarding_confirmed,
1884 &mut on_event,
1885 ),
1886 None,
1887 ),
1888 PacketType::Unicast | PacketType::UnicastAckReq => {
1889 self.process_unicast(
1890 buf,
1891 current_len,
1892 &header,
1893 ¤t_rx,
1894 current_received_at_ms,
1895 forwarding_confirmed,
1896 &mut on_event,
1897 )
1898 .await
1899 }
1900 PacketType::Multicast => (
1901 self.process_multicast(
1902 buf,
1903 current_len,
1904 &header,
1905 ¤t_rx,
1906 current_received_at_ms,
1907 forwarding_confirmed,
1908 &mut on_event,
1909 ),
1910 None,
1911 ),
1912 PacketType::BlindUnicast | PacketType::BlindUnicastAckReq => {
1913 self.process_blind_unicast(
1914 buf,
1915 current_len,
1916 &header,
1917 ¤t_rx,
1918 current_received_at_ms,
1919 forwarding_confirmed,
1920 &mut on_event,
1921 )
1922 .await
1923 }
1924 PacketType::Reserved5 => (false, None),
1925 };
1926 handled_any |= handled;
1927
1928 let Some((local_id, peer_id)) = replay_target else {
1929 return handled_any;
1930 };
1931 let Some(deferred) = self.take_deferred_counter_resync_frame(local_id, peer_id) else {
1932 return handled_any;
1933 };
1934 current_len = deferred.frame.len();
1935 buf[..current_len].copy_from_slice(deferred.frame.as_slice());
1936 current_rx = RxInfo {
1937 len: current_len,
1938 rssi: deferred.rssi,
1939 snr: deferred.snr,
1940 lqi: deferred.lqi,
1941 };
1942 current_received_at_ms = deferred.received_at_ms;
1943 }
1944 }
1945
1946 fn process_broadcast(
1947 &mut self,
1948 buf: &[u8; FRAME],
1949 frame_len: usize,
1950 header: &PacketHeader,
1951 rx: &RxInfo,
1952 received_at_ms: u64,
1953 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1954 ) -> bool {
1955 let Some((from_hint, from_key)) = Self::resolve_broadcast_source(&buf[..frame_len], header)
1956 else {
1957 return false;
1958 };
1959 if !Self::payload_is_allowed(header.packet_type(), &buf[header.body_range.clone()]) {
1960 return false;
1961 }
1962 let mut delivered = false;
1963 for (index, slot) in self.identities.iter().enumerate() {
1964 if slot.is_none() {
1965 continue;
1966 }
1967 delivered = true;
1968 on_event(
1969 LocalIdentityId(index as u8),
1970 crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
1971 &buf[..frame_len],
1972 &buf[header.body_range.clone()],
1973 header.clone(),
1974 ParsedOptions::extract(&buf[..frame_len], header.options_range.clone())
1975 .unwrap_or_default(),
1976 from_key,
1977 Some(from_hint),
1978 false,
1979 None,
1980 crate::send::RxMetadata::new(
1981 Some(rx.rssi),
1982 Some(rx.snr),
1983 rx.lqi,
1984 Some(received_at_ms),
1985 ),
1986 )),
1987 );
1988 }
1989 let forwarded = self.maybe_forward_received(&buf[..frame_len], header, rx, false);
1993 delivered || forwarded
1994 }
1995
1996 fn process_mac_ack(
1997 &mut self,
1998 buf: &[u8; FRAME],
1999 frame_len: usize,
2000 header: &PacketHeader,
2001 rx: &RxInfo,
2002 forwarding_confirmed: bool,
2003 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2004 ) -> bool {
2005 let Some(ack_dst) = header.ack_dst else {
2006 return false;
2007 };
2008 let target_peer = self
2009 .identities
2010 .iter()
2011 .filter_map(|slot| slot.as_ref())
2012 .find(|slot| slot.identity().public_key().hint() == ack_dst)
2013 .and_then(|slot| self.match_pending_peer_for_ack(slot, &buf[header.mic_range.clone()]));
2014 if let Some(target_peer) = target_peer {
2015 let mut ack_tag = [0u8; 8];
2016 ack_tag.copy_from_slice(&buf[header.mic_range.clone()]);
2017 if let Some((identity_id, receipt)) = self.complete_ack(&target_peer, &ack_tag) {
2018 on_event(
2019 identity_id,
2020 crate::MacEventRef::AckReceived {
2021 peer: target_peer,
2022 receipt,
2023 },
2024 );
2025 return true;
2026 }
2027 }
2028 forwarding_confirmed || self.maybe_forward_received(&buf[..frame_len], header, rx, false)
2029 }
2030
2031 async fn process_unicast(
2032 &mut self,
2033 buf: &mut [u8; FRAME],
2034 frame_len: usize,
2035 header: &PacketHeader,
2036 rx: &RxInfo,
2037 received_at_ms: u64,
2038 forwarding_confirmed: bool,
2039 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2040 ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
2041 let mut original = [0u8; FRAME];
2042 original[..frame_len].copy_from_slice(&buf[..frame_len]);
2043 let mut replay_target = None;
2044 let handled = if let Some(local_id) = self.find_local_identity_for_dst(header.dst) {
2045 let mut handled = false;
2046 for (peer_id, peer_key) in
2047 self.resolve_source_peer_candidates(&buf[..frame_len], header)
2048 {
2049 let Ok(keys) = self.ensure_peer_crypto(local_id, peer_id).await else {
2050 continue;
2051 };
2052 let Ok(body_range) = self
2053 .crypto
2054 .open_packet(&mut buf[..frame_len], header, &keys)
2055 else {
2056 continue;
2057 };
2058 let payload = &buf[body_range.clone()];
2059 if !Self::payload_is_allowed(header.packet_type(), payload) {
2060 continue;
2061 }
2062 match self.unicast_replay_verdict(local_id, peer_id, header, &buf[..frame_len]) {
2063 Some(ReplayVerdict::Accept) => {
2064 let _ = self.accept_unicast_replay(
2065 local_id,
2066 peer_id,
2067 header,
2068 &buf[..frame_len],
2069 );
2070 }
2071 Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
2072 if self.try_accept_counter_resync_response(
2073 local_id,
2074 peer_id,
2075 header,
2076 &buf[..frame_len],
2077 payload,
2078 ) {
2079 replay_target = Some((local_id, peer_id));
2080 } else {
2081 self.store_deferred_counter_resync_frame(
2082 local_id,
2083 peer_id,
2084 &original[..frame_len],
2085 rx,
2086 received_at_ms,
2087 );
2088 self.maybe_request_counter_resync(local_id, peer_id, peer_key)
2089 .await;
2090 continue;
2091 }
2092 }
2093 Some(ReplayVerdict::Replay) | None => continue,
2094 }
2095 self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2096
2097 if header.ack_requested()
2098 && self.should_emit_destination_ack(&buf[..frame_len], header)
2099 {
2100 let ack_tag = self.compute_received_ack_tag(
2101 &buf[..frame_len],
2102 header,
2103 body_range.clone(),
2104 &keys,
2105 );
2106 self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
2107 .ok();
2108 }
2109
2110 if let Some(data) = Self::echo_request_data(payload) {
2111 let response =
2112 Self::build_echo_command_payload(MAC_COMMAND_ECHO_RESPONSE_ID, data);
2113 let _ = self
2114 .send_unicast(
2115 local_id,
2116 &peer_key,
2117 response.as_slice(),
2118 &SendOptions::default(),
2119 )
2120 .await;
2121 }
2122
2123 on_event(
2124 local_id,
2125 crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2126 &original[..frame_len],
2127 &buf[body_range],
2128 header.clone(),
2129 ParsedOptions::extract(
2130 &original[..frame_len],
2131 header.options_range.clone(),
2132 )
2133 .unwrap_or_default(),
2134 Some(peer_key),
2135 Some(peer_key.hint()),
2136 true,
2137 None,
2138 crate::send::RxMetadata::new(
2139 Some(rx.rssi),
2140 Some(rx.snr),
2141 rx.lqi,
2142 Some(received_at_ms),
2143 ),
2144 )),
2145 );
2146 handled = true;
2147 break;
2148 }
2149 handled
2150 } else {
2151 false
2152 };
2153 let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
2154 (
2155 handled || forwarding_confirmed || forwarded,
2156 handled.then_some(()).and(replay_target),
2157 )
2158 }
2159
2160 fn process_multicast(
2161 &mut self,
2162 buf: &mut [u8; FRAME],
2163 frame_len: usize,
2164 header: &PacketHeader,
2165 rx: &RxInfo,
2166 received_at_ms: u64,
2167 forwarding_confirmed: bool,
2168 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2169 ) -> bool {
2170 let mut original = [0u8; FRAME];
2171 original[..frame_len].copy_from_slice(&buf[..frame_len]);
2172 let delivered = if let Some(channel_id) = header.channel {
2173 let channel_info = {
2174 self.channels
2175 .lookup_by_id(&channel_id)
2176 .next()
2177 .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
2178 };
2179 if let Some((channel_key, derived)) = channel_info {
2180 let keys = PairwiseKeys {
2181 k_enc: derived.k_enc,
2182 k_mic: derived.k_mic,
2183 };
2184 if let Ok(body_range) =
2185 self.crypto
2186 .open_packet(&mut buf[..frame_len], header, &keys)
2187 {
2188 if !Self::payload_is_allowed(header.packet_type(), &buf[body_range.clone()]) {
2189 false
2190 } else if let Some(source) =
2191 self.resolve_multicast_source(&buf[..frame_len], header)
2192 {
2193 let accepted = if let Some(peer_id) = source.peer_id {
2194 let accepted = self.accept_multicast_replay(
2195 channel_id,
2196 peer_id,
2197 header,
2198 &buf[..frame_len],
2199 );
2200 if accepted {
2201 self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2202 }
2203 accepted
2204 } else {
2205 self.accept_unknown_multicast_replay(header, &buf[..frame_len])
2206 };
2207 if accepted {
2208 let mut delivered = false;
2209 for (index, slot) in self.identities.iter().enumerate() {
2210 if slot.is_none() {
2211 continue;
2212 }
2213 delivered = true;
2214 on_event(
2215 LocalIdentityId(index as u8),
2216 crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2217 &original[..frame_len],
2218 &buf[body_range.clone()],
2219 header.clone(),
2220 ParsedOptions::extract(
2221 &original[..frame_len],
2222 header.options_range.clone(),
2223 )
2224 .unwrap_or_default(),
2225 source.public_key,
2226 source
2227 .hint
2228 .or_else(|| source.public_key.map(|key| key.hint())),
2229 true,
2230 Some(crate::ChannelInfoRef {
2231 id: channel_id,
2232 key: &channel_key,
2233 }),
2234 crate::send::RxMetadata::new(
2235 Some(rx.rssi),
2236 Some(rx.snr),
2237 rx.lqi,
2238 Some(received_at_ms),
2239 ),
2240 )),
2241 );
2242 }
2243 delivered
2244 } else {
2245 false
2246 }
2247 } else {
2248 false
2249 }
2250 } else {
2251 false
2252 }
2253 } else {
2254 false
2255 }
2256 } else {
2257 false
2258 };
2259 let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, false);
2260 delivered || forwarding_confirmed || forwarded
2261 }
2262
2263 async fn process_blind_unicast(
2264 &mut self,
2265 buf: &mut [u8; FRAME],
2266 frame_len: usize,
2267 header: &PacketHeader,
2268 rx: &RxInfo,
2269 received_at_ms: u64,
2270 forwarding_confirmed: bool,
2271 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2272 ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
2273 let mut original = [0u8; FRAME];
2274 original[..frame_len].copy_from_slice(&buf[..frame_len]);
2275 let mut replay_target = None;
2276 let handled = if let Some(channel_id) = header.channel {
2277 let channel_candidates: Vec<(ChannelKey, DerivedChannelKeys), CHANNELS> = self
2278 .channels
2279 .lookup_by_id(&channel_id)
2280 .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
2281 .collect();
2282 if channel_candidates.is_empty() {
2283 false
2284 } else {
2285 let mut handled = false;
2286 for (resolved_channel_key, channel_keys) in channel_candidates {
2287 buf[..frame_len].copy_from_slice(&original[..frame_len]);
2288 let Ok((dst, source_addr)) = self.crypto.decrypt_blind_addr(
2289 &mut buf[..frame_len],
2290 header,
2291 &channel_keys,
2292 ) else {
2293 continue;
2294 };
2295 let Some(local_id) = self.find_local_identity_for_dst(Some(dst)) else {
2296 continue;
2297 };
2298 for (peer_id, peer_key) in
2299 self.resolve_blind_source_peer_candidates(&buf[..frame_len], source_addr)
2300 {
2301 let Ok(pairwise_keys) = self.ensure_peer_crypto(local_id, peer_id).await
2302 else {
2303 continue;
2304 };
2305 let blind_keys =
2306 self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
2307 let body_range = match self.crypto.open_packet(
2308 &mut buf[..frame_len],
2309 header,
2310 &blind_keys,
2311 ) {
2312 Ok(range) => range,
2313 Err(_) => continue,
2314 };
2315 let payload = &buf[body_range.clone()];
2316 if !Self::payload_is_allowed(header.packet_type(), payload) {
2317 continue;
2318 }
2319 match self.unicast_replay_verdict(
2320 local_id,
2321 peer_id,
2322 header,
2323 &buf[..frame_len],
2324 ) {
2325 Some(ReplayVerdict::Accept) => {
2326 let _ = self.accept_unicast_replay(
2327 local_id,
2328 peer_id,
2329 header,
2330 &buf[..frame_len],
2331 );
2332 }
2333 Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
2334 if self.try_accept_counter_resync_response(
2335 local_id,
2336 peer_id,
2337 header,
2338 &buf[..frame_len],
2339 payload,
2340 ) {
2341 replay_target = Some((local_id, peer_id));
2342 } else {
2343 self.store_deferred_counter_resync_frame(
2344 local_id,
2345 peer_id,
2346 &original[..frame_len],
2347 rx,
2348 received_at_ms,
2349 );
2350 self.maybe_request_counter_resync(local_id, peer_id, peer_key)
2351 .await;
2352 continue;
2353 }
2354 }
2355 Some(ReplayVerdict::Replay) | None => continue,
2356 }
2357 self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2358
2359 if header.ack_requested()
2360 && self.should_emit_destination_ack(&buf[..frame_len], header)
2361 {
2362 let ack_tag = self.compute_received_ack_tag(
2363 &buf[..frame_len],
2364 header,
2365 body_range.clone(),
2366 &blind_keys,
2367 );
2368 self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
2369 .ok();
2370 }
2371
2372 if let Some(data) = Self::echo_request_data(payload) {
2373 let response = Self::build_echo_command_payload(
2374 MAC_COMMAND_ECHO_RESPONSE_ID,
2375 data,
2376 );
2377 let _ = self
2378 .send_unicast(
2379 local_id,
2380 &peer_key,
2381 response.as_slice(),
2382 &SendOptions::default(),
2383 )
2384 .await;
2385 }
2386
2387 on_event(
2388 local_id,
2389 crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2390 &original[..frame_len],
2391 &buf[body_range],
2392 header.clone(),
2393 ParsedOptions::extract(
2394 &original[..frame_len],
2395 header.options_range.clone(),
2396 )
2397 .unwrap_or_default(),
2398 Some(peer_key),
2399 Some(peer_key.hint()),
2400 true,
2401 Some(crate::ChannelInfoRef {
2402 id: channel_id,
2403 key: &resolved_channel_key,
2404 }),
2405 crate::send::RxMetadata::new(
2406 Some(rx.rssi),
2407 Some(rx.snr),
2408 rx.lqi,
2409 Some(received_at_ms),
2410 ),
2411 )),
2412 );
2413 handled = true;
2414 break;
2415 }
2416 if handled {
2417 break;
2418 }
2419 }
2420 handled
2421 }
2422 } else {
2423 false
2424 };
2425 let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
2426 (
2427 handled || forwarding_confirmed || forwarded,
2428 handled.then_some(()).and(replay_target),
2429 )
2430 }
2431
2432 pub async fn receive_one(
2438 &mut self,
2439 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2440 ) -> Result<bool, MacError<<P::Radio as Radio>::Error>> {
2441 let mut buf = [0u8; FRAME];
2442 let Some(rx) = poll_fn(|cx| match self.radio.poll_receive(cx, &mut buf) {
2443 Poll::Ready(Ok(rx)) => Poll::Ready(Ok(Some(rx))),
2444 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
2445 Poll::Pending => Poll::Ready(Ok(None)),
2446 })
2447 .await
2448 .map_err(MacError::Radio)?
2449 else {
2450 return Ok(false);
2451 };
2452
2453 let frame_len = rx.len.min(buf.len());
2454 Ok(self
2455 .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
2456 .await)
2457 }
2458
2459 pub fn complete_ack(
2461 &mut self,
2462 peer: &PublicKey,
2463 ack_tag: &[u8; 8],
2464 ) -> Option<(LocalIdentityId, SendReceipt)> {
2465 for (index, slot) in self.identities.iter_mut().enumerate() {
2466 let Some(slot) = slot.as_mut() else {
2467 continue;
2468 };
2469
2470 let receipt = slot.pending_acks.iter().find_map(|(receipt, pending)| {
2471 (pending.peer == *peer && pending.ack_tag == *ack_tag).then_some(*receipt)
2472 });
2473
2474 if let Some(receipt) = receipt {
2475 slot.pending_acks.remove(&receipt);
2476 if self
2477 .post_tx_listen
2478 .as_ref()
2479 .map(|listen| {
2480 listen.identity_id == LocalIdentityId(index as u8)
2481 && listen.receipt == receipt
2482 })
2483 .unwrap_or(false)
2484 {
2485 self.post_tx_listen = None;
2486 }
2487 return Some((LocalIdentityId(index as u8), receipt));
2488 }
2489 }
2490
2491 None
2492 }
2493
2494 pub fn service_pending_ack_timeouts(
2496 &mut self,
2497 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2498 ) -> Result<(), CapacityError> {
2499 self.expire_post_tx_listen_if_needed();
2500
2501 #[derive(Clone)]
2502 enum Action<const FRAME: usize> {
2503 Retry {
2504 receipt: SendReceipt,
2505 resend: ResendRecord<FRAME>,
2506 not_before_ms: u64,
2507 },
2508 RouteRetry {
2509 receipt: SendReceipt,
2510 peer: PublicKey,
2511 resend: ResendRecord<FRAME>,
2512 not_before_ms: u64,
2513 },
2514 Timeout {
2515 receipt: SendReceipt,
2516 peer: PublicKey,
2517 },
2518 }
2519
2520 let now_ms = self.clock.now_ms();
2521 let t_frame_ms = self.radio.t_frame_ms();
2522
2523 for index in 0..self.identities.len() {
2524 let identity_id = LocalIdentityId(index as u8);
2525 let actions = {
2526 let Some(slot) = self.identities[index].as_mut() else {
2527 continue;
2528 };
2529
2530 let mut actions: Vec<Action<FRAME>, ACKS> = Vec::new();
2531 for (receipt, pending) in slot.pending_acks.iter_mut() {
2532 if !matches!(pending.state, crate::AckState::Queued { .. })
2533 && now_ms >= pending.ack_deadline_ms
2534 {
2535 if Self::can_attempt_route_retry(pending) {
2536 let backoff_cap_ms =
2537 Self::forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms, 1);
2538 let backoff_ms = if backoff_cap_ms == 0 {
2539 0
2540 } else {
2541 u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
2542 };
2543 actions
2544 .push(Action::RouteRetry {
2545 receipt: *receipt,
2546 peer: pending.peer,
2547 resend: pending.resend.clone(),
2548 not_before_ms: now_ms.saturating_add(backoff_ms),
2549 })
2550 .map_err(|_| CapacityError)?;
2551 } else {
2552 actions
2553 .push(Action::Timeout {
2554 receipt: *receipt,
2555 peer: pending.peer,
2556 })
2557 .map_err(|_| CapacityError)?;
2558 }
2559 continue;
2560 }
2561
2562 if let crate::AckState::AwaitingForward {
2563 confirm_deadline_ms,
2564 } = pending.state
2565 {
2566 if now_ms >= confirm_deadline_ms && pending.retries < MAX_FORWARD_RETRIES {
2567 pending.retries = pending.retries.saturating_add(1);
2568 let backoff_cap_ms = Self::forward_retry_backoff_cap_ms_for_t_frame(
2569 t_frame_ms,
2570 pending.retries,
2571 );
2572 let backoff_ms = if backoff_cap_ms == 0 {
2573 0
2574 } else {
2575 u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
2576 };
2577 let not_before_ms = now_ms.saturating_add(backoff_ms);
2578 pending.state = crate::AckState::RetryQueued;
2579 actions
2580 .push(Action::Retry {
2581 receipt: *receipt,
2582 resend: pending.resend.clone(),
2583 not_before_ms,
2584 })
2585 .map_err(|_| CapacityError)?;
2586 }
2587 }
2588 }
2589 actions
2590 };
2591
2592 for action in actions {
2593 match action {
2594 Action::Retry {
2595 receipt,
2596 resend,
2597 not_before_ms,
2598 } => {
2599 self.tx_queue.enqueue_with_state(
2600 TxPriority::Retry,
2601 resend.frame.as_slice(),
2602 Some(receipt),
2603 Some(identity_id),
2604 not_before_ms,
2605 0,
2606 0,
2607 )?;
2608 }
2609 Action::RouteRetry {
2610 receipt,
2611 peer,
2612 resend,
2613 not_before_ms,
2614 } => {
2615 if let Some(rewritten) = self.synthesize_route_retry_resend(&peer, &resend)
2616 {
2617 if let Some(pending) = self
2618 .identity_mut(identity_id)
2619 .and_then(|slot| slot.pending_ack_mut(&receipt))
2620 {
2621 pending.resend = rewritten.clone();
2622 pending.retries = 0;
2623 pending.sent_ms = 0;
2624 pending.ack_deadline_ms = 0;
2625 pending.state = crate::AckState::RetryQueued;
2626 }
2627 self.tx_queue.enqueue_with_state(
2628 TxPriority::Retry,
2629 rewritten.frame.as_slice(),
2630 Some(receipt),
2631 Some(identity_id),
2632 not_before_ms,
2633 0,
2634 0,
2635 )?;
2636 } else {
2637 if let Some(slot) = self.identity_mut(identity_id) {
2638 slot.pending_acks.remove(&receipt);
2639 }
2640 on_event(
2641 identity_id,
2642 crate::MacEventRef::AckTimeout { peer, receipt },
2643 );
2644 }
2645 }
2646 Action::Timeout { receipt, peer } => {
2647 if let Some(slot) = self.identity_mut(identity_id) {
2648 slot.pending_acks.remove(&receipt);
2649 }
2650 on_event(
2651 identity_id,
2652 crate::MacEventRef::AckTimeout { peer, receipt },
2653 );
2654 }
2655 }
2656 }
2657 }
2658
2659 Ok(())
2660 }
2661
2662 pub fn cancel_pending_ack(
2668 &mut self,
2669 identity_id: LocalIdentityId,
2670 receipt: SendReceipt,
2671 ) -> bool {
2672 let removed = self
2673 .identity_mut(identity_id)
2674 .and_then(|slot| slot.remove_pending_ack(&receipt))
2675 .is_some();
2676
2677 self.tx_queue.remove_first_matching(|entry| {
2679 entry.receipt == Some(receipt) && entry.identity_id == Some(identity_id)
2680 });
2681
2682 if let Some(listen) = &self.post_tx_listen {
2684 if listen.identity_id == identity_id && listen.receipt == receipt {
2685 self.post_tx_listen = None;
2686 }
2687 }
2688
2689 removed
2690 }
2691
2692 fn identity_and_advance(
2693 &mut self,
2694 from: LocalIdentityId,
2695 ) -> Result<(PublicKey, u32), SendError> {
2696 let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2697 if slot.counter_window_exhausted() {
2698 return Err(SendError::CounterPersistenceLag);
2699 }
2700 let source_key = *slot.identity().public_key();
2701 let frame_counter = slot.advance_frame_counter();
2702 slot.schedule_counter_persist_if_needed();
2703 Ok((source_key, frame_counter))
2704 }
2705
2706 fn take_salt(&mut self, options: &SendOptions) -> Option<u16> {
2707 options.salt.then(|| self.rng.next_u32() as u16)
2708 }
2709
2710 fn enforce_send_policy(
2711 &self,
2712 channel_id: Option<ChannelId>,
2713 options: &SendOptions,
2714 blind_unicast: bool,
2715 ) -> Result<(), SendError> {
2716 let _authority = self.classify_send_authority(options, blind_unicast)?;
2717
2718 let Some(channel_id) = channel_id else {
2719 return Ok(());
2720 };
2721 let Some(policy) = self
2722 .operating_policy
2723 .channel_policies
2724 .iter()
2725 .find(|policy| policy.channel_id == channel_id)
2726 else {
2727 return Ok(());
2728 };
2729
2730 if policy.require_unencrypted && options.encrypted {
2731 return Err(SendError::PolicyViolation);
2732 }
2733 if policy.require_full_source && !options.full_source {
2734 return Err(SendError::PolicyViolation);
2735 }
2736 if let Some(max_flood_hops) = policy.max_flood_hops {
2737 if options
2738 .flood_hops
2739 .map(|hops| hops > max_flood_hops)
2740 .unwrap_or(false)
2741 {
2742 return Err(SendError::PolicyViolation);
2743 }
2744 }
2745
2746 Ok(())
2747 }
2748
2749 fn classify_send_authority(
2750 &self,
2751 options: &SendOptions,
2752 _blind_unicast: bool,
2753 ) -> Result<TransmitAuthority, SendError> {
2754 match self.operating_policy.amateur_radio_mode {
2755 AmateurRadioMode::Unlicensed => Ok(TransmitAuthority::Unlicensed),
2756 AmateurRadioMode::LicensedOnly => {
2757 if options.encrypted || self.operating_policy.operator_callsign.is_none() {
2758 return Err(SendError::PolicyViolation);
2759 }
2760 Ok(TransmitAuthority::Amateur)
2761 }
2762 AmateurRadioMode::Hybrid => {
2763 if options.encrypted {
2764 Ok(TransmitAuthority::Unlicensed)
2767 } else if self.operating_policy.operator_callsign.is_some() {
2768 Ok(TransmitAuthority::Amateur)
2769 } else {
2770 Ok(TransmitAuthority::Unlicensed)
2771 }
2772 }
2773 }
2774 }
2775
2776 fn enqueue_packet(
2777 &mut self,
2778 packet: UnsealedPacket<'_>,
2779 receipt: Option<SendReceipt>,
2780 identity_id: Option<LocalIdentityId>,
2781 ) -> Result<(), SendError> {
2782 if packet.total_len() > self.radio.max_frame_size() {
2783 return Err(SendError::Build(BuildError::BufferTooSmall));
2784 }
2785 self.tx_queue
2786 .enqueue(
2787 TxPriority::Application,
2788 packet.as_bytes(),
2789 receipt,
2790 identity_id,
2791 )
2792 .map_err(|_| SendError::QueueFull)?;
2793 Ok(())
2794 }
2795
2796 fn refresh_pending_resend(
2797 &mut self,
2798 from: LocalIdentityId,
2799 receipt: SendReceipt,
2800 frame: &[u8],
2801 source_route: Option<&[RouterHint]>,
2802 ) -> Result<(), SendError> {
2803 let resend =
2804 ResendRecord::try_new(frame, source_route).map_err(|_| SendError::QueueFull)?;
2805 let pending = self
2806 .identity_mut(from)
2807 .ok_or(SendError::IdentityMissing)?
2808 .pending_ack_mut(&receipt)
2809 .ok_or(SendError::IdentityMissing)?;
2810 pending.resend = resend;
2811 Ok(())
2812 }
2813
2814 fn prepare_pending_ack(
2815 &mut self,
2816 from: LocalIdentityId,
2817 peer: PublicKey,
2818 packet: &UnsealedPacket<'_>,
2819 keys: &PairwiseKeys,
2820 options: &SendOptions,
2821 ) -> Result<SendReceipt, SendError> {
2822 let header = PacketHeader::parse(packet.as_bytes())?;
2823 let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2824 feed_aad(&header, packet.as_bytes(), |chunk| cmac.update(chunk));
2825 cmac.update(packet.body());
2826 let full_mac = cmac.finalize();
2827 let ack_tag = self.crypto.compute_ack_tag(&full_mac, &keys.k_enc);
2828 let is_forwarded = options
2829 .source_route
2830 .as_ref()
2831 .map(|route| !route.is_empty())
2832 .unwrap_or(false)
2833 || options.flood_hops.unwrap_or(0) > 0;
2834 let resend = ResendRecord::try_new(
2835 packet.as_bytes(),
2836 options.source_route.as_ref().map(|route| route.as_slice()),
2837 )
2838 .map_err(|_| SendError::QueueFull)?;
2839
2840 let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2841 let receipt = slot.next_receipt();
2842 let pending = if is_forwarded {
2843 PendingAck::forwarded(ack_tag, peer, resend)
2844 } else {
2845 PendingAck::direct(ack_tag, peer, resend)
2846 };
2847 slot.try_insert_pending_ack(receipt, pending)
2848 .map_err(|_| SendError::PendingAckFull)?;
2849 Ok(receipt)
2850 }
2851
2852 async fn derive_pairwise_keys_for_peer(
2853 &self,
2854 local_id: LocalIdentityId,
2855 peer_key: &PublicKey,
2856 ) -> Result<PairwiseKeys, SendError> {
2857 let shared_secret = {
2858 let slot = self.identity(local_id).ok_or(SendError::IdentityMissing)?;
2859 match slot.identity() {
2860 LocalIdentity::LongTerm(identity) => identity
2861 .agree(peer_key)
2862 .await
2863 .map_err(|_| SendError::IdentityAgreementFailed)?,
2864 #[cfg(feature = "software-crypto")]
2865 LocalIdentity::Ephemeral(identity) => identity
2866 .agree(peer_key)
2867 .await
2868 .map_err(|_| SendError::IdentityAgreementFailed)?,
2869 }
2870 };
2871
2872 Ok(self.crypto.derive_pairwise_keys(&shared_secret))
2873 }
2874
2875 fn effective_source_route(
2876 &self,
2877 peer_id: PeerId,
2878 options: &SendOptions,
2879 ) -> Option<Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>> {
2880 if let Some(route) = options.source_route.as_ref() {
2881 return Some(route.clone());
2882 }
2883
2884 let Some(peer) = self.peer_registry.get(peer_id) else {
2885 return None;
2886 };
2887 match peer.route.as_ref() {
2888 Some(CachedRoute::Source(route)) => Some(route.clone()),
2889 _ => None,
2890 }
2891 }
2892
2893 fn cache_peer_crypto(
2894 &mut self,
2895 local_id: LocalIdentityId,
2896 peer_id: PeerId,
2897 pairwise_keys: PairwiseKeys,
2898 ) -> Result<(), SendError> {
2899 let slot = self
2900 .identity_mut(local_id)
2901 .ok_or(SendError::IdentityMissing)?;
2902 if slot.peer_crypto().get(&peer_id).is_some() {
2903 return Ok(());
2904 }
2905 slot.peer_crypto_mut()
2906 .insert(
2907 peer_id,
2908 crate::peers::PeerCryptoState {
2909 pairwise_keys,
2910 replay_window: ReplayWindow::new(),
2911 },
2912 )
2913 .map_err(|_| SendError::QueueFull)?;
2914 Ok(())
2915 }
2916
2917 async fn ensure_peer_crypto(
2918 &mut self,
2919 local_id: LocalIdentityId,
2920 peer_id: PeerId,
2921 ) -> Result<PairwiseKeys, SendError> {
2922 if let Some(keys) = self
2923 .identity(local_id)
2924 .and_then(|slot| slot.peer_crypto().get(&peer_id))
2925 .map(|state| state.pairwise_keys.clone())
2926 {
2927 return Ok(keys);
2928 }
2929
2930 let peer_key = self
2931 .peer_registry
2932 .get(peer_id)
2933 .ok_or(SendError::PeerMissing)?
2934 .public_key;
2935 let pairwise_keys = self
2936 .derive_pairwise_keys_for_peer(local_id, &peer_key)
2937 .await?;
2938 self.cache_peer_crypto(local_id, peer_id, pairwise_keys.clone())?;
2939 Ok(pairwise_keys)
2940 }
2941
2942 fn insert_identity(
2943 &mut self,
2944 identity: LocalIdentity<P::Identity>,
2945 pfs_parent: Option<LocalIdentityId>,
2946 ) -> Result<LocalIdentityId, CapacityError> {
2947 let initial_frame_counter = self.rng.next_u32();
2948
2949 if let Some((index, slot)) = self
2950 .identities
2951 .iter_mut()
2952 .enumerate()
2953 .find(|(_, slot)| slot.is_none())
2954 {
2955 *slot = Some(IdentitySlot::new(
2956 identity,
2957 initial_frame_counter,
2958 pfs_parent,
2959 ));
2960 return Ok(LocalIdentityId(index as u8));
2961 }
2962
2963 let next_id = self.identities.len();
2964 self.identities
2965 .push(Some(IdentitySlot::new(
2966 identity,
2967 initial_frame_counter,
2968 pfs_parent,
2969 )))
2970 .map_err(|_| CapacityError)?;
2971 Ok(LocalIdentityId(next_id as u8))
2972 }
2973
2974 fn compute_received_ack_tag(
2975 &self,
2976 buf: &[u8],
2977 header: &PacketHeader,
2978 body_range: core::ops::Range<usize>,
2979 keys: &PairwiseKeys,
2980 ) -> [u8; 8] {
2981 let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2982 feed_aad(header, buf, |chunk| cmac.update(chunk));
2983 cmac.update(&buf[body_range]);
2984 let full_mac = cmac.finalize();
2985 self.crypto.compute_ack_tag(&full_mac, &keys.k_enc)
2986 }
2987
2988 fn requeue_tx(&mut self, queued: &crate::QueuedTx<FRAME>) -> Result<u32, CapacityError> {
2989 self.tx_queue.enqueue_with_state(
2990 queued.priority,
2991 queued.frame.as_slice(),
2992 queued.receipt,
2993 queued.identity_id,
2994 queued.not_before_ms,
2995 queued.cad_attempts,
2996 queued.forward_deferrals,
2997 )
2998 }
2999
3000 fn accept_unicast_replay(
3001 &mut self,
3002 local_id: LocalIdentityId,
3003 peer_id: PeerId,
3004 header: &PacketHeader,
3005 frame: &[u8],
3006 ) -> bool {
3007 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3008 return false;
3009 };
3010 let now_ms = self.clock.now_ms();
3011 let Some(window) = self
3012 .identity_mut(local_id)
3013 .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3014 .map(|state| &mut state.replay_window)
3015 else {
3016 return false;
3017 };
3018
3019 if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3020 return false;
3021 }
3022
3023 window.accept(counter, mic, now_ms);
3024 true
3025 }
3026
3027 fn unicast_replay_verdict(
3028 &mut self,
3029 local_id: LocalIdentityId,
3030 peer_id: PeerId,
3031 header: &PacketHeader,
3032 frame: &[u8],
3033 ) -> Option<ReplayVerdict> {
3034 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3035 return None;
3036 };
3037 let now_ms = self.clock.now_ms();
3038 self.identity_mut(local_id)
3039 .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3040 .map(|state| state.replay_window.check(counter, mic, now_ms))
3041 }
3042
3043 fn try_accept_counter_resync_response(
3044 &mut self,
3045 local_id: LocalIdentityId,
3046 peer_id: PeerId,
3047 header: &PacketHeader,
3048 frame: &[u8],
3049 payload: &[u8],
3050 ) -> bool {
3051 let Some(nonce) = Self::echo_response_nonce(payload) else {
3052 return false;
3053 };
3054 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3055 return false;
3056 };
3057 let now_ms = self.clock.now_ms();
3058 let Some(slot) = self.identity_mut(local_id) else {
3059 return false;
3060 };
3061 let Some(pending) = slot.pending_counter_resync().get(&peer_id).copied() else {
3062 return false;
3063 };
3064 if pending.nonce != nonce {
3065 return false;
3066 }
3067 let Some(state) = slot.peer_crypto_mut().get_mut(&peer_id) else {
3068 return false;
3069 };
3070 state.replay_window.reset(counter, now_ms);
3071 state.replay_window.accept(counter, mic, now_ms);
3072 let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3073 true
3074 }
3075
3076 async fn maybe_request_counter_resync(
3077 &mut self,
3078 local_id: LocalIdentityId,
3079 peer_id: PeerId,
3080 peer_key: PublicKey,
3081 ) {
3082 let now_ms = self.clock.now_ms();
3083 let should_send = {
3084 let Some(slot) = self.identity(local_id) else {
3085 return;
3086 };
3087 match slot.pending_counter_resync().get(&peer_id).copied() {
3088 Some(pending) => {
3089 now_ms.saturating_sub(pending.requested_ms) >= COUNTER_RESYNC_REQUEST_RETRY_MS
3090 }
3091 None => true,
3092 }
3093 };
3094 if !should_send {
3095 return;
3096 }
3097
3098 let nonce = self.rng.next_u32();
3099 let payload =
3100 Self::build_echo_command_payload(MAC_COMMAND_ECHO_REQUEST_ID, &nonce.to_be_bytes());
3101 let options = SendOptions::default();
3102 if self
3103 .send_unicast(local_id, &peer_key, payload.as_slice(), &options)
3104 .await
3105 .is_ok()
3106 {
3107 if let Some(slot) = self.identity_mut(local_id) {
3108 let _ = slot.pending_counter_resync_mut().insert(
3109 peer_id,
3110 PendingCounterResync {
3111 nonce,
3112 requested_ms: now_ms,
3113 },
3114 );
3115 }
3116 }
3117 }
3118
3119 fn store_deferred_counter_resync_frame(
3120 &mut self,
3121 local_id: LocalIdentityId,
3122 peer_id: PeerId,
3123 frame: &[u8],
3124 rx: &RxInfo,
3125 received_at_ms: u64,
3126 ) {
3127 let mut stored = Vec::new();
3128 stored
3129 .extend_from_slice(frame)
3130 .expect("received frame length must fit configured frame capacity");
3131 self.deferred_counter_resync_frame = Some(DeferredCounterResyncFrame {
3132 local_id,
3133 peer_id,
3134 frame: stored,
3135 rssi: rx.rssi,
3136 snr: rx.snr,
3137 lqi: rx.lqi,
3138 received_at_ms,
3139 });
3140 }
3141
3142 fn take_deferred_counter_resync_frame(
3143 &mut self,
3144 local_id: LocalIdentityId,
3145 peer_id: PeerId,
3146 ) -> Option<DeferredCounterResyncFrame<FRAME>> {
3147 match self.deferred_counter_resync_frame.as_ref() {
3148 Some(deferred) if deferred.local_id == local_id && deferred.peer_id == peer_id => {
3149 self.deferred_counter_resync_frame.take()
3150 }
3151 _ => None,
3152 }
3153 }
3154
3155 fn accept_multicast_replay(
3156 &mut self,
3157 channel_id: ChannelId,
3158 peer_id: PeerId,
3159 header: &PacketHeader,
3160 frame: &[u8],
3161 ) -> bool {
3162 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3163 return false;
3164 };
3165 let now_ms = self.clock.now_ms();
3166 let Some(channel) = self.channels.get_mut_by_id(&channel_id) else {
3167 return false;
3168 };
3169
3170 if let Some(window) = channel.replay.get_mut(&peer_id) {
3171 if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3172 return false;
3173 }
3174 window.accept(counter, mic, now_ms);
3175 return true;
3176 }
3177
3178 let mut window = ReplayWindow::new();
3179 window.accept(counter, mic, now_ms);
3180 channel.replay.insert(peer_id, window).is_ok()
3181 }
3182
3183 fn clear_peer_slot_state(&mut self, peer_id: PeerId) {
3184 for slot in self.identities.iter_mut().filter_map(|slot| slot.as_mut()) {
3185 let _ = slot.peer_crypto_mut().remove(&peer_id);
3186 let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3187 }
3188 for channel in self.channels.iter_mut() {
3189 let _ = channel.replay.remove(&peer_id);
3190 }
3191 }
3192
3193 fn try_auto_register_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
3194 let now_ms = self.clock.now_ms();
3195 let outcome = self.peer_registry.try_insert_or_update_auto(key, now_ms)?;
3196 if outcome.evicted_key.is_some() {
3197 self.clear_peer_slot_state(outcome.peer_id);
3198 }
3199 Ok(outcome.peer_id)
3200 }
3201
3202 fn replay_metadata<'a>(header: &PacketHeader, frame: &'a [u8]) -> Option<(u32, &'a [u8])> {
3203 let counter = header.sec_info?.frame_counter;
3204 let mic = frame.get(header.mic_range.clone())?;
3205 Some((counter, mic))
3206 }
3207
3208 fn build_echo_command_payload(command_id: u8, data: &[u8]) -> Vec<u8, FRAME> {
3209 let mut payload = Vec::new();
3210 let _ = payload.push(PayloadType::MacCommand as u8);
3211 let _ = payload.push(command_id);
3212 let _ = payload.extend_from_slice(data);
3213 payload
3214 }
3215
3216 fn echo_request_data(payload: &[u8]) -> Option<&[u8]> {
3217 let (&payload_type, rest) = payload.split_first()?;
3218 let (&command_id, data) = rest.split_first()?;
3219 if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3220 || command_id != MAC_COMMAND_ECHO_REQUEST_ID
3221 {
3222 return None;
3223 }
3224 Some(data)
3225 }
3226
3227 fn echo_response_nonce(payload: &[u8]) -> Option<u32> {
3228 let (&payload_type, rest) = payload.split_first()?;
3229 let (&command_id, data) = rest.split_first()?;
3230 if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3231 || command_id != MAC_COMMAND_ECHO_RESPONSE_ID
3232 || data.len() != COUNTER_RESYNC_NONCE_LEN
3233 {
3234 return None;
3235 }
3236 Some(u32::from_be_bytes(data.try_into().ok()?))
3237 }
3238
3239 fn full_key_at(frame: &[u8], offset: usize) -> Option<PublicKey> {
3240 let mut key = [0u8; 32];
3241 key.copy_from_slice(frame.get(offset..offset + 32)?);
3242 Some(PublicKey(key))
3243 }
3244
3245 fn find_local_identity_for_dst(
3246 &self,
3247 dst: Option<umsh_core::NodeHint>,
3248 ) -> Option<LocalIdentityId> {
3249 let dst = dst?;
3250 self.identities
3251 .iter()
3252 .enumerate()
3253 .find(|(_, slot)| {
3254 slot.as_ref()
3255 .map(|slot| slot.identity().public_key().hint() == dst)
3256 .unwrap_or(false)
3257 })
3258 .map(|(index, _)| LocalIdentityId(index as u8))
3259 }
3260
3261 fn resolve_source_peer_candidates(
3262 &mut self,
3263 frame: &[u8],
3264 header: &PacketHeader,
3265 ) -> Vec<(PeerId, PublicKey), PEERS> {
3266 match header.source {
3267 SourceAddrRef::FullKeyAt { offset } => {
3268 let Some(peer_key) = Self::full_key_at(frame, offset) else {
3269 return Vec::new();
3270 };
3271
3272 if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3273 let mut out = Vec::new();
3274 let _ = out.push((peer_id, peer_key));
3275 return out;
3276 }
3277
3278 if self.auto_register_full_key_peers {
3279 if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3280 let mut out = Vec::new();
3281 let _ = out.push((peer_id, peer_key));
3282 return out;
3283 }
3284 }
3285
3286 Vec::new()
3287 }
3288 SourceAddrRef::Hint(hint) => self
3289 .peer_registry
3290 .lookup_by_hint(&hint)
3291 .map(|(peer_id, info)| (peer_id, info.public_key))
3292 .collect(),
3293 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3294 }
3295 }
3296
3297 fn resolve_multicast_source(
3298 &mut self,
3299 frame: &[u8],
3300 header: &PacketHeader,
3301 ) -> Option<ResolvedMulticastSource> {
3302 match header.source {
3303 SourceAddrRef::FullKeyAt { offset } => {
3304 let mut key = [0u8; 32];
3305 key.copy_from_slice(frame.get(offset..offset + 32)?);
3306 let public_key = PublicKey(key);
3307 let peer_id = self
3308 .peer_registry
3309 .lookup_by_key(&public_key)
3310 .map(|(peer_id, _)| peer_id);
3311 Some(ResolvedMulticastSource {
3312 peer_id,
3313 public_key: Some(public_key),
3314 hint: Some(public_key.hint()),
3315 })
3316 }
3317 SourceAddrRef::Hint(hint) => {
3318 let resolved = self.resolve_unique_hint(hint);
3319 Some(ResolvedMulticastSource {
3320 peer_id: resolved.map(|(peer_id, _)| peer_id),
3321 public_key: resolved.map(|(_, key)| key),
3322 hint: Some(hint),
3323 })
3324 }
3325 SourceAddrRef::Encrypted { offset, len } => match len {
3326 32 => {
3327 let mut key = [0u8; 32];
3328 key.copy_from_slice(frame.get(offset..offset + 32)?);
3329 let public_key = PublicKey(key);
3330 let peer_id = self
3331 .peer_registry
3332 .lookup_by_key(&public_key)
3333 .map(|(peer_id, _)| peer_id);
3334 Some(ResolvedMulticastSource {
3335 peer_id,
3336 public_key: Some(public_key),
3337 hint: Some(public_key.hint()),
3338 })
3339 }
3340 3 => {
3341 let hint = umsh_core::NodeHint([
3342 *frame.get(offset)?,
3343 *frame.get(offset + 1)?,
3344 *frame.get(offset + 2)?,
3345 ]);
3346 let resolved = self.resolve_unique_hint(hint);
3347 Some(ResolvedMulticastSource {
3348 peer_id: resolved.map(|(peer_id, _)| peer_id),
3349 public_key: resolved.map(|(_, key)| key),
3350 hint: Some(hint),
3351 })
3352 }
3353 _ => None,
3354 },
3355 SourceAddrRef::None => None,
3356 }
3357 }
3358
3359 fn accept_unknown_multicast_replay(&mut self, header: &PacketHeader, frame: &[u8]) -> bool {
3360 let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3361 return false;
3362 };
3363 if self.multicast_unknown_dup_cache.contains(&cache_key) {
3364 return false;
3365 }
3366 self.multicast_unknown_dup_cache
3367 .insert(cache_key, self.clock.now_ms());
3368 true
3369 }
3370
3371 fn resolve_blind_source_peer_candidates(
3372 &mut self,
3373 frame: &[u8],
3374 source: SourceAddrRef,
3375 ) -> Vec<(PeerId, PublicKey), PEERS> {
3376 match source {
3377 SourceAddrRef::FullKeyAt { offset } => {
3378 let Some(peer_key) = Self::full_key_at(frame, offset) else {
3379 return Vec::new();
3380 };
3381
3382 if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3383 let mut out = Vec::new();
3384 let _ = out.push((peer_id, peer_key));
3385 return out;
3386 }
3387
3388 if self.auto_register_full_key_peers {
3389 if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3390 let mut out = Vec::new();
3391 let _ = out.push((peer_id, peer_key));
3392 return out;
3393 }
3394 }
3395
3396 Vec::new()
3397 }
3398 SourceAddrRef::Hint(hint) => self
3399 .peer_registry
3400 .lookup_by_hint(&hint)
3401 .map(|(peer_id, info)| (peer_id, info.public_key))
3402 .collect(),
3403 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3404 }
3405 }
3406
3407 fn resolve_unique_hint(&self, hint: umsh_core::NodeHint) -> Option<(PeerId, PublicKey)> {
3408 let mut matches = self.peer_registry.lookup_by_hint(&hint);
3409 let (peer_id, info) = matches.next()?;
3410 if matches.next().is_some() {
3411 return None;
3412 }
3413 Some((peer_id, info.public_key))
3414 }
3415
3416 fn learn_route_for_peer(&mut self, peer_id: PeerId, frame: &[u8], header: &PacketHeader) {
3417 let now_ms = self.clock.now_ms();
3418 self.peer_registry.touch(peer_id, now_ms);
3419
3420 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3421 return;
3422 };
3423
3424 if let Some(trace_range) = options.trace_route {
3425 if let Some(route) = self.source_route_from_trace(frame.get(trace_range).unwrap_or(&[]))
3426 {
3427 self.peer_registry
3428 .update_route(peer_id, crate::CachedRoute::Source(route));
3429 return;
3430 }
3431 }
3432
3433 if let Some(flood_hops) = header.flood_hops {
3434 let regions = Self::region_codes_from_options(frame, header.options_range.clone());
3435 self.peer_registry.update_route(
3436 peer_id,
3437 crate::CachedRoute::Flood {
3438 hops: flood_hops.accumulated(),
3439 regions,
3440 },
3441 );
3442 }
3443 }
3444
3445 fn region_codes_from_options(
3446 frame: &[u8],
3447 options_range: core::ops::Range<usize>,
3448 ) -> Vec<[u8; 2], 8> {
3449 let mut regions = Vec::new();
3450 if options_range.is_empty() {
3451 return regions;
3452 }
3453 for entry in umsh_core::iter_options(frame, options_range) {
3454 let Ok((number, value)) = entry else {
3455 continue;
3456 };
3457 if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
3458 continue;
3459 }
3460 if regions.push([value[0], value[1]]).is_err() {
3461 break;
3462 }
3463 }
3464 regions
3465 }
3466
3467 fn resolve_broadcast_source(
3468 frame: &[u8],
3469 header: &PacketHeader,
3470 ) -> Option<(umsh_core::NodeHint, Option<PublicKey>)> {
3471 match header.source {
3472 SourceAddrRef::Hint(hint) => Some((hint, None)),
3473 SourceAddrRef::FullKeyAt { offset } => {
3474 let mut key = [0u8; 32];
3475 key.copy_from_slice(frame.get(offset..offset + 32)?);
3476 let public_key = PublicKey(key);
3477 Some((public_key.hint(), Some(public_key)))
3478 }
3479 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => None,
3480 }
3481 }
3482
3483 fn payload_is_allowed(packet_type: PacketType, payload: &[u8]) -> bool {
3484 if payload.is_empty() {
3485 return true;
3486 }
3487
3488 PayloadType::from_byte(payload[0])
3489 .unwrap_or(PayloadType::Empty)
3490 .allowed_for(packet_type)
3491 }
3492
3493 fn source_route_from_trace(
3494 &self,
3495 trace_bytes: &[u8],
3496 ) -> Option<heapless::Vec<RouterHint, { crate::MAX_SOURCE_ROUTE_HOPS }>> {
3497 if trace_bytes.len() % 2 != 0 {
3498 return None;
3499 }
3500
3501 let mut route = heapless::Vec::new();
3502 for chunk in trace_bytes.chunks_exact(2) {
3503 route.push(RouterHint([chunk[0], chunk[1]])).ok()?;
3504 }
3505 Some(route)
3506 }
3507
3508 fn should_emit_destination_ack(&self, frame: &[u8], header: &PacketHeader) -> bool {
3509 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3510 return false;
3511 };
3512
3513 options
3518 .source_route
3519 .map(|range| range.is_empty())
3520 .unwrap_or(true)
3521 }
3522
3523 fn maybe_forward_received(
3524 &mut self,
3525 frame: &[u8],
3526 header: &PacketHeader,
3527 rx: &RxInfo,
3528 locally_handled_unicast: bool,
3529 ) -> bool {
3530 if !self.repeater.enabled {
3531 return false;
3532 }
3533 if !header.packet_type().is_routable() {
3534 return false;
3535 }
3536 if locally_handled_unicast
3540 && matches!(
3541 header.packet_type(),
3542 PacketType::Unicast
3543 | PacketType::UnicastAckReq
3544 | PacketType::BlindUnicast
3545 | PacketType::BlindUnicastAckReq
3546 )
3547 {
3548 return false;
3549 }
3550
3551 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3552 return false;
3553 };
3554 let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3555 return false;
3556 };
3557 if self.dup_cache.contains(&cache_key) {
3558 self.defer_pending_forward(&cache_key, rx, &options);
3559 return false;
3560 }
3561
3562 let Some(plan) = self.plan_forwarding(frame, header, &options, rx) else {
3563 return false;
3564 };
3565
3566 let mut rewritten = [0u8; FRAME];
3567 let Ok(total_len) =
3568 self.rewrite_forwarded_frame(frame, header, &options, plan, &mut rewritten)
3569 else {
3570 return false;
3571 };
3572 if total_len > self.radio.max_frame_size() {
3573 return false;
3574 }
3575
3576 let now_ms = self.clock.now_ms();
3577 if self
3578 .tx_queue
3579 .enqueue_with_state(
3580 TxPriority::Forward,
3581 &rewritten[..total_len],
3582 None,
3583 None,
3584 now_ms.saturating_add(plan.delay_ms),
3585 0,
3586 0,
3587 )
3588 .is_err()
3589 {
3590 return false;
3591 }
3592 self.dup_cache.insert(cache_key, now_ms);
3593 true
3594 }
3595
/// Decides whether and how this repeater forwards a received frame.
///
/// Returns `None` (do not forward) when the frame has an unknown critical
/// option, the repeater has no router hint, the station-callsign
/// classification rejects the frame, the source route is malformed or does not
/// name this repeater next, or any of the flood checks below fails.
///
/// The returned `ForwardPlan` records whether the leading source-route hop is
/// consumed, whether the flood hop count is decremented, an optional region
/// code to insert, the transmit delay, and the station-callsign action.
fn plan_forwarding(
    &mut self,
    frame: &[u8],
    header: &PacketHeader,
    options: &ParsedOptions,
    rx: &RxInfo,
) -> Option<ForwardPlan> {
    if options.has_unknown_critical {
        return None;
    }

    let router_hint = self.repeater_router_hint()?;
    let station_action = self.classify_forward_station_action(frame, header)?;

    // A missing option, or a range outside the frame, is treated as an empty
    // source route.
    let source_route_bytes = options
        .source_route
        .as_ref()
        .and_then(|range| frame.get(range.clone()))
        .unwrap_or(&[]);
    // The route is compared and consumed in two-byte units below.
    if source_route_bytes.len() % 2 != 0 {
        return None;
    }

    let mut consume_source_route = false;
    let mut decrement_flood_hops = false;
    let mut insert_region_code = None;
    let mut delay_ms = 0u64;

    if !source_route_bytes.is_empty() {
        // Source-routed: only forward when this repeater is the next hop.
        if source_route_bytes[..2] != router_hint.0 {
            return None;
        }
        consume_source_route = true;
        // This repeater is the last listed hop: the flood rules apply as well
        // when the header carries flood hops.
        if source_route_bytes.len() == 2 {
            decrement_flood_hops = header.flood_hops.is_some();
        }
    } else {
        // No source route: forwarding is flood-based.
        decrement_flood_hops = true;
    }

    if decrement_flood_hops {
        // Flooding needs a flood-hops field with hops remaining.
        let flood_hops = header.flood_hops?;
        if flood_hops.remaining() == 0 {
            return None;
        }
        // Signal-quality gates; each applies only when a threshold is in effect.
        if let Some(min_rssi) = Self::effective_min_rssi(options, &self.repeater) {
            if rx.rssi < min_rssi {
                return None;
            }
        }
        if let Some(min_snr) = Self::effective_min_snr(options, &self.repeater) {
            if rx.snr < Snr::from_decibels(min_snr) {
                return None;
            }
        }
        // Region gate: when the frame carries region codes, at least one must
        // be configured on this repeater.
        let mut saw_region_code = false;
        let mut matched_region_code = false;
        if !header.options_range.is_empty() {
            for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
                // An option that fails to decode aborts forwarding entirely.
                let (number, value) = entry.ok()?;
                if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
                    continue;
                }
                saw_region_code = true;
                let region_code = [value[0], value[1]];
                if self
                    .repeater
                    .regions
                    .iter()
                    .any(|configured| *configured == region_code)
                {
                    matched_region_code = true;
                }
            }
        }
        if saw_region_code {
            if !matched_region_code {
                return None;
            }
        } else {
            // No region code on the frame: tag it with the first configured
            // region, if there is one.
            insert_region_code = self.repeater.regions.first().copied();
        }
        delay_ms = self.sample_flood_contention_delay_ms(rx, options);
    }

    Some(ForwardPlan {
        router_hint,
        consume_source_route,
        decrement_flood_hops,
        insert_region_code,
        delay_ms,
        station_action,
    })
}
3692
3693 fn classify_forward_station_action(
3694 &self,
3695 frame: &[u8],
3696 header: &PacketHeader,
3697 ) -> Option<ForwardStationAction> {
3698 let has_operator_callsign = if header.options_range.is_empty() {
3699 false
3700 } else {
3701 umsh_core::iter_options(frame, header.options_range.clone())
3702 .filter_map(Result::ok)
3703 .any(|(number, _)| OptionNumber::from(number) == OptionNumber::OperatorCallsign)
3704 };
3705 let encrypted = header
3706 .sec_info
3707 .map(|sec| sec.scf.encrypted())
3708 .unwrap_or(false);
3709
3710 match self.repeater.amateur_radio_mode {
3711 AmateurRadioMode::Unlicensed => Some(ForwardStationAction::Remove),
3712 AmateurRadioMode::LicensedOnly => {
3713 if encrypted || !has_operator_callsign || self.repeater.station_callsign.is_none() {
3714 None
3715 } else {
3716 Some(ForwardStationAction::Replace)
3717 }
3718 }
3719 AmateurRadioMode::Hybrid => {
3720 if !encrypted && has_operator_callsign {
3721 self.repeater
3722 .station_callsign
3723 .as_ref()
3724 .map(|_| ForwardStationAction::Replace)
3725 } else {
3726 Some(ForwardStationAction::Remove)
3727 }
3728 }
3729 }
3730 }
3731
/// Writes the forwarded version of `src` into `dst` and returns its length.
///
/// The frame is rebuilt in order: frame-control byte, optional flood-hops byte,
/// the unchanged bytes up to the start of the options, the re-encoded options,
/// an optional `0xFF` byte, and the unchanged bytes after the options.
///
/// # Errors
///
/// Returns `CapacityError` when `dst` is too small, when a range taken from
/// `header` lies outside `src`, or when re-encoding the options fails.
fn rewrite_forwarded_frame(
    &self,
    src: &[u8],
    header: &PacketHeader,
    options: &ParsedOptions,
    plan: ForwardPlan,
    dst: &mut [u8],
) -> Result<usize, CapacityError> {
    if dst.is_empty() {
        return Err(CapacityError);
    }

    // Rebuild the frame-control byte from the parsed header.
    dst[0] = umsh_core::Fcf::new(
        header.packet_type(),
        header.fcf.full_source(),
        header.flood_hops.is_some(),
    )
    .0;
    let mut cursor = 1;

    if let Some(flood_hops) = header.flood_hops {
        // The hop byte is decremented only when the plan asks for it.
        let next = if plan.decrement_flood_hops {
            flood_hops.decremented().0
        } else {
            flood_hops.0
        };
        *dst.get_mut(cursor).ok_or(CapacityError)? = next;
        cursor += 1;
    }

    // Copy everything between the leading header bytes and the options as-is.
    let fhops_len = usize::from(header.flood_hops.is_some());
    let fixed_core = src
        .get(1 + fhops_len..header.options_range.start)
        .ok_or(CapacityError)?;
    let core_end = cursor + fixed_core.len();
    dst.get_mut(cursor..core_end)
        .ok_or(CapacityError)?
        .copy_from_slice(fixed_core);
    cursor = core_end;

    let options_len =
        self.encode_forwarded_options(src, header, options, plan, &mut dst[cursor..])?;
    cursor += options_len;

    // A 0xFF byte is written after the options for every packet type except
    // MacAck, and only when bytes follow the options.
    // NOTE(review): presumably the end-of-options marker, which the copied
    // options range must then exclude; confirm against the wire format.
    let needs_marker = !matches!(header.packet_type(), PacketType::MacAck)
        && header.options_range.end < header.total_len;
    if needs_marker {
        *dst.get_mut(cursor).ok_or(CapacityError)? = 0xFF;
        cursor += 1;
    }

    // Copy the remainder of the frame after the options unchanged.
    let tail = src
        .get(header.options_range.end..header.total_len)
        .ok_or(CapacityError)?;
    let end = cursor + tail.len();
    dst.get_mut(cursor..end)
        .ok_or(CapacityError)?
        .copy_from_slice(tail);
    Ok(end)
}
3798
3799 fn encode_forwarded_options(
3800 &self,
3801 src: &[u8],
3802 header: &PacketHeader,
3803 _options: &ParsedOptions,
3804 plan: ForwardPlan,
3805 dst: &mut [u8],
3806 ) -> Result<usize, CapacityError> {
3807 let mut encoder = OptionEncoder::new(dst);
3808 let mut inserted_region = false;
3809 let mut inserted_station = false;
3810 let mut saw_station = false;
3811
3812 if !header.options_range.is_empty() {
3813 for entry in umsh_core::iter_options(src, header.options_range.clone()) {
3814 let (number, value) = entry.map_err(|_| CapacityError)?;
3815 if !inserted_region {
3816 if let Some(region_code) = plan.insert_region_code {
3817 if number > OptionNumber::RegionCode.as_u16() {
3818 encoder
3819 .put(OptionNumber::RegionCode.as_u16(), ®ion_code)
3820 .map_err(|_| CapacityError)?;
3821 inserted_region = true;
3822 }
3823 }
3824 }
3825 if !inserted_station
3826 && matches!(plan.station_action, ForwardStationAction::Replace)
3827 && number > OptionNumber::StationCallsign.as_u16()
3828 {
3829 encoder
3830 .put(
3831 OptionNumber::StationCallsign.as_u16(),
3832 self.repeater
3833 .station_callsign
3834 .as_ref()
3835 .ok_or(CapacityError)?
3836 .as_trimmed_slice(),
3837 )
3838 .map_err(|_| CapacityError)?;
3839 inserted_station = true;
3840 }
3841
3842 match OptionNumber::from(number) {
3843 OptionNumber::RegionCode => {
3844 inserted_region = true;
3845 encoder.put(number, value).map_err(|_| CapacityError)?;
3846 }
3847 OptionNumber::TraceRoute => {
3848 let mut trace = [0u8; crate::MAX_SOURCE_ROUTE_HOPS * 2 + 2];
3849 trace[..2].copy_from_slice(&plan.router_hint.0);
3850 trace[2..2 + value.len()].copy_from_slice(value);
3851 encoder
3852 .put(number, &trace[..2 + value.len()])
3853 .map_err(|_| CapacityError)?;
3854 }
3855 OptionNumber::SourceRoute if plan.consume_source_route => {
3856 if value.len() < 2 || value.len() % 2 != 0 {
3857 return Err(CapacityError);
3858 }
3859 let remaining = if value.len() > 2 { &value[2..] } else { &[] };
3860 encoder.put(number, remaining).map_err(|_| CapacityError)?;
3861 }
3862 OptionNumber::StationCallsign => {
3863 saw_station = true;
3864 match plan.station_action {
3865 ForwardStationAction::Remove => {}
3866 ForwardStationAction::Replace => {
3867 encoder
3868 .put(
3869 number,
3870 self.repeater
3871 .station_callsign
3872 .as_ref()
3873 .ok_or(CapacityError)?
3874 .as_trimmed_slice(),
3875 )
3876 .map_err(|_| CapacityError)?;
3877 inserted_station = true;
3878 }
3879 }
3880 }
3881 _ => {
3882 encoder.put(number, value).map_err(|_| CapacityError)?;
3883 }
3884 }
3885 }
3886 }
3887
3888 if matches!(plan.station_action, ForwardStationAction::Replace)
3889 && !inserted_station
3890 && !saw_station
3891 {
3892 encoder
3893 .put(
3894 OptionNumber::StationCallsign.as_u16(),
3895 self.repeater
3896 .station_callsign
3897 .as_ref()
3898 .ok_or(CapacityError)?
3899 .as_trimmed_slice(),
3900 )
3901 .map_err(|_| CapacityError)?;
3902 }
3903 if let Some(region_code) = plan.insert_region_code {
3904 if !inserted_region {
3905 encoder
3906 .put(OptionNumber::RegionCode.as_u16(), ®ion_code)
3907 .map_err(|_| CapacityError)?;
3908 }
3909 }
3910 Ok(encoder.finish())
3911 }
3912
/// Builds a replacement resend record for a frame that was originally sent
/// with a source route.
///
/// The stored frame is rewritten with a flood-hops value taken from
/// `route_retry_flood_hops` and with options re-encoded by
/// `encode_route_retry_options`. The new record carries no source route.
///
/// Returns `None` when the stored frame cannot be parsed, already has the
/// route-retry option, was not sent with a non-empty source route, no flood hop
/// count is available, or the rewritten frame does not fit.
fn synthesize_route_retry_resend(
    &self,
    peer: &PublicKey,
    resend: &ResendRecord<FRAME>,
) -> Option<ResendRecord<FRAME>> {
    let header = PacketHeader::parse(resend.frame.as_slice()).ok()?;
    let options =
        ParsedOptions::extract(resend.frame.as_slice(), header.options_range.clone()).ok()?;
    // A frame that is already a route retry is not retried again.
    if options.route_retry {
        return None;
    }
    let source_route = resend.source_route.as_ref()?;
    if source_route.is_empty() {
        return None;
    }

    let flood_hops = self.route_retry_flood_hops(peer, &header, source_route)?;
    let has_flood_hops = flood_hops > 0;
    let mut rewritten = [0u8; FRAME];

    // Rebuild the frame-control byte; the flood-hops flag follows the new hop
    // count rather than the original frame.
    rewritten[0] = umsh_core::Fcf::new(
        header.packet_type(),
        header.fcf.full_source(),
        has_flood_hops,
    )
    .0;
    let mut cursor = 1;

    if has_flood_hops {
        *rewritten.get_mut(cursor)? = FloodHops::new(flood_hops, 0)?.0;
        cursor += 1;
    }

    // The source offset uses the ORIGINAL frame's flood-hops presence, which
    // may differ from the rewritten frame's.
    let fhops_len = usize::from(header.flood_hops.is_some());
    let fixed_core = resend
        .frame
        .get(1 + fhops_len..header.options_range.start)?;
    let core_end = cursor + fixed_core.len();
    rewritten.get_mut(cursor..core_end)?.copy_from_slice(fixed_core);
    cursor = core_end;

    let options_len = self
        .encode_route_retry_options(
            resend.frame.as_slice(),
            header.options_range.clone(),
            &options,
            &mut rewritten[cursor..],
        )
        .ok()?;
    cursor += options_len;

    // A 0xFF byte is written after the options for every packet type except
    // MacAck, and only when bytes follow the options.
    // NOTE(review): presumably the end-of-options marker; confirm against the
    // wire format.
    let needs_marker = !matches!(header.packet_type(), PacketType::MacAck)
        && header.options_range.end < header.total_len;
    if needs_marker {
        *rewritten.get_mut(cursor)? = 0xFF;
        cursor += 1;
    }

    // Copy the remainder of the frame after the options unchanged.
    let tail = resend
        .frame
        .get(header.options_range.end..header.total_len)?;
    let end = cursor + tail.len();
    rewritten.get_mut(cursor..end)?.copy_from_slice(tail);

    ResendRecord::try_new(&rewritten[..end], None).ok()
}
3985
3986 fn encode_route_retry_options(
3987 &self,
3988 src: &[u8],
3989 options_range: core::ops::Range<usize>,
3990 _options: &ParsedOptions,
3991 dst: &mut [u8],
3992 ) -> Result<usize, CapacityError> {
3993 let mut encoder = OptionEncoder::new(dst);
3994 let mut inserted_trace_route = false;
3995 let mut inserted_route_retry = false;
3996
3997 if !options_range.is_empty() {
3998 for entry in umsh_core::iter_options(src, options_range) {
3999 let (number, value) = entry.map_err(|_| CapacityError)?;
4000 if !inserted_trace_route && number > OptionNumber::TraceRoute.as_u16() {
4001 encoder
4002 .put(OptionNumber::TraceRoute.as_u16(), &[])
4003 .map_err(|_| CapacityError)?;
4004 inserted_trace_route = true;
4005 }
4006 if !inserted_route_retry && number > OptionNumber::RouteRetry.as_u16() {
4007 encoder
4008 .put(OptionNumber::RouteRetry.as_u16(), &[])
4009 .map_err(|_| CapacityError)?;
4010 inserted_route_retry = true;
4011 }
4012 match OptionNumber::from(number) {
4013 OptionNumber::SourceRoute => {}
4014 OptionNumber::TraceRoute => {
4015 encoder.put(number, value).map_err(|_| CapacityError)?;
4016 inserted_trace_route = true;
4017 }
4018 OptionNumber::RouteRetry => {}
4019 _ => {
4020 encoder.put(number, value).map_err(|_| CapacityError)?;
4021 }
4022 }
4023 }
4024 }
4025
4026 if !inserted_trace_route {
4027 encoder
4028 .put(OptionNumber::TraceRoute.as_u16(), &[])
4029 .map_err(|_| CapacityError)?;
4030 }
4031 if !inserted_route_retry {
4032 encoder
4033 .put(OptionNumber::RouteRetry.as_u16(), &[])
4034 .map_err(|_| CapacityError)?;
4035 }
4036
4037 Ok(encoder.finish())
4038 }
4039
4040 fn route_retry_flood_hops(
4041 &self,
4042 peer: &PublicKey,
4043 header: &PacketHeader,
4044 source_route: &heapless::Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>,
4045 ) -> Option<u8> {
4046 let existing = header
4047 .flood_hops
4048 .map(|hops| hops.remaining())
4049 .filter(|hops| *hops > 0);
4050 let cached = self
4051 .peer_registry
4052 .lookup_by_key(peer)
4053 .and_then(|(_, info)| match info.route.as_ref() {
4054 Some(crate::CachedRoute::Flood { hops, .. }) => Some((*hops).clamp(1, 15)),
4055 _ => None,
4056 });
4057 let route_len = u8::try_from(source_route.len())
4058 .ok()
4059 .map(|hops| hops.clamp(1, 15));
4060
4061 existing.or(cached).or(route_len).or(Some(5))
4062 }
4063
4064 fn repeater_router_hint(&self) -> Option<RouterHint> {
4065 self.identities
4066 .iter()
4067 .filter_map(|slot| slot.as_ref())
4068 .next()
4069 .map(|slot| slot.identity().public_key().router_hint())
4070 }
4071
4072 fn effective_min_rssi(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i16> {
4073 match (options.min_rssi, repeater.min_rssi) {
4074 (Some(packet), Some(local)) => Some(packet.max(local)),
4075 (Some(packet), None) => Some(packet),
4076 (None, Some(local)) => Some(local),
4077 (None, None) => None,
4078 }
4079 }
4080
4081 fn effective_min_snr(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i8> {
4082 match (options.min_snr, repeater.min_snr) {
4083 (Some(packet), Some(local)) => Some(packet.max(local)),
4084 (Some(packet), None) => Some(packet),
4085 (None, Some(local)) => Some(local),
4086 (None, None) => None,
4087 }
4088 }
4089
4090 fn sample_flood_contention_delay_ms(&mut self, rx: &RxInfo, options: &ParsedOptions) -> u64 {
4091 let effective_threshold_db =
4092 Self::effective_min_snr(options, &self.repeater).unwrap_or(i8::MIN);
4093 let low_db = self
4094 .repeater
4095 .flood_contention_snr_low_db
4096 .max(effective_threshold_db);
4097 let high_db = self
4098 .repeater
4099 .flood_contention_snr_high_db
4100 .max(low_db.saturating_add(1));
4101 let low = i32::from(Snr::from_decibels(low_db).as_centibels());
4102 let high = i32::from(Snr::from_decibels(high_db).as_centibels());
4103 let received = i32::from(rx.snr.as_centibels());
4104 let clamped = (received - low).clamp(0, high - low) as u32;
4105 let range = (high - low) as u32;
4106 let t_frame_ms = u64::from(self.radio.t_frame_ms());
4107 let min_window_ms = t_frame_ms
4108 .saturating_mul(u64::from(self.repeater.flood_contention_min_window_percent))
4109 / 100;
4110 let max_window_ms = t_frame_ms
4111 .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4112 .max(min_window_ms);
4113 let window_span_ms = max_window_ms.saturating_sub(min_window_ms);
4114 let window_ms = if range == 0 {
4115 max_window_ms
4116 } else {
4117 max_window_ms.saturating_sub(
4118 window_span_ms.saturating_mul(u64::from(clamped)) / u64::from(range),
4119 )
4120 };
4121 if window_ms == 0 {
4122 0
4123 } else {
4124 self.rng.random_range(..window_ms.saturating_add(1))
4125 }
4126 }
4127
4128 pub(crate) fn forward_dup_key(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4139 Self::routable_packet_identity(header, frame)
4140 }
4141
4142 fn routable_packet_identity(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4143 if !header.packet_type().is_secure() {
4144 return Some(DupCacheKey::Hash32(Self::normalized_routable_hash32(
4145 header, frame,
4146 )));
4147 }
4148 let options = ParsedOptions::extract(frame, header.options_range.clone()).ok()?;
4149 let mic = frame.get(header.mic_range.clone())?;
4150 if mic.is_empty() || mic.len() > 16 {
4151 return None;
4152 }
4153 let mut bytes = [0u8; 16];
4154 bytes[..mic.len()].copy_from_slice(mic);
4155 Some(DupCacheKey::Mic {
4156 bytes,
4157 len: mic.len() as u8,
4158 route_retry: options.route_retry,
4159 })
4160 }
4161
4162 fn defer_pending_forward(&mut self, key: &DupCacheKey, rx: &RxInfo, options: &ParsedOptions) {
4163 let Some(queued) = self.tx_queue.remove_first_matching(|entry| {
4164 entry.priority == TxPriority::Forward
4165 && Self::confirmation_key(entry.frame.as_slice())
4166 .map(|entry_key| &entry_key == key)
4167 .unwrap_or(false)
4168 }) else {
4169 return;
4170 };
4171
4172 if queued.forward_deferrals >= self.repeater.flood_contention_max_deferrals {
4173 return;
4174 }
4175
4176 let now_ms = self.clock.now_ms();
4177 let delay_ms = self.sample_flood_contention_delay_ms(rx, options);
4178 let _ = self.tx_queue.enqueue_with_state(
4179 queued.priority,
4180 queued.frame.as_slice(),
4181 queued.receipt,
4182 queued.identity_id,
4183 now_ms.saturating_add(delay_ms),
4184 queued.cad_attempts,
4185 queued.forward_deferrals.saturating_add(1),
4186 );
4187 }
4188
4189 async fn service_post_tx_listen(
4190 &mut self,
4191 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
4192 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
4193 loop {
4194 self.expire_post_tx_listen_if_needed();
4195 if self.post_tx_listen.is_none() {
4196 return Ok(());
4197 }
4198
4199 let handled = self.receive_one(&mut on_event).await?;
4200 if !handled {
4201 return Ok(());
4202 }
4203 }
4204 }
4205
4206 fn note_transmitted_ack_requested(&mut self, receipt: SendReceipt, frame: &[u8]) {
4207 let sent_ms = self.clock.now_ms();
4208 let direct_ack_deadline_ms = sent_ms.saturating_add(self.direct_ack_timeout_ms());
4209 let forwarded_ack_deadline_ms = sent_ms.saturating_add(self.forwarded_ack_timeout_ms());
4210 let confirm_timeout_ms = self.forward_confirm_timeout_ms();
4211 let confirm_key = Self::confirmation_key(frame);
4212
4213 let post_tx_listen = {
4214 let Some((identity_id, pending)) = self.pending_ack_mut(receipt) else {
4215 return;
4216 };
4217
4218 let needs_forward_confirmation = match pending.state {
4219 crate::AckState::Queued {
4220 needs_forward_confirmation,
4221 } => needs_forward_confirmation,
4222 crate::AckState::RetryQueued => true,
4223 _ => return,
4224 };
4225
4226 pending.sent_ms = sent_ms;
4227 if pending.ack_deadline_ms == 0 {
4228 pending.ack_deadline_ms = if needs_forward_confirmation {
4229 forwarded_ack_deadline_ms
4230 } else {
4231 direct_ack_deadline_ms
4232 };
4233 }
4234
4235 if needs_forward_confirmation {
4236 let deadline_ms = sent_ms.saturating_add(confirm_timeout_ms);
4237 pending.state = crate::AckState::AwaitingForward {
4238 confirm_deadline_ms: deadline_ms,
4239 };
4240 confirm_key.map(|confirm_key| PostTxListen {
4241 identity_id,
4242 receipt,
4243 confirm_key,
4244 deadline_ms,
4245 })
4246 } else {
4247 pending.state = crate::AckState::AwaitingAck;
4248 None
4249 }
4250 };
4251
4252 self.post_tx_listen = post_tx_listen;
4253 }
4254
4255 fn expire_post_tx_listen_if_needed(&mut self) {
4256 let should_clear = self
4257 .post_tx_listen
4258 .as_ref()
4259 .map(|listen| self.clock.now_ms() >= listen.deadline_ms)
4260 .unwrap_or(false);
4261 if should_clear {
4262 self.post_tx_listen = None;
4263 }
4264 }
4265
4266 fn forward_confirm_timeout_ms(&self) -> u64 {
4267 let t_frame_ms = u64::from(self.radio.t_frame_ms());
4268 t_frame_ms
4269 .saturating_add(self.max_forward_contention_delay_ms())
4270 .saturating_add(t_frame_ms)
4271 }
4272
4273 fn max_forward_contention_delay_ms(&self) -> u64 {
4274 u64::from(self.radio.t_frame_ms())
4275 .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4276 }
4277
4278 fn forward_retry_backoff_cap_ms(&self, retry_number: u8) -> u32 {
4279 Self::forward_retry_backoff_cap_ms_for_t_frame(self.radio.t_frame_ms(), retry_number)
4280 }
4281
/// Returns the backoff cap (in milliseconds) for a forward retry, given one frame time.
///
/// The cap doubles with each retry and tops out at four frame times: retries 0 and 1 get
/// one frame time, retry 2 gets two, and every later retry gets four. The multiplication
/// saturates at `u32::MAX`.
fn forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms: u32, retry_number: u8) -> u32 {
    let frames: u32 = match retry_number {
        0 | 1 => 1,
        2 => 2,
        _ => 4,
    };
    t_frame_ms.saturating_mul(frames)
}
4288
4289 fn can_attempt_route_retry(pending: &PendingAck<FRAME>) -> bool {
4290 let Ok(header) = PacketHeader::parse(pending.resend.frame.as_slice()) else {
4291 return false;
4292 };
4293 let Ok(options) = ParsedOptions::extract(
4294 pending.resend.frame.as_slice(),
4295 header.options_range.clone(),
4296 ) else {
4297 return false;
4298 };
4299 !options.route_retry
4300 && pending
4301 .resend
4302 .source_route
4303 .as_ref()
4304 .map(|route| !route.is_empty())
4305 .unwrap_or(false)
4306 }
4307
4308 fn direct_ack_timeout_ms(&self) -> u64 {
4309 u64::from(self.radio.t_frame_ms()).saturating_mul(10)
4310 }
4311
4312 fn forwarded_ack_timeout_ms(&self) -> u64 {
4313 let mut total = self.forward_confirm_timeout_ms();
4314 for retry_number in 1..=MAX_FORWARD_RETRIES {
4315 total = total
4316 .saturating_add(u64::from(self.forward_retry_backoff_cap_ms(retry_number)))
4317 .saturating_add(self.forward_confirm_timeout_ms());
4318 }
4319 total.saturating_add(u64::from(self.radio.t_frame_ms()))
4320 }
4321
4322 fn pending_ack_mut(
4323 &mut self,
4324 receipt: SendReceipt,
4325 ) -> Option<(LocalIdentityId, &mut PendingAck<FRAME>)> {
4326 for (index, slot) in self.identities.iter_mut().enumerate() {
4327 let Some(slot) = slot.as_mut() else {
4328 continue;
4329 };
4330 if let Some(pending) = slot.pending_ack_mut(&receipt) {
4331 return Some((LocalIdentityId(index as u8), pending));
4332 }
4333 }
4334 None
4335 }
4336
4337 pub(crate) fn confirmation_key(frame: &[u8]) -> Option<DupCacheKey> {
4338 let header = PacketHeader::parse(frame).ok()?;
4339 Self::routable_packet_identity(&header, frame)
4340 }
4341
4342 fn normalized_routable_hash32(header: &PacketHeader, frame: &[u8]) -> u32 {
4343 let mut hash = 0x811C_9DC5u32;
4344
4345 Self::hash_u8(&mut hash, header.packet_type() as u8);
4346 Self::hash_u8(&mut hash, header.fcf.full_source() as u8);
4347
4348 if !header.options_range.is_empty() {
4349 for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
4350 let Ok((number, value)) = entry else {
4351 continue;
4352 };
4353 let option = OptionNumber::from(number);
4354 if option.is_dynamic() {
4355 continue;
4356 }
4357 Self::hash_u16(&mut hash, number);
4358 Self::hash_u16(&mut hash, value.len() as u16);
4359 Self::hash_bytes(&mut hash, value);
4360 }
4361 }
4362
4363 match header.packet_type() {
4364 PacketType::Broadcast => {
4365 match header.source {
4366 umsh_core::SourceAddrRef::Hint(hint) => Self::hash_bytes(&mut hash, &hint.0),
4367 umsh_core::SourceAddrRef::FullKeyAt { offset } => {
4368 if let Some(key) = frame.get(offset..offset + 32) {
4369 Self::hash_bytes(&mut hash, key);
4370 }
4371 }
4372 umsh_core::SourceAddrRef::Encrypted { offset, len } => {
4373 if let Some(src) = frame.get(offset..offset + len) {
4374 Self::hash_bytes(&mut hash, src);
4375 }
4376 }
4377 umsh_core::SourceAddrRef::None => {}
4378 }
4379 if let Some(payload) = frame.get(header.body_range.clone()) {
4380 Self::hash_bytes(&mut hash, payload);
4381 }
4382 }
4383 PacketType::MacAck => {
4384 if let Some(dst) = header.ack_dst {
4385 Self::hash_bytes(&mut hash, &dst.0);
4386 }
4387 if let Some(tag) = frame.get(header.mic_range.clone()) {
4388 Self::hash_bytes(&mut hash, tag);
4389 }
4390 }
4391 _ => {
4392 if let Some(bytes) = frame.get(header.body_range.clone()) {
4393 Self::hash_bytes(&mut hash, bytes);
4394 }
4395 }
4396 }
4397 hash
4398 }
4399
/// Absorbs one byte into `hash` using an FNV-1a step: XOR the byte in, then multiply by
/// the 32-bit FNV prime `0x0100_0193` with wrapping arithmetic.
fn hash_u8(hash: &mut u32, value: u8) {
    let mixed = *hash ^ u32::from(value);
    *hash = mixed.wrapping_mul(0x0100_0193);
}
4404
4405 fn hash_u16(hash: &mut u32, value: u16) {
4406 Self::hash_bytes(hash, &value.to_be_bytes());
4407 }
4408
4409 fn hash_bytes(hash: &mut u32, bytes: &[u8]) {
4410 for byte in bytes {
4411 Self::hash_u8(hash, *byte);
4412 }
4413 }
4414
4415 fn observe_forwarding_confirmation(
4420 &mut self,
4421 frame: &[u8],
4422 ) -> Option<(LocalIdentityId, SendReceipt)> {
4423 self.expire_post_tx_listen_if_needed();
4424 let listen = self.post_tx_listen.clone()?;
4425
4426 let received_key = Self::confirmation_key(frame)?;
4427 if received_key != listen.confirm_key {
4428 return None;
4429 }
4430
4431 let Some(slot) = self.identity_mut(listen.identity_id) else {
4432 self.post_tx_listen = None;
4433 return None;
4434 };
4435 let Some(pending) = slot.pending_ack_mut(&listen.receipt) else {
4436 self.post_tx_listen = None;
4437 return None;
4438 };
4439 if !matches!(pending.state, crate::AckState::AwaitingForward { .. }) {
4440 self.post_tx_listen = None;
4441 return None;
4442 }
4443
4444 pending.state = crate::AckState::AwaitingAck;
4445 self.post_tx_listen = None;
4446 Some((listen.identity_id, listen.receipt))
4447 }
4448
4449 fn match_pending_peer_for_ack(
4450 &self,
4451 slot: &IdentitySlot<P::Identity, PEERS, ACKS, FRAME>,
4452 ack_tag_bytes: &[u8],
4453 ) -> Option<PublicKey> {
4454 if ack_tag_bytes.len() != 8 {
4455 return None;
4456 }
4457
4458 slot.pending_acks
4459 .iter()
4460 .find_map(|(_, pending)| (pending.ack_tag == ack_tag_bytes).then_some(pending.peer))
4461 }
4462}
4463
4464fn align_counter_boundary(value: u32) -> u32 {
4465 value & !COUNTER_PERSIST_BLOCK_MASK
4466}
4467
4468fn next_counter_persist_target(next_counter: u32) -> u32 {
4469 next_counter.wrapping_add(COUNTER_PERSIST_BLOCK_SIZE) & !COUNTER_PERSIST_BLOCK_MASK
4470}