1use core::num::NonZeroU8;
2use core::{future::poll_fn, task::Poll};
3
4use hamaddr::HamAddr;
5use heapless::{LinearMap, Vec};
6use rand::{Rng, RngExt as _};
7use umsh_core::{
8 BuildError, ChannelId, ChannelKey, FloodHops, NodeHint, OptionNumber, PacketBuilder,
9 PacketHeader, PacketType, ParseError, ParsedOptions, PayloadType, PublicKey, RouterHint,
10 SourceAddrRef, UnsealedPacket, feed_aad, options::OptionEncoder,
11};
12use umsh_crypto::{
13 CmacState, CryptoEngine, CryptoError, DerivedChannelKeys, NodeIdentity, PairwiseKeys,
14};
15use umsh_hal::{Clock, CounterStore, Radio, RxInfo, Snr, TxError, TxOptions};
16
17use crate::{
18 CapacityError, DEFAULT_ACKS, DEFAULT_CHANNELS, DEFAULT_DUP, DEFAULT_IDENTITIES, DEFAULT_PEERS,
19 DEFAULT_TX, MAX_CAD_ATTEMPTS, MAX_FORWARD_RETRIES, MAX_RESEND_FRAME_LEN, MAX_SOURCE_ROUTE_HOPS,
20 Platform, ReplayVerdict, ReplayWindow,
21 cache::{DupCacheKey, DuplicateCache},
22 peers::CachedRoute,
23 peers::{ChannelTable, PeerCryptoMap, PeerId, PeerRegistry},
24 send::{
25 PendingAck, PendingAckError, ResendRecord, SendOptions, SendReceipt, TxPriority, TxQueue,
26 },
27};
28
/// Number of frame-counter values covered by one persistence block.
const COUNTER_PERSIST_BLOCK_SIZE: u32 = 128;
/// Bit mask extracting a counter's offset inside its persistence block
/// (`COUNTER_PERSIST_BLOCK_SIZE - 1`).
const COUNTER_PERSIST_BLOCK_MASK: u32 = COUNTER_PERSIST_BLOCK_SIZE - 1;
/// In-block offset at which `schedule_counter_persist_if_needed` requests another persist.
const COUNTER_PERSIST_SCHEDULE_OFFSET: u32 = 100;
/// MAC command identifier for an echo request.
// NOTE(review): the wire usage is outside this chunk — confirm against the protocol spec.
const MAC_COMMAND_ECHO_REQUEST_ID: u8 = 4;
/// MAC command identifier for an echo response.
// NOTE(review): the wire usage is outside this chunk — confirm against the protocol spec.
const MAC_COMMAND_ECHO_RESPONSE_ID: u8 = 5;
/// Length in bytes of a counter-resync nonce (presumably the encoded size of
/// `PendingCounterResync::nonce` — TODO confirm).
const COUNTER_RESYNC_NONCE_LEN: usize = 4;
/// Retry interval, in milliseconds, for counter-resync requests.
// NOTE(review): the consumer of this value is outside this chunk.
const COUNTER_RESYNC_REQUEST_RETRY_MS: u64 = 5_000;
36
/// Bookkeeping for a counter-resync exchange that is still outstanding with one peer.
///
/// Instances are stored per peer in `IdentitySlot::pending_counter_resync`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PendingCounterResync {
    /// Nonce associated with the outstanding request.
    nonce: u32,
    /// Timestamp in milliseconds recorded for the request (presumably the time it was issued —
    /// TODO confirm against the code that fills it in).
    requested_ms: u64,
}
42
/// A received frame, together with its reception metadata, that is being held back.
///
/// At most one such frame is kept, in `Mac::deferred_counter_resync_frame`.
// NOTE(review): the logic that defers and later replays the frame is outside this chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeferredCounterResyncFrame<const FRAME: usize> {
    /// Local identity the frame is associated with.
    local_id: LocalIdentityId,
    /// Peer the frame is associated with.
    peer_id: PeerId,
    /// Raw frame bytes, bounded by the `FRAME` capacity.
    frame: Vec<u8, FRAME>,
    /// Reported RSSI for the reception.
    rssi: i16,
    /// Reported SNR for the reception.
    snr: Snr,
    /// Optional link-quality indicator; `None` when not available.
    lqi: Option<NonZeroU8>,
    /// Reception timestamp in milliseconds.
    received_at_ms: u64,
}
53
/// What is known about the sender of a multicast frame; every field is optional.
// NOTE(review): the resolution logic that produces this value is outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ResolvedMulticastSource {
    /// Registry identifier of the sender, when one is known.
    peer_id: Option<PeerId>,
    /// Full public key of the sender, when one is known.
    public_key: Option<PublicKey>,
    /// Short node hint of the sender, when one is known.
    hint: Option<NodeHint>,
}
60
/// State of a listen window opened after a transmission.
///
/// Stored in `Mac::post_tx_listen`; while it is `Some`, `transmit_next` re-queues every frame
/// whose priority is not `TxPriority::ImmediateAck`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostTxListen {
    /// Local identity that owns the transmission being listened for.
    identity_id: LocalIdentityId,
    /// Receipt of the transmission being listened for.
    receipt: SendReceipt,
    /// Duplicate-cache key associated with the window (presumably the key whose reception
    /// confirms the transmission — TODO confirm).
    confirm_key: DupCacheKey,
    /// Millisecond timestamp associated with the end of the window.
    deadline_ms: u64,
}
68
/// Handle naming one local identity slot of a [`Mac`].
///
/// The wrapped `u8` is used as an index into the MAC's identity table.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LocalIdentityId(pub u8);
83
/// A local node identity, either long-term or (with `software-crypto`) ephemeral.
pub enum LocalIdentity<I: NodeIdentity> {
    /// Identity backed by the platform's [`NodeIdentity`] implementation.
    LongTerm(I),
    /// Software-backed ephemeral identity; only available with the `software-crypto` feature.
    #[cfg(feature = "software-crypto")]
    Ephemeral(umsh_crypto::software::SoftwareIdentity),
}
110
111impl<I: NodeIdentity> LocalIdentity<I> {
112 pub fn public_key(&self) -> &PublicKey {
114 match self {
115 Self::LongTerm(identity) => identity.public_key(),
116 #[cfg(feature = "software-crypto")]
117 Self::Ephemeral(identity) => identity.public_key(),
118 }
119 }
120
121 pub fn hint(&self) -> umsh_core::NodeHint {
123 self.public_key().hint()
124 }
125
126 pub fn is_ephemeral(&self) -> bool {
128 match self {
129 Self::LongTerm(_) => false,
130 #[cfg(feature = "software-crypto")]
131 Self::Ephemeral(_) => true,
132 }
133 }
134}
135
136impl<I: NodeIdentity> From<I> for LocalIdentity<I> {
137 fn from(value: I) -> Self {
138 Self::LongTerm(value)
139 }
140}
141
/// Per-identity MAC state: the identity itself, per-peer crypto state, frame-counter
/// persistence bookkeeping and the table of transmissions awaiting an ACK.
///
/// `PEERS` bounds the per-peer maps, `ACKS` bounds the pending-ACK table and `FRAME` is the
/// frame capacity used by [`PendingAck`].
pub struct IdentitySlot<
    I: NodeIdentity,
    const PEERS: usize,
    const ACKS: usize,
    const FRAME: usize = MAX_RESEND_FRAME_LEN,
> {
    /// The identity owned by this slot.
    identity: LocalIdentity<I>,
    /// Crypto state kept per peer.
    peer_crypto: PeerCryptoMap<PEERS>,
    /// Next frame counter value to hand out.
    frame_counter: u32,
    /// Counter boundary most recently recorded as persisted (or loaded).
    persisted_counter: u32,
    /// Counter boundary that still has to be written to the counter store, if any.
    pending_persist_target: Option<u32>,
    /// Whether a persist has been scheduled since construction or the last counter load.
    save_scheduled_since_boot: bool,
    /// `false` for ephemeral identities, whose counters are never persisted.
    counter_persistence_enabled: bool,
    /// Transmissions awaiting an ACK, keyed by receipt.
    pending_acks: LinearMap<SendReceipt, PendingAck<FRAME>, ACKS>,
    /// Raw value of the next [`SendReceipt`] to issue.
    next_receipt: u32,
    /// Parent identity, when this slot was registered with one (see `register_ephemeral`).
    pfs_parent: Option<LocalIdentityId>,
    /// Outstanding counter-resync exchanges, keyed by peer.
    pending_counter_resync: LinearMap<PeerId, PendingCounterResync, PEERS>,
}
195
196impl<I: NodeIdentity, const PEERS: usize, const ACKS: usize, const FRAME: usize>
197 IdentitySlot<I, PEERS, ACKS, FRAME>
198{
199 pub fn new(
201 identity: LocalIdentity<I>,
202 frame_counter: u32,
203 pfs_parent: Option<LocalIdentityId>,
204 ) -> Self {
205 let counter_persistence_enabled = !identity.is_ephemeral();
206 Self {
207 identity,
208 peer_crypto: PeerCryptoMap::new(),
209 frame_counter,
210 persisted_counter: frame_counter,
211 pending_persist_target: None,
212 save_scheduled_since_boot: false,
213 counter_persistence_enabled,
214 pending_acks: LinearMap::new(),
215 next_receipt: 0,
216 pfs_parent,
217 pending_counter_resync: LinearMap::new(),
218 }
219 }
220
221 pub fn identity(&self) -> &LocalIdentity<I> {
223 &self.identity
224 }
225 pub fn peer_crypto(&self) -> &PeerCryptoMap<PEERS> {
227 &self.peer_crypto
228 }
229 pub fn peer_crypto_mut(&mut self) -> &mut PeerCryptoMap<PEERS> {
231 &mut self.peer_crypto
232 }
233 pub fn frame_counter(&self) -> u32 {
235 self.frame_counter
236 }
237 pub fn persisted_counter(&self) -> u32 {
239 self.persisted_counter
240 }
241 #[cfg(test)]
246 pub(crate) fn set_frame_counter(&mut self, value: u32) {
247 self.frame_counter = value;
248 }
249 pub fn pending_persist_target(&self) -> Option<u32> {
251 self.pending_persist_target
252 }
253
254 pub fn counter_persistence_enabled(&self) -> bool {
256 self.counter_persistence_enabled
257 }
258
259 pub(crate) fn advance_frame_counter(&mut self) -> u32 {
261 let current = self.frame_counter;
262 self.frame_counter = self.frame_counter.wrapping_add(1);
263 current
264 }
265
266 pub fn load_persisted_counter(&mut self, value: u32) {
268 let aligned = align_counter_boundary(value);
269 self.frame_counter = aligned;
270 self.persisted_counter = aligned;
271 self.pending_persist_target = None;
272 self.save_scheduled_since_boot = false;
273 }
274
275 fn schedule_counter_persist_if_needed(&mut self) {
276 if !self.counter_persistence_enabled {
277 return;
278 }
279
280 let should_schedule = !self.save_scheduled_since_boot
281 || (self.frame_counter & COUNTER_PERSIST_BLOCK_MASK) == COUNTER_PERSIST_SCHEDULE_OFFSET;
282 if !should_schedule {
283 return;
284 }
285
286 let target = next_counter_persist_target(self.frame_counter);
287 self.pending_persist_target = Some(
288 self.pending_persist_target
289 .map(|existing| existing.max(target))
290 .unwrap_or(target),
291 );
292 self.save_scheduled_since_boot = true;
293 }
294
295 fn mark_counter_persisted(&mut self, value: u32) {
296 let aligned = align_counter_boundary(value);
297 self.persisted_counter = aligned;
298 if self.pending_persist_target == Some(aligned) {
299 self.pending_persist_target = None;
300 }
301 }
302
303 fn counter_window_exhausted(&self) -> bool {
304 if !self.counter_persistence_enabled {
305 return false;
306 }
307
308 let ahead = self.persisted_counter.wrapping_sub(self.frame_counter);
309 if ahead > 0 && ahead <= COUNTER_PERSIST_BLOCK_SIZE {
310 return false;
311 }
312
313 if ahead == 0 {
314 return self.save_scheduled_since_boot;
315 }
316
317 self.frame_counter.wrapping_sub(self.persisted_counter) >= COUNTER_PERSIST_BLOCK_SIZE
318 }
319
320 pub fn next_receipt(&mut self) -> SendReceipt {
322 let receipt = SendReceipt(self.next_receipt);
323 self.next_receipt = self.next_receipt.wrapping_add(1);
324 receipt
325 }
326
327 #[cfg(test)]
329 pub(crate) fn set_next_receipt_for_test(&mut self, value: u32) {
330 self.next_receipt = value;
331 }
332
333 pub fn try_insert_pending_ack(
335 &mut self,
336 receipt: SendReceipt,
337 pending: PendingAck<FRAME>,
338 ) -> Result<Option<PendingAck<FRAME>>, PendingAckError> {
339 self.pending_acks
340 .insert(receipt, pending)
341 .map_err(|_| PendingAckError::TableFull)
342 }
343
344 pub fn pending_ack(&self, receipt: &SendReceipt) -> Option<&PendingAck<FRAME>> {
346 self.pending_acks.get(receipt)
347 }
348
349 pub fn pending_ack_mut(&mut self, receipt: &SendReceipt) -> Option<&mut PendingAck<FRAME>> {
351 self.pending_acks.get_mut(receipt)
352 }
353
354 pub fn remove_pending_ack(&mut self, receipt: &SendReceipt) -> Option<PendingAck<FRAME>> {
356 self.pending_acks.remove(receipt)
357 }
358
359 pub fn pfs_parent(&self) -> Option<LocalIdentityId> {
361 self.pfs_parent
362 }
363
364 fn pending_counter_resync(&self) -> &LinearMap<PeerId, PendingCounterResync, PEERS> {
366 &self.pending_counter_resync
367 }
368
369 fn pending_counter_resync_mut(
371 &mut self,
372 ) -> &mut LinearMap<PeerId, PendingCounterResync, PEERS> {
373 &mut self.pending_counter_resync
374 }
375}
376
/// Policy constraints attached to a single channel.
// NOTE(review): the enforcement code (`enforce_send_policy`) is outside this chunk; the field
// notes below are inferred from the names and should be confirmed there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelPolicy {
    /// Channel the policy applies to.
    pub channel_id: ChannelId,
    /// Presumably: traffic on this channel must be sent unencrypted.
    pub require_unencrypted: bool,
    /// Presumably: traffic on this channel must carry the full source key rather than a hint.
    pub require_full_source: bool,
    /// Optional limit on flood hops for this channel; `None` means no limit is configured here.
    pub max_flood_hops: Option<u8>,
}
407
/// Selects how amateur-radio operation is handled.
///
/// Used by both [`OperatingPolicy`] and [`RepeaterConfig`]; both default to
/// [`AmateurRadioMode::Unlicensed`].
// NOTE(review): the exact rules attached to each mode are implemented outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AmateurRadioMode {
    /// Unlicensed operation (the default).
    Unlicensed,
    /// Licensed amateur operation only.
    LicensedOnly,
    /// Mixed licensed and unlicensed operation.
    Hybrid,
}
448
/// Authority under which a frame is transmitted.
// NOTE(review): how the authority is chosen is decided outside this chunk.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransmitAuthority {
    /// Transmission under unlicensed rules.
    Unlicensed,
    /// Transmission under amateur-radio rules.
    Amateur,
}
454
/// What a forwarder does with the station information of a frame it relays.
// NOTE(review): presumably this refers to the station-callsign option; the forwarding code is
// outside this chunk — confirm there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ForwardStationAction {
    /// Remove the station information.
    Remove,
    /// Replace the station information.
    Replace,
}
460
/// Decisions describing how a received frame is to be forwarded.
// NOTE(review): the planner and the code that applies the plan are outside this chunk; the
// field notes are derived from the field names and types only.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ForwardPlan {
    /// Router hint associated with the forward.
    router_hint: RouterHint,
    /// Whether a source-route entry is consumed when forwarding.
    consume_source_route: bool,
    /// Whether the flood-hop budget is decremented when forwarding.
    decrement_flood_hops: bool,
    /// Region code to insert into the forwarded frame, if any.
    insert_region_code: Option<[u8; 2]>,
    /// Delay in milliseconds applied to the forward.
    delay_ms: u64,
    /// Handling of the station information.
    station_action: ForwardStationAction,
}
470
/// Node-wide operating policy consulted when sending.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OperatingPolicy {
    /// Amateur-radio handling for locally originated traffic.
    pub amateur_radio_mode: AmateurRadioMode,
    /// Operator callsign; when set, the `queue_*` send methods attach it to outgoing frames as
    /// an `OptionNumber::OperatorCallsign` option.
    pub operator_callsign: Option<HamAddr>,
    /// Per-channel policies (at most four).
    pub channel_policies: Vec<ChannelPolicy, 4>,
}
498
impl Default for OperatingPolicy {
    /// Unlicensed operation, no operator callsign and no per-channel policies.
    fn default() -> Self {
        Self {
            amateur_radio_mode: AmateurRadioMode::Unlicensed,
            operator_callsign: None,
            channel_policies: Vec::new(),
        }
    }
}
508
/// Configuration of the repeater (frame-forwarding) role.
// NOTE(review): the forwarding logic that reads these fields is outside this chunk; the
// per-field notes are derived from names, types and the `Default` values only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepeaterConfig {
    /// Master switch for repeating; defaults to `false`.
    pub enabled: bool,
    /// Two-byte region codes associated with this repeater (at most eight).
    pub regions: Vec<[u8; 2], 8>,
    /// Optional minimum RSSI threshold.
    pub min_rssi: Option<i16>,
    /// Optional minimum SNR threshold.
    pub min_snr: Option<i8>,
    /// Lower SNR bound, in dB, of the flood-contention mapping.
    pub flood_contention_snr_low_db: i8,
    /// Upper SNR bound, in dB, of the flood-contention mapping.
    pub flood_contention_snr_high_db: i8,
    /// Minimum contention window, expressed as a percentage.
    pub flood_contention_min_window_percent: u8,
    /// Maximum contention window, expressed in frames.
    pub flood_contention_max_window_frames: u8,
    /// Maximum number of contention deferrals.
    pub flood_contention_max_deferrals: u8,
    /// Amateur-radio handling for repeated traffic.
    pub amateur_radio_mode: AmateurRadioMode,
    /// Station callsign of the repeater, if any.
    pub station_callsign: Option<HamAddr>,
}
562
impl Default for RepeaterConfig {
    /// Repeating disabled, no regions or signal thresholds, unlicensed mode, no station
    /// callsign, and flood-contention parameters of -6 dB / 15 dB SNR bounds, a 20 % minimum
    /// window, a 2-frame maximum window and at most 3 deferrals.
    fn default() -> Self {
        Self {
            enabled: false,
            regions: Vec::new(),
            min_rssi: None,
            min_snr: None,
            flood_contention_snr_low_db: -6,
            flood_contention_snr_high_db: 15,
            flood_contention_min_window_percent: 20,
            flood_contention_max_window_frames: 2,
            flood_contention_max_deferrals: 3,
            amateur_radio_mode: AmateurRadioMode::Unlicensed,
            station_callsign: None,
        }
    }
}
580
/// Errors returned by the `queue_*` / `send_*` family of [`Mac`] methods.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SendError {
    /// The given [`LocalIdentityId`] does not name an occupied identity slot.
    IdentityMissing,
    /// The destination public key is not present in the peer registry.
    PeerMissing,
    /// No pairwise keys are installed for the (identity, peer) pair.
    PairwiseKeysMissing,
    /// Key agreement with the peer failed.
    // NOTE(review): produced outside this chunk (presumably by `ensure_peer_crypto`).
    IdentityAgreementFailed,
    /// The requested channel is not present in the channel table.
    ChannelMissing,
    /// The send was rejected by the operating policy.
    // NOTE(review): produced outside this chunk (presumably by `enforce_send_policy`).
    PolicyViolation,
    /// An ACK was requested for a packet type that cannot be acknowledged
    /// (broadcast and multicast).
    AckUnsupported,
    /// Encryption was requested for a packet type that cannot be encrypted (broadcast).
    EncryptionUnsupported,
    /// A salt was requested for a packet type that cannot carry one (broadcast).
    SaltUnsupported,
    /// Packet construction failed; also used with `BuildError::BufferTooSmall` when a built
    /// frame exceeds the radio's maximum frame size.
    Build(BuildError),
    /// Packet parsing failed.
    Parse(ParseError),
    /// A cryptographic operation failed.
    Crypto(CryptoError),
    /// The transmit queue rejected the frame. `install_pairwise_keys` also reports a full
    /// peer-crypto map with this variant.
    QueueFull,
    /// The pending-ACK table has no room.
    // NOTE(review): produced outside this chunk.
    PendingAckFull,
    /// Frame-counter persistence has not caught up with the live counter.
    // NOTE(review): produced outside this chunk.
    CounterPersistenceLag,
}
632
633impl From<BuildError> for SendError {
634 fn from(value: BuildError) -> Self {
635 Self::Build(value)
636 }
637}
638
639impl From<ParseError> for SendError {
640 fn from(value: ParseError) -> Self {
641 Self::Parse(value)
642 }
643}
644
645impl From<CryptoError> for SendError {
646 fn from(value: CryptoError) -> Self {
647 Self::Crypto(value)
648 }
649}
650
/// Errors surfaced by the MAC's radio-facing operations, generic over the radio driver's
/// error type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MacError<RadioError> {
    /// An error reported directly by the radio driver.
    Radio(RadioError),
    /// A transmit attempt failed.
    Transmit(TxError<RadioError>),
    /// A frame could not be put back on the transmit queue (see `transmit_next`).
    QueueFull,
}
670
/// Errors returned by [`Mac::load_persisted_counter`], generic over the counter store's
/// error type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CounterPersistenceError<StoreError> {
    /// The given [`LocalIdentityId`] does not name an occupied identity slot.
    IdentityMissing,
    /// The counter store reported an error.
    Store(StoreError),
}
685
686impl<RadioError> From<RadioError> for MacError<RadioError> {
687 fn from(value: RadioError) -> Self {
688 Self::Radio(value)
689 }
690}
691
692impl<RadioError> From<TxError<RadioError>> for MacError<RadioError> {
693 fn from(value: TxError<RadioError>) -> Self {
694 Self::Transmit(value)
695 }
696}
697
/// The MAC layer: owns the platform resources (radio, crypto engine, clock, RNG, counter
/// store) together with all protocol state.
///
/// Const parameters bound the fixed-capacity tables: `IDENTITIES` local identity slots,
/// `PEERS` registered peers, `CHANNELS` channels, `ACKS` pending ACKs per identity, `TX`
/// transmit-queue entries, `FRAME` bytes per frame buffer and `DUP` duplicate-cache entries.
pub struct Mac<
    P: Platform,
    const IDENTITIES: usize = DEFAULT_IDENTITIES,
    const PEERS: usize = DEFAULT_PEERS,
    const CHANNELS: usize = DEFAULT_CHANNELS,
    const ACKS: usize = DEFAULT_ACKS,
    const TX: usize = DEFAULT_TX,
    const FRAME: usize = MAX_RESEND_FRAME_LEN,
    const DUP: usize = DEFAULT_DUP,
> {
    /// Radio driver.
    radio: P::Radio,
    /// Crypto engine used for key derivation and packet sealing.
    crypto: CryptoEngine<P::Aes, P::Sha>,
    /// Millisecond clock.
    clock: P::Clock,
    /// Random number generator.
    rng: P::Rng,
    /// Persistent storage for frame counters.
    counter_store: P::CounterStore,
    /// Identity slots indexed by `LocalIdentityId`; `None` marks a vacated slot.
    identities: Vec<Option<IdentitySlot<P::Identity, PEERS, ACKS, FRAME>>, IDENTITIES>,
    /// Known peers.
    peer_registry: PeerRegistry<PEERS>,
    /// Registered channels and their derived keys.
    channels: ChannelTable<CHANNELS>,
    /// Duplicate-suppression cache.
    dup_cache: DuplicateCache<DUP>,
    /// Second duplicate cache (presumably for multicast frames from unknown senders —
    /// TODO confirm; its users are outside this chunk).
    multicast_unknown_dup_cache: DuplicateCache<DUP>,
    /// Frames waiting to be transmitted.
    tx_queue: TxQueue<TX, FRAME>,
    /// Active post-transmit listen window, if any.
    post_tx_listen: Option<PostTxListen>,
    /// Repeater configuration.
    repeater: RepeaterConfig,
    /// Operating policy applied to sends.
    operating_policy: OperatingPolicy,
    /// Flag exposed through `auto_register_full_key_peers` / `set_auto_register_full_key_peers`;
    /// starts out `false`.
    auto_register_full_key_peers: bool,
    /// Frame held back for later processing, if any.
    deferred_counter_resync_frame: Option<DeferredCounterResyncFrame<FRAME>>,
}
786
787impl<
788 P: Platform,
789 const IDENTITIES: usize,
790 const PEERS: usize,
791 const CHANNELS: usize,
792 const ACKS: usize,
793 const TX: usize,
794 const FRAME: usize,
795 const DUP: usize,
796> Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
797{
    /// Creates a MAC from its platform resources and configuration.
    ///
    /// All tables, caches and queues start empty, no post-transmit listen window or deferred
    /// frame is present, and automatic registration of full-key peers is off.
    pub fn new(
        radio: P::Radio,
        crypto: CryptoEngine<P::Aes, P::Sha>,
        clock: P::Clock,
        rng: P::Rng,
        counter_store: P::CounterStore,
        repeater: RepeaterConfig,
        operating_policy: OperatingPolicy,
    ) -> Self {
        Self {
            radio,
            crypto,
            clock,
            rng,
            counter_store,
            identities: Vec::new(),
            peer_registry: PeerRegistry::new(),
            channels: ChannelTable::new(),
            dup_cache: DuplicateCache::new(),
            multicast_unknown_dup_cache: DuplicateCache::new(),
            tx_queue: TxQueue::new(),
            post_tx_listen: None,
            repeater,
            operating_policy,
            auto_register_full_key_peers: false,
            deferred_counter_resync_frame: None,
        }
    }
827
    /// Returns a shared reference to the radio driver.
    pub fn radio(&self) -> &P::Radio {
        &self.radio
    }
832
    /// Returns a mutable reference to the radio driver.
    pub fn radio_mut(&mut self) -> &mut P::Radio {
        &mut self.radio
    }
837
    /// Returns the crypto engine.
    pub fn crypto(&self) -> &CryptoEngine<P::Aes, P::Sha> {
        &self.crypto
    }
842
    /// Returns the clock.
    pub fn clock(&self) -> &P::Clock {
        &self.clock
    }
847
    /// Returns a shared reference to the random number generator.
    pub fn rng(&self) -> &P::Rng {
        &self.rng
    }
852
    /// Returns a mutable reference to the random number generator.
    pub fn rng_mut(&mut self) -> &mut P::Rng {
        &mut self.rng
    }
857
    /// Returns the persistent frame-counter store.
    pub fn counter_store(&self) -> &P::CounterStore {
        &self.counter_store
    }
    /// Returns the transmit queue.
    pub fn tx_queue(&self) -> &TxQueue<TX, FRAME> {
        &self.tx_queue
    }
    /// Returns the transmit queue for modification.
    pub fn tx_queue_mut(&mut self) -> &mut TxQueue<TX, FRAME> {
        &mut self.tx_queue
    }
    /// Returns the duplicate-suppression cache.
    pub fn dup_cache(&self) -> &DuplicateCache<DUP> {
        &self.dup_cache
    }
    /// Returns the peer registry.
    pub fn peer_registry(&self) -> &PeerRegistry<PEERS> {
        &self.peer_registry
    }
    /// Returns the peer registry for modification.
    pub fn peer_registry_mut(&mut self) -> &mut PeerRegistry<PEERS> {
        &mut self.peer_registry
    }
    /// Returns the channel table.
    pub fn channels(&self) -> &ChannelTable<CHANNELS> {
        &self.channels
    }
    /// Returns the channel table for modification.
    pub fn channels_mut(&mut self) -> &mut ChannelTable<CHANNELS> {
        &mut self.channels
    }
    /// Returns the repeater configuration.
    pub fn repeater_config(&self) -> &RepeaterConfig {
        &self.repeater
    }
    /// Returns the repeater configuration for modification.
    pub fn repeater_config_mut(&mut self) -> &mut RepeaterConfig {
        &mut self.repeater
    }
    /// Returns the operating policy.
    pub fn operating_policy(&self) -> &OperatingPolicy {
        &self.operating_policy
    }
    /// Returns the operating policy for modification.
    pub fn operating_policy_mut(&mut self) -> &mut OperatingPolicy {
        &mut self.operating_policy
    }
906
    /// Returns the current value of the auto-register-full-key-peers flag.
    // NOTE(review): the receive path that consults this flag is outside this chunk.
    pub fn auto_register_full_key_peers(&self) -> bool {
        self.auto_register_full_key_peers
    }
911
    /// Sets the auto-register-full-key-peers flag.
    // NOTE(review): the receive path that consults this flag is outside this chunk.
    pub fn set_auto_register_full_key_peers(&mut self, enabled: bool) {
        self.auto_register_full_key_peers = enabled;
    }
916
    /// Registers `identity` as a long-term local identity with no parent.
    ///
    /// Delegates to `insert_identity`, which yields the new [`LocalIdentityId`] or a
    /// [`CapacityError`].
    pub fn add_identity(
        &mut self,
        identity: P::Identity,
    ) -> Result<LocalIdentityId, CapacityError> {
        self.insert_identity(LocalIdentity::LongTerm(identity), None)
    }
924
925 pub async fn load_persisted_counter(
927 &mut self,
928 id: LocalIdentityId,
929 ) -> Result<u32, CounterPersistenceError<<P::CounterStore as CounterStore>::Error>> {
930 let context = {
931 let slot = self
932 .identity(id)
933 .ok_or(CounterPersistenceError::IdentityMissing)?;
934 if !slot.counter_persistence_enabled() {
935 return Ok(slot.frame_counter());
936 }
937 *slot.identity().public_key()
938 };
939 let loaded = self
940 .counter_store
941 .load(&context.0)
942 .await
943 .map_err(CounterPersistenceError::Store)?;
944 let aligned = align_counter_boundary(loaded);
945 let slot = self
946 .identity_mut(id)
947 .ok_or(CounterPersistenceError::IdentityMissing)?;
948 slot.load_persisted_counter(aligned);
949 Ok(aligned)
950 }
951
952 pub async fn service_counter_persistence(
954 &mut self,
955 ) -> Result<usize, <P::CounterStore as CounterStore>::Error> {
956 let mut pending = Vec::<(LocalIdentityId, [u8; 32], u32), IDENTITIES>::new();
957 for (index, slot) in self.identities.iter().enumerate() {
958 let Some(slot) = slot.as_ref() else {
959 continue;
960 };
961 let Some(target) = slot.pending_persist_target() else {
962 continue;
963 };
964 if !slot.counter_persistence_enabled() {
965 continue;
966 }
967 pending
968 .push((
969 LocalIdentityId(index as u8),
970 slot.identity().public_key().0,
971 target,
972 ))
973 .expect("identity enumeration must fit configured identity capacity");
974 }
975
976 let mut wrote = 0usize;
977 for (_, context, target) in pending.iter() {
978 self.counter_store
979 .store(context, align_counter_boundary(*target))
980 .await?;
981 wrote += 1;
982 }
983 if wrote > 0 {
984 self.counter_store.flush().await?;
985 for (id, _, target) in pending {
986 if let Some(slot) = self.identity_mut(id) {
987 slot.mark_counter_persisted(target);
988 }
989 }
990 }
991 Ok(wrote)
992 }
993
    /// Registers a software-backed ephemeral identity, recording `parent` as its parent
    /// identity.
    ///
    /// Delegates to `insert_identity`, which yields the new [`LocalIdentityId`] or a
    /// [`CapacityError`].
    #[cfg(feature = "software-crypto")]
    pub fn register_ephemeral(
        &mut self,
        parent: LocalIdentityId,
        identity: umsh_crypto::software::SoftwareIdentity,
    ) -> Result<LocalIdentityId, CapacityError> {
        self.insert_identity(LocalIdentity::Ephemeral(identity), Some(parent))
    }
1003
1004 #[cfg(feature = "software-crypto")]
1005 pub fn remove_ephemeral(&mut self, id: LocalIdentityId) -> bool {
1007 if let Some(slot) = self.identities.get_mut(id.0 as usize) {
1008 let should_remove = slot
1009 .as_ref()
1010 .map(|identity_slot| identity_slot.identity().is_ephemeral())
1011 .unwrap_or(false);
1012 if should_remove {
1013 *slot = None;
1014 return true;
1015 }
1016 }
1017 false
1018 }
1019
    /// Returns the slot for `id`, or `None` when the index is out of range or the slot is
    /// vacant.
    pub fn identity(
        &self,
        id: LocalIdentityId,
    ) -> Option<&IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
        self.identities.get(id.0 as usize)?.as_ref()
    }
1027
    /// Returns the slot for `id` for modification, or `None` when the index is out of range or
    /// the slot is vacant.
    pub fn identity_mut(
        &mut self,
        id: LocalIdentityId,
    ) -> Option<&mut IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
        self.identities.get_mut(id.0 as usize)?.as_mut()
    }
1035
    /// Adds `key` to the peer registry via `try_insert_or_update` and returns its [`PeerId`].
    pub fn add_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
        self.peer_registry.try_insert_or_update(key)
    }
1040
    /// Derives the channel keys for `key` and adds the channel to the channel table.
    pub fn add_channel(&mut self, key: ChannelKey) -> Result<(), CapacityError> {
        let derived = self.crypto.derive_channel_keys(&key);
        self.channels.try_add(key, derived)
    }
1046
    /// Derives a channel key from `name` and registers it through [`Self::add_channel`].
    pub fn add_named_channel(&mut self, name: &str) -> Result<(), CapacityError> {
        let key = self.crypto.derive_named_channel_key(name);
        self.add_channel(key)
    }
1052
1053 pub fn identity_count(&self) -> usize {
1055 self.identities.iter().filter(|slot| slot.is_some()).count()
1056 }
1057
1058 #[cfg(any(feature = "unsafe-advanced", test))]
1065 pub(crate) fn install_pairwise_keys(
1066 &mut self,
1067 identity_id: LocalIdentityId,
1068 peer_id: PeerId,
1069 pairwise_keys: PairwiseKeys,
1070 ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1071 let slot = self
1072 .identity_mut(identity_id)
1073 .ok_or(SendError::IdentityMissing)?;
1074 slot.peer_crypto_mut()
1075 .insert(
1076 peer_id,
1077 crate::peers::PeerCryptoState {
1078 pairwise_keys,
1079 replay_window: ReplayWindow::new(),
1080 },
1081 )
1082 .map_err(|_| SendError::QueueFull)
1083 }
1084
    /// Public entry point, available with the `unsafe-advanced` feature, that forwards to
    /// `install_pairwise_keys` unchanged.
    #[cfg(feature = "unsafe-advanced")]
    pub fn install_pairwise_keys_advanced(
        &mut self,
        identity_id: LocalIdentityId,
        peer_id: PeerId,
        pairwise_keys: PairwiseKeys,
    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
        self.install_pairwise_keys(identity_id, peer_id, pairwise_keys)
    }
1100
    /// Builds a broadcast frame carrying `payload` and places it on the transmit queue at
    /// `TxPriority::Application`.
    ///
    /// Broadcast frames are not sealed here and do not consume a frame counter. The source is
    /// encoded as the full key or as a hint depending on `options.full_source`; flood hops,
    /// trace route, source route, region code and the operator callsign are added when
    /// present.
    ///
    /// # Errors
    ///
    /// - whatever `enforce_send_policy` rejects;
    /// - [`SendError::EncryptionUnsupported`], [`SendError::AckUnsupported`] or
    ///   [`SendError::SaltUnsupported`] when the corresponding option is set;
    /// - [`SendError::IdentityMissing`] when `from` names no occupied slot;
    /// - [`SendError::Build`] when building fails or the frame exceeds the radio's maximum
    ///   frame size;
    /// - [`SendError::QueueFull`] when the transmit queue rejects the frame.
    pub fn queue_broadcast(
        &mut self,
        from: LocalIdentityId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.enforce_send_policy(None, options, false)?;
        if options.encrypted {
            return Err(SendError::EncryptionUnsupported);
        }
        if options.ack_requested {
            return Err(SendError::AckUnsupported);
        }
        if options.salt {
            return Err(SendError::SaltUnsupported);
        }

        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
        let source_key = *slot.identity().public_key();
        // The receipt is allocated up front; it is consumed even if a later step fails.
        let receipt = slot.next_receipt();
        let mut buf = [0u8; FRAME];
        let builder = PacketBuilder::new(&mut buf).broadcast();
        let mut builder = if options.full_source {
            builder.source_full(&source_key)
        } else {
            builder.source_hint(source_key.hint())
        };
        if let Some(hops) = options.flood_hops {
            builder = builder.flood_hops(hops);
        }
        if options.trace_route {
            builder = builder.trace_route();
        }
        if let Some(route) = options.source_route.as_ref() {
            builder = builder.source_route(route.as_slice());
        }
        if let Some(region_code) = options.region_code {
            builder = builder.region_code(region_code);
        }
        if let Some(callsign) = self.operating_policy.operator_callsign {
            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
        }
        let frame = builder.payload(payload).build()?;
        // A frame the radio cannot carry is reported as a build failure.
        if frame.len() > self.radio.max_frame_size() {
            return Err(SendError::Build(BuildError::BufferTooSmall));
        }
        self.tx_queue
            .enqueue(TxPriority::Application, frame, Some(receipt), Some(from))
            .map_err(|_| SendError::QueueFull)?;
        Ok(receipt)
    }
1153
    /// Async wrapper around [`Self::queue_broadcast`]; it performs no awaiting of its own.
    pub async fn send_broadcast(
        &mut self,
        from: LocalIdentityId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.queue_broadcast(from, payload, options)
    }
1163
    /// Builds a multicast frame for `channel_id`, seals it with the channel's derived keys and
    /// enqueues it for transmission.
    ///
    /// The first channel-table entry matching `channel_id` supplies the keys. A receipt is
    /// allocated and the identity's frame counter is advanced before the frame is built.
    ///
    /// # Errors
    ///
    /// - whatever `enforce_send_policy` rejects;
    /// - [`SendError::AckUnsupported`] when `options.ack_requested` is set;
    /// - [`SendError::ChannelMissing`] when the channel is not registered;
    /// - [`SendError::IdentityMissing`] when `from` names no occupied slot;
    /// - errors propagated from `identity_and_advance`, the packet builder, `seal_packet` and
    ///   `enqueue_packet`.
    pub fn queue_multicast(
        &mut self,
        from: LocalIdentityId,
        channel_id: &ChannelId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.enforce_send_policy(Some(*channel_id), options, false)?;
        if options.ack_requested {
            return Err(SendError::AckUnsupported);
        }

        let derived = self
            .channels
            .lookup_by_id(channel_id)
            .next()
            .ok_or(SendError::ChannelMissing)?
            .derived
            .clone();
        // The channel's derived keys are repackaged as `PairwiseKeys` so `seal_packet` can be
        // used for sealing.
        let keys = PairwiseKeys {
            k_enc: derived.k_enc,
            k_mic: derived.k_mic,
        };
        let receipt = self
            .identity_mut(from)
            .ok_or(SendError::IdentityMissing)?
            .next_receipt();
        let (source_key, frame_counter) = self.identity_and_advance(from)?;
        let salt = self.take_salt(options);
        let mut buf = [0u8; FRAME];
        let builder = PacketBuilder::new(&mut buf).multicast(*channel_id);
        let builder = if options.full_source {
            builder.source_full(&source_key)
        } else {
            builder.source_hint(source_key.hint())
        };
        let mut builder = builder.frame_counter(frame_counter);
        if options.encrypted {
            builder = builder.encrypted();
        }
        builder = builder.mic_size(options.mic_size);
        if let Some(salt) = salt {
            builder = builder.salt(salt);
        }
        if let Some(hops) = options.flood_hops {
            builder = builder.flood_hops(hops);
        }
        if options.trace_route {
            builder = builder.trace_route();
        }
        if let Some(route) = options.source_route.as_ref() {
            builder = builder.source_route(route.as_slice());
        }
        if let Some(region_code) = options.region_code {
            builder = builder.region_code(region_code);
        }
        if let Some(callsign) = self.operating_policy.operator_callsign {
            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
        }
        let mut packet = builder.payload(payload).build()?;
        self.crypto.seal_packet(&mut packet, &keys)?;
        self.enqueue_packet(packet, Some(receipt), Some(from))?;
        Ok(receipt)
    }
1229
    /// Async wrapper around [`Self::queue_multicast`]; it performs no awaiting of its own.
    pub async fn send_multicast(
        &mut self,
        from: LocalIdentityId,
        channel_id: &ChannelId,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<SendReceipt, SendError> {
        self.queue_multicast(from, channel_id, payload, options)
    }
1240
1241 pub fn queue_mac_ack_for_peer(
1243 &mut self,
1244 peer_id: PeerId,
1245 dst: NodeHint,
1246 ack_tag: [u8; 8],
1247 ) -> Result<(), SendError> {
1248 let mut buf = [0u8; FRAME];
1249 let mut builder = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag);
1250 if let Some(peer) = self.peer_registry.get(peer_id) {
1251 match peer.route.as_ref() {
1252 Some(CachedRoute::Source(route)) => {
1253 builder = builder.source_route(route.as_slice());
1254 }
1255 Some(CachedRoute::Flood { hops }) => {
1256 builder = builder.flood_hops((*hops).clamp(1, 15));
1257 }
1258 None => {}
1259 }
1260 }
1261 let frame = builder.build()?;
1262 if frame.len() > self.radio.max_frame_size() {
1263 return Err(SendError::Build(BuildError::BufferTooSmall));
1264 }
1265 self.tx_queue
1266 .enqueue(TxPriority::ImmediateAck, frame, None, None)
1267 .map_err(|_| SendError::QueueFull)?;
1268 Ok(())
1269 }
1270
1271 pub fn queue_mac_ack(&mut self, dst: NodeHint, ack_tag: [u8; 8]) -> Result<(), SendError> {
1273 let mut buf = [0u8; FRAME];
1274 let frame = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag).build()?;
1275 if frame.len() > self.radio.max_frame_size() {
1276 return Err(SendError::Build(BuildError::BufferTooSmall));
1277 }
1278 self.tx_queue
1279 .enqueue(TxPriority::ImmediateAck, frame, None, None)
1280 .map_err(|_| SendError::QueueFull)?;
1281 Ok(())
1282 }
1283
1284 pub fn queue_unicast(
1286 &mut self,
1287 from: LocalIdentityId,
1288 peer: &PublicKey,
1289 payload: &[u8],
1290 options: &SendOptions,
1291 ) -> Result<Option<SendReceipt>, SendError> {
1292 self.enforce_send_policy(None, options, false)?;
1293 let (peer_id, _) = self
1294 .peer_registry
1295 .lookup_by_key(peer)
1296 .ok_or(SendError::PeerMissing)?;
1297 let pairwise_keys = self
1298 .identity(from)
1299 .ok_or(SendError::IdentityMissing)?
1300 .peer_crypto()
1301 .get(&peer_id)
1302 .ok_or(SendError::PairwiseKeysMissing)?
1303 .pairwise_keys
1304 .clone();
1305 let effective_source_route = self.effective_source_route(peer_id, options);
1306
1307 let (source_key, frame_counter) = self.identity_and_advance(from)?;
1308 let salt = self.take_salt(options);
1309 let mut buf = [0u8; FRAME];
1310 let builder = PacketBuilder::new(&mut buf).unicast(peer.hint());
1311 let builder = if options.full_source {
1312 builder.source_full(&source_key)
1313 } else {
1314 builder.source_hint(source_key.hint())
1315 };
1316 let mut builder = builder.frame_counter(frame_counter);
1317 if options.ack_requested {
1318 builder = builder.ack_requested();
1319 }
1320 if options.encrypted {
1321 builder = builder.encrypted();
1322 }
1323 builder = builder.mic_size(options.mic_size);
1324 if let Some(salt) = salt {
1325 builder = builder.salt(salt);
1326 }
1327 if let Some(hops) = options.flood_hops {
1328 builder = builder.flood_hops(hops);
1329 }
1330 if options.trace_route {
1331 builder = builder.trace_route();
1332 }
1333 if let Some(route) = effective_source_route.as_ref() {
1334 builder = builder.source_route(route.as_slice());
1335 }
1336 if let Some(region_code) = options.region_code {
1337 builder = builder.region_code(region_code);
1338 }
1339 if let Some(callsign) = self.operating_policy.operator_callsign {
1340 builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1341 }
1342 let mut packet = builder.payload(payload).build()?;
1343
1344 let receipt = if options.ack_requested {
1345 Some(self.prepare_pending_ack(from, *peer, &packet, &pairwise_keys, options)?)
1346 } else {
1347 None
1348 };
1349
1350 self.crypto.seal_packet(&mut packet, &pairwise_keys)?;
1351 if let Some(receipt) = receipt {
1352 self.refresh_pending_resend(
1353 from,
1354 receipt,
1355 packet.as_bytes(),
1356 effective_source_route
1357 .as_ref()
1358 .map(|route| route.as_slice()),
1359 )?;
1360 }
1361 if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1362 if let Some(receipt) = receipt {
1363 let _ = self
1364 .identity_mut(from)
1365 .and_then(|slot| slot.remove_pending_ack(&receipt));
1366 }
1367 return Err(err);
1368 }
1369 Ok(receipt)
1370 }
1371
    /// Ensures crypto state exists for (`from`, `peer`) and then queues the unicast frame.
    ///
    /// Looks the peer up in the registry, awaits `ensure_peer_crypto` for it, and delegates to
    /// [`Self::queue_unicast`].
    ///
    /// # Errors
    ///
    /// [`SendError::PeerMissing`] when `peer` is not registered, plus anything returned by
    /// `ensure_peer_crypto` or [`Self::queue_unicast`].
    pub async fn send_unicast(
        &mut self,
        from: LocalIdentityId,
        peer: &PublicKey,
        payload: &[u8],
        options: &SendOptions,
    ) -> Result<Option<SendReceipt>, SendError> {
        let (peer_id, _) = self
            .peer_registry
            .lookup_by_key(peer)
            .ok_or(SendError::PeerMissing)?;
        let _ = self.ensure_peer_crypto(from, peer_id).await?;
        self.queue_unicast(from, peer, payload, options)
    }
1387
1388 pub fn queue_blind_unicast(
1390 &mut self,
1391 from: LocalIdentityId,
1392 peer: &PublicKey,
1393 channel_id: &ChannelId,
1394 payload: &[u8],
1395 options: &SendOptions,
1396 ) -> Result<Option<SendReceipt>, SendError> {
1397 self.enforce_send_policy(Some(*channel_id), options, true)?;
1398 let (peer_id, _) = self
1399 .peer_registry
1400 .lookup_by_key(peer)
1401 .ok_or(SendError::PeerMissing)?;
1402 let pairwise_keys = self
1403 .identity(from)
1404 .ok_or(SendError::IdentityMissing)?
1405 .peer_crypto()
1406 .get(&peer_id)
1407 .ok_or(SendError::PairwiseKeysMissing)?
1408 .pairwise_keys
1409 .clone();
1410 let channel_keys = self
1411 .channels
1412 .lookup_by_id(channel_id)
1413 .next()
1414 .ok_or(SendError::ChannelMissing)?
1415 .derived
1416 .clone();
1417 let blind_keys = self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
1418 let effective_source_route = self.effective_source_route(peer_id, options);
1419
1420 let (source_key, frame_counter) = self.identity_and_advance(from)?;
1421 let salt = self.take_salt(options);
1422 let mut buf = [0u8; FRAME];
1423 let builder = PacketBuilder::new(&mut buf).blind_unicast(*channel_id, peer.hint());
1424 let builder = if options.full_source {
1425 builder.source_full(&source_key)
1426 } else {
1427 builder.source_hint(source_key.hint())
1428 };
1429 let mut builder = builder.frame_counter(frame_counter);
1430 if options.ack_requested {
1431 builder = builder.ack_requested();
1432 }
1433 if !options.encrypted {
1434 builder = builder.unencrypted();
1435 }
1436 builder = builder.mic_size(options.mic_size);
1437 if let Some(salt) = salt {
1438 builder = builder.salt(salt);
1439 }
1440 if let Some(hops) = options.flood_hops {
1441 builder = builder.flood_hops(hops);
1442 }
1443 if options.trace_route {
1444 builder = builder.trace_route();
1445 }
1446 if let Some(route) = effective_source_route.as_ref() {
1447 builder = builder.source_route(route.as_slice());
1448 }
1449 if let Some(region_code) = options.region_code {
1450 builder = builder.region_code(region_code);
1451 }
1452 if let Some(callsign) = self.operating_policy.operator_callsign {
1453 builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1454 }
1455 let mut packet = builder.payload(payload).build()?;
1456
1457 let receipt = if options.ack_requested {
1458 Some(self.prepare_pending_ack(from, *peer, &packet, &blind_keys, options)?)
1459 } else {
1460 None
1461 };
1462
1463 self.crypto
1464 .seal_blind_packet(&mut packet, &blind_keys, &channel_keys)
1465 .map_err(SendError::Crypto)?;
1466 if let Some(receipt) = receipt {
1467 self.refresh_pending_resend(
1468 from,
1469 receipt,
1470 packet.as_bytes(),
1471 effective_source_route
1472 .as_ref()
1473 .map(|route| route.as_slice()),
1474 )?;
1475 }
1476 if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1477 if let Some(receipt) = receipt {
1478 let _ = self
1479 .identity_mut(from)
1480 .and_then(|slot| slot.remove_pending_ack(&receipt));
1481 }
1482 return Err(err);
1483 }
1484 Ok(receipt)
1485 }
1486
1487 pub async fn send_blind_unicast(
1489 &mut self,
1490 from: LocalIdentityId,
1491 peer: &PublicKey,
1492 channel_id: &ChannelId,
1493 payload: &[u8],
1494 options: &SendOptions,
1495 ) -> Result<Option<SendReceipt>, SendError> {
1496 let (peer_id, _) = self
1497 .peer_registry
1498 .lookup_by_key(peer)
1499 .ok_or(SendError::PeerMissing)?;
1500 let _ = self.ensure_peer_crypto(from, peer_id).await?;
1501 self.queue_blind_unicast(from, peer, channel_id, payload, options)
1502 }
1503
1504 pub async fn transmit_next(
1512 &mut self,
1513 on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1514 ) -> Result<Option<SendReceipt>, MacError<<P::Radio as Radio>::Error>> {
1515 self.expire_post_tx_listen_if_needed();
1516 let Some(queued) = self.tx_queue.pop_next() else {
1517 return Ok(None);
1518 };
1519 let now_ms = self.clock.now_ms();
1520
1521 if queued.not_before_ms > now_ms {
1522 self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1523 return Ok(None);
1524 }
1525
1526 if self.post_tx_listen.is_some() && queued.priority != TxPriority::ImmediateAck {
1527 self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1528 return Ok(None);
1529 }
1530
1531 let receipt = queued.receipt;
1532 let identity_id = queued.identity_id;
1533 let tx_options = if queued.priority == TxPriority::ImmediateAck {
1534 TxOptions::default()
1535 } else {
1536 TxOptions {
1537 cad_timeout_ms: Some(0),
1538 }
1539 };
1540 match self
1541 .radio
1542 .transmit(queued.frame.as_slice(), tx_options)
1543 .await
1544 {
1545 Ok(()) => {}
1546 Err(TxError::CadTimeout) => {
1547 let next_attempt = queued.cad_attempts.saturating_add(1);
1548 if next_attempt >= MAX_CAD_ATTEMPTS {
1549 return Ok(None);
1550 }
1551 let backoff_ms = u64::from(
1552 self.rng
1553 .random_range(..self.radio.t_frame_ms().saturating_add(1)),
1554 );
1555 self.tx_queue
1556 .enqueue_with_state(
1557 queued.priority,
1558 queued.frame.as_slice(),
1559 queued.receipt,
1560 queued.identity_id,
1561 now_ms.saturating_add(backoff_ms),
1562 next_attempt,
1563 queued.forward_deferrals,
1564 )
1565 .map_err(|_| MacError::QueueFull)?;
1566 return Ok(None);
1567 }
1568 Err(error) => return Err(MacError::Transmit(error)),
1569 }
1570 if let Some(identity_id) = identity_id {
1571 on_event(
1572 identity_id,
1573 crate::MacEventRef::Transmitted {
1574 identity_id,
1575 receipt,
1576 },
1577 );
1578 }
1579 if let Some(receipt) = receipt {
1580 self.note_transmitted_ack_requested(receipt, queued.frame.as_slice());
1581 }
1582 Ok(receipt)
1583 }
1584
1585 pub async fn drain_tx_queue(
1590 &mut self,
1591 on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1592 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1593 while !self.tx_queue.is_empty() {
1594 let queue_len = self.tx_queue.len();
1595 let _ = self.transmit_next(on_event).await?;
1596 if self.tx_queue.len() >= queue_len {
1597 break;
1598 }
1599 }
1600 Ok(())
1601 }
1602
1603 pub async fn poll_cycle(
1616 &mut self,
1617 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1618 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1619 self.drain_tx_queue(&mut on_event).await?;
1620 if self.post_tx_listen.is_some() {
1621 self.service_post_tx_listen(&mut on_event).await?;
1622 } else {
1623 let _ = self.receive_one(&mut on_event).await?;
1624 }
1625 self.drain_tx_queue(&mut on_event).await?;
1626 self.service_pending_ack_timeouts(&mut on_event)
1627 .map_err(|_| MacError::QueueFull)?;
1628 Ok(())
1629 }
1630
1631 pub fn earliest_deadline_ms(&self) -> Option<u64> {
1638 let mut earliest: Option<u64> = None;
1639
1640 if let Some(listen) = &self.post_tx_listen {
1641 earliest =
1642 Some(earliest.map_or(listen.deadline_ms, |e: u64| e.min(listen.deadline_ms)));
1643 }
1644
1645 for slot in self.identities.iter().filter_map(|s| s.as_ref()) {
1646 for (_, pending) in slot.pending_acks.iter() {
1647 if !matches!(pending.state, crate::AckState::Queued { .. }) {
1648 earliest = Some(earliest.map_or(pending.ack_deadline_ms, |e: u64| {
1649 e.min(pending.ack_deadline_ms)
1650 }));
1651 }
1652 if let crate::AckState::AwaitingForward {
1653 confirm_deadline_ms,
1654 } = pending.state
1655 {
1656 earliest = Some(
1657 earliest.map_or(confirm_deadline_ms, |e: u64| e.min(confirm_deadline_ms)),
1658 );
1659 }
1660 }
1661 }
1662
1663 if let Some(nb) = self.tx_queue.earliest_not_before_ms() {
1664 earliest = Some(earliest.map_or(nb, |e: u64| e.min(nb)));
1665 }
1666
1667 earliest
1668 }
1669
1670 pub async fn next_event(
1689 &mut self,
1690 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1691 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1692 loop {
1693 self.drain_tx_queue(&mut on_event).await?;
1695
1696 let mut buf = [0u8; FRAME];
1698 enum WakeReason {
1699 Received(RxInfo),
1700 TimerExpired,
1701 }
1702 let reason = poll_fn(|cx| {
1703 match self.radio.poll_receive(cx, &mut buf) {
1705 Poll::Ready(Ok(rx)) => return Poll::Ready(Ok(WakeReason::Received(rx))),
1706 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
1707 Poll::Pending => {}
1708 }
1709
1710 let now_ms = self.clock.now_ms();
1712 if let Some(deadline) = self.earliest_deadline_ms() {
1713 if now_ms >= deadline {
1714 return Poll::Ready(Ok(WakeReason::TimerExpired));
1715 }
1716 let _ = self.clock.poll_delay_until(cx, deadline);
1718 }
1719
1720 if self.tx_queue.has_ready(now_ms) {
1722 return Poll::Ready(Ok(WakeReason::TimerExpired));
1723 }
1724
1725 Poll::Pending
1726 })
1727 .await
1728 .map_err(MacError::Radio)?;
1729
1730 match reason {
1732 WakeReason::Received(rx) => {
1733 let frame_len = rx.len.min(buf.len());
1734 let _ = self
1735 .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
1736 .await;
1737 }
1738 WakeReason::TimerExpired => {
1739 }
1741 }
1742
1743 self.drain_tx_queue(&mut on_event).await?;
1745
1746 self.service_pending_ack_timeouts(&mut on_event)
1748 .map_err(|_| MacError::QueueFull)?;
1749
1750 if !self.tx_queue.is_empty() {
1753 continue;
1754 }
1755
1756 return Ok(());
1757 }
1758 }
1759
1760 pub async fn run(
1768 &mut self,
1769 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1770 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1771 loop {
1772 self.next_event(&mut on_event).await?;
1773 }
1774 }
1775
1776 pub async fn run_quiet(&mut self) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1782 self.run(|_, _| {}).await
1783 }
1784
1785 async fn process_received_frame(
1791 &mut self,
1792 buf: &mut [u8; FRAME],
1793 frame_len: usize,
1794 rx: &RxInfo,
1795 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1796 ) -> bool {
1797 let received_at_ms = self.clock.now_ms();
1798 let mut current_len = frame_len;
1799 let mut current_rx = RxInfo {
1800 len: frame_len,
1801 rssi: rx.rssi,
1802 snr: rx.snr,
1803 lqi: rx.lqi,
1804 };
1805 let mut current_received_at_ms = received_at_ms;
1806 let mut handled_any = false;
1807
1808 loop {
1809 let Ok(header) = PacketHeader::parse(&buf[..current_len]) else {
1810 return handled_any;
1811 };
1812 let forwarding_confirmed = if let Some((identity_id, receipt)) =
1813 self.observe_forwarding_confirmation(&buf[..current_len])
1814 {
1815 let hint = match header.source {
1816 SourceAddrRef::Hint(h) => Some(RouterHint([h.0[0], h.0[1]])),
1817 SourceAddrRef::FullKeyAt { offset } => {
1818 let mut key_bytes = [0u8; 32];
1819 key_bytes.copy_from_slice(&buf[offset..offset + 32]);
1820 let h = PublicKey(key_bytes).hint();
1821 Some(RouterHint([h.0[0], h.0[1]]))
1822 }
1823 _ => None,
1824 };
1825 on_event(
1826 identity_id,
1827 crate::MacEventRef::Forwarded {
1828 identity_id,
1829 receipt,
1830 hint,
1831 },
1832 );
1833 true
1834 } else {
1835 false
1836 };
1837
1838 let (handled, replay_target) = match header.packet_type() {
1839 PacketType::Broadcast => (
1840 self.process_broadcast(
1841 buf,
1842 current_len,
1843 &header,
1844 ¤t_rx,
1845 current_received_at_ms,
1846 &mut on_event,
1847 ),
1848 None,
1849 ),
1850 PacketType::MacAck => (
1851 self.process_mac_ack(
1852 buf,
1853 current_len,
1854 &header,
1855 ¤t_rx,
1856 forwarding_confirmed,
1857 &mut on_event,
1858 ),
1859 None,
1860 ),
1861 PacketType::Unicast | PacketType::UnicastAckReq => {
1862 self.process_unicast(
1863 buf,
1864 current_len,
1865 &header,
1866 ¤t_rx,
1867 current_received_at_ms,
1868 forwarding_confirmed,
1869 &mut on_event,
1870 )
1871 .await
1872 }
1873 PacketType::Multicast => (
1874 self.process_multicast(
1875 buf,
1876 current_len,
1877 &header,
1878 ¤t_rx,
1879 current_received_at_ms,
1880 forwarding_confirmed,
1881 &mut on_event,
1882 ),
1883 None,
1884 ),
1885 PacketType::BlindUnicast | PacketType::BlindUnicastAckReq => {
1886 self.process_blind_unicast(
1887 buf,
1888 current_len,
1889 &header,
1890 ¤t_rx,
1891 current_received_at_ms,
1892 forwarding_confirmed,
1893 &mut on_event,
1894 )
1895 .await
1896 }
1897 PacketType::Reserved5 => (false, None),
1898 };
1899 handled_any |= handled;
1900
1901 let Some((local_id, peer_id)) = replay_target else {
1902 return handled_any;
1903 };
1904 let Some(deferred) = self.take_deferred_counter_resync_frame(local_id, peer_id) else {
1905 return handled_any;
1906 };
1907 current_len = deferred.frame.len();
1908 buf[..current_len].copy_from_slice(deferred.frame.as_slice());
1909 current_rx = RxInfo {
1910 len: current_len,
1911 rssi: deferred.rssi,
1912 snr: deferred.snr,
1913 lqi: deferred.lqi,
1914 };
1915 current_received_at_ms = deferred.received_at_ms;
1916 }
1917 }
1918
1919 fn process_broadcast(
1920 &mut self,
1921 buf: &[u8; FRAME],
1922 frame_len: usize,
1923 header: &PacketHeader,
1924 rx: &RxInfo,
1925 received_at_ms: u64,
1926 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1927 ) -> bool {
1928 let Some((from_hint, from_key)) = Self::resolve_broadcast_source(&buf[..frame_len], header)
1929 else {
1930 return false;
1931 };
1932 if !Self::payload_is_allowed(header.packet_type(), &buf[header.body_range.clone()]) {
1933 return false;
1934 }
1935 let mut delivered = false;
1936 for (index, slot) in self.identities.iter().enumerate() {
1937 if slot.is_none() {
1938 continue;
1939 }
1940 delivered = true;
1941 on_event(
1942 LocalIdentityId(index as u8),
1943 crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
1944 &buf[..frame_len],
1945 &buf[header.body_range.clone()],
1946 header.clone(),
1947 ParsedOptions::extract(&buf[..frame_len], header.options_range.clone())
1948 .unwrap_or_default(),
1949 from_key,
1950 Some(from_hint),
1951 false,
1952 None,
1953 crate::send::RxMetadata::new(
1954 Some(rx.rssi),
1955 Some(rx.snr),
1956 rx.lqi,
1957 Some(received_at_ms),
1958 ),
1959 )),
1960 );
1961 }
1962 let forwarded = self.maybe_forward_received(&buf[..frame_len], header, rx, false);
1966 delivered || forwarded
1967 }
1968
1969 fn process_mac_ack(
1970 &mut self,
1971 buf: &[u8; FRAME],
1972 frame_len: usize,
1973 header: &PacketHeader,
1974 rx: &RxInfo,
1975 forwarding_confirmed: bool,
1976 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1977 ) -> bool {
1978 let Some(ack_dst) = header.ack_dst else {
1979 return false;
1980 };
1981 let target_peer = self
1982 .identities
1983 .iter()
1984 .filter_map(|slot| slot.as_ref())
1985 .find(|slot| slot.identity().public_key().hint() == ack_dst)
1986 .and_then(|slot| self.match_pending_peer_for_ack(slot, &buf[header.mic_range.clone()]));
1987 if let Some(target_peer) = target_peer {
1988 let mut ack_tag = [0u8; 8];
1989 ack_tag.copy_from_slice(&buf[header.mic_range.clone()]);
1990 if let Some((identity_id, receipt)) = self.complete_ack(&target_peer, &ack_tag) {
1991 on_event(
1992 identity_id,
1993 crate::MacEventRef::AckReceived {
1994 peer: target_peer,
1995 receipt,
1996 },
1997 );
1998 return true;
1999 }
2000 }
2001 forwarding_confirmed || self.maybe_forward_received(&buf[..frame_len], header, rx, false)
2002 }
2003
/// Handles a unicast (optionally ACK-requesting) frame.
///
/// If the destination matches a local identity, each candidate source peer is tried in
/// turn: pairwise keys are obtained, the packet is opened, the payload is checked, and
/// the replay verdict is evaluated. The first candidate that passes is accepted: its
/// route is learned, a MAC ACK is queued when requested, an echo request is answered,
/// and `Received` is emitted.
///
/// Regardless of local delivery, the frame as originally received is offered to the
/// forwarding path.
///
/// Returns `(handled_or_forwarded, replay_target)`. `replay_target` is `Some` only when
/// the accepted frame was a counter-resync response, which tells the caller to replay a
/// deferred frame for that `(local identity, peer)` pair.
async fn process_unicast(
    &mut self,
    buf: &mut [u8; FRAME],
    frame_len: usize,
    header: &PacketHeader,
    rx: &RxInfo,
    received_at_ms: u64,
    forwarding_confirmed: bool,
    mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
) -> (bool, Option<(LocalIdentityId, PeerId)>) {
    // Keep the frame exactly as received: `buf` is handed to `open_packet` mutably, and
    // the untouched copy is what gets deferred, reported and forwarded.
    let mut original = [0u8; FRAME];
    original[..frame_len].copy_from_slice(&buf[..frame_len]);
    let mut replay_target = None;
    let handled = if let Some(local_id) = self.find_local_identity_for_dst(header.dst) {
        let mut handled = false;
        for (peer_id, peer_key) in
            self.resolve_source_peer_candidates(&buf[..frame_len], header)
        {
            let Ok(keys) = self.ensure_peer_crypto(local_id, peer_id).await else {
                continue;
            };
            let Ok(body_range) = self
                .crypto
                .open_packet(&mut buf[..frame_len], header, &keys)
            else {
                continue;
            };
            let payload = &buf[body_range.clone()];
            if !Self::payload_is_allowed(header.packet_type(), payload) {
                continue;
            }
            match self.unicast_replay_verdict(local_id, peer_id, header, &buf[..frame_len]) {
                Some(ReplayVerdict::Accept) => {
                    let _ = self.accept_unicast_replay(
                        local_id,
                        peer_id,
                        header,
                        &buf[..frame_len],
                    );
                }
                Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
                    // Counter is outside the window: either this frame is the resync
                    // response we were waiting for, or it is parked and a resync is
                    // requested from the peer.
                    if self.try_accept_counter_resync_response(
                        local_id,
                        peer_id,
                        header,
                        &buf[..frame_len],
                        payload,
                    ) {
                        replay_target = Some((local_id, peer_id));
                    } else {
                        self.store_deferred_counter_resync_frame(
                            local_id,
                            peer_id,
                            &original[..frame_len],
                            rx,
                            received_at_ms,
                        );
                        self.maybe_request_counter_resync(local_id, peer_id, peer_key)
                            .await;
                        continue;
                    }
                }
                Some(ReplayVerdict::Replay) | None => continue,
            }
            self.learn_route_for_peer(peer_id, &buf[..frame_len], header);

            if header.ack_requested()
                && self.should_emit_destination_ack(&buf[..frame_len], header)
            {
                let ack_tag = self.compute_received_ack_tag(
                    &buf[..frame_len],
                    header,
                    body_range.clone(),
                    &keys,
                );
                // Best effort: a failure to queue the ACK is ignored.
                self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
                    .ok();
            }

            // Echo requests are answered automatically; a send failure is ignored.
            if let Some(data) = Self::echo_request_data(payload) {
                let response =
                    Self::build_echo_command_payload(MAC_COMMAND_ECHO_RESPONSE_ID, data);
                let _ = self
                    .send_unicast(
                        local_id,
                        &peer_key,
                        response.as_slice(),
                        &SendOptions::default(),
                    )
                    .await;
            }

            on_event(
                local_id,
                crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
                    &original[..frame_len],
                    &buf[body_range],
                    header.clone(),
                    ParsedOptions::extract(
                        &original[..frame_len],
                        header.options_range.clone(),
                    )
                    .unwrap_or_default(),
                    Some(peer_key),
                    Some(peer_key.hint()),
                    true,
                    None,
                    crate::send::RxMetadata::new(
                        Some(rx.rssi),
                        Some(rx.snr),
                        rx.lqi,
                        Some(received_at_ms),
                    ),
                )),
            );
            // First accepted candidate wins.
            handled = true;
            break;
        }
        handled
    } else {
        false
    };
    let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
    (
        handled || forwarding_confirmed || forwarded,
        // Only report a replay target when the frame was actually handled.
        handled.then_some(()).and(replay_target),
    )
}
2132
/// Handles a multicast frame.
///
/// The frame is delivered to every occupied local identity slot when all of the
/// following hold: the header names a channel, that channel is known, the packet opens
/// with the channel's derived keys, the payload is allowed, the source resolves, and the
/// replay check accepts the frame. For a known source peer, the route is learned on
/// acceptance.
///
/// The frame as originally received is always offered to the forwarding path afterwards.
///
/// Returns `true` when the frame was delivered, confirmed a forward, or was forwarded.
fn process_multicast(
    &mut self,
    buf: &mut [u8; FRAME],
    frame_len: usize,
    header: &PacketHeader,
    rx: &RxInfo,
    received_at_ms: u64,
    forwarding_confirmed: bool,
    mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
) -> bool {
    // Untouched copy of the frame for reporting and forwarding; `buf` is handed to
    // `open_packet` mutably.
    let mut original = [0u8; FRAME];
    original[..frame_len].copy_from_slice(&buf[..frame_len]);
    let delivered = if let Some(channel_id) = header.channel {
        // Clone the key material out so the borrow of `self.channels` ends here.
        let channel_info = {
            self.channels
                .lookup_by_id(&channel_id)
                .next()
                .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
        };
        if let Some((channel_key, derived)) = channel_info {
            // The channel's derived keys are used in the pairwise-key shape expected by
            // `open_packet`.
            let keys = PairwiseKeys {
                k_enc: derived.k_enc,
                k_mic: derived.k_mic,
            };
            if let Ok(body_range) =
                self.crypto
                    .open_packet(&mut buf[..frame_len], header, &keys)
            {
                if !Self::payload_is_allowed(header.packet_type(), &buf[body_range.clone()]) {
                    false
                } else if let Some(source) =
                    self.resolve_multicast_source(&buf[..frame_len], header)
                {
                    // Known peers go through the per-peer replay check; unknown sources
                    // use a separate check.
                    let accepted = if let Some(peer_id) = source.peer_id {
                        let accepted = self.accept_multicast_replay(
                            channel_id,
                            peer_id,
                            header,
                            &buf[..frame_len],
                        );
                        if accepted {
                            self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
                        }
                        accepted
                    } else {
                        self.accept_unknown_multicast_replay(header, &buf[..frame_len])
                    };
                    if accepted {
                        let mut delivered = false;
                        for (index, slot) in self.identities.iter().enumerate() {
                            if slot.is_none() {
                                continue;
                            }
                            delivered = true;
                            on_event(
                                LocalIdentityId(index as u8),
                                crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
                                    &original[..frame_len],
                                    &buf[body_range.clone()],
                                    header.clone(),
                                    ParsedOptions::extract(
                                        &original[..frame_len],
                                        header.options_range.clone(),
                                    )
                                    .unwrap_or_default(),
                                    source.public_key,
                                    // Prefer the resolved hint; otherwise derive one from
                                    // the public key when that is known.
                                    source
                                        .hint
                                        .or_else(|| source.public_key.map(|key| key.hint())),
                                    true,
                                    Some(crate::ChannelInfoRef {
                                        id: channel_id,
                                        key: &channel_key,
                                    }),
                                    crate::send::RxMetadata::new(
                                        Some(rx.rssi),
                                        Some(rx.snr),
                                        rx.lqi,
                                        Some(received_at_ms),
                                    ),
                                )),
                            );
                        }
                        delivered
                    } else {
                        false
                    }
                } else {
                    false
                }
            } else {
                false
            }
        } else {
            false
        }
    } else {
        false
    };
    let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, false);
    delivered || forwarding_confirmed || forwarded
}
2235
/// Handles a blind-unicast (optionally ACK-requesting) frame.
///
/// For every known channel matching the header's channel id, the blind addresses are
/// decrypted with that channel's keys. If the destination is a local identity, each
/// candidate source peer is tried: blind keys are derived from the pairwise and channel
/// keys, the packet is opened, the payload is checked, and the replay verdict is
/// evaluated. The first candidate that passes is accepted — route learned, MAC ACK
/// queued when requested, echo request answered, `Received` emitted — and the search
/// stops.
///
/// The frame as originally received is always offered to the forwarding path afterwards.
///
/// Returns `(handled_or_forwarded, replay_target)`. `replay_target` is `Some` only when
/// the accepted frame was a counter-resync response.
async fn process_blind_unicast(
    &mut self,
    buf: &mut [u8; FRAME],
    frame_len: usize,
    header: &PacketHeader,
    rx: &RxInfo,
    received_at_ms: u64,
    forwarding_confirmed: bool,
    mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
) -> (bool, Option<(LocalIdentityId, PeerId)>) {
    // Untouched copy of the frame; used to reset `buf` between channel candidates and
    // for deferral, reporting and forwarding.
    let mut original = [0u8; FRAME];
    original[..frame_len].copy_from_slice(&buf[..frame_len]);
    let mut replay_target = None;
    let handled = if let Some(channel_id) = header.channel {
        // Collect owned key material first so `self.channels` is not borrowed while the
        // loop below calls other `self` methods.
        let channel_candidates: Vec<(ChannelKey, DerivedChannelKeys), CHANNELS> = self
            .channels
            .lookup_by_id(&channel_id)
            .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
            .collect();
        if channel_candidates.is_empty() {
            false
        } else {
            let mut handled = false;
            for (resolved_channel_key, channel_keys) in channel_candidates {
                // Start each channel attempt from the frame as received.
                buf[..frame_len].copy_from_slice(&original[..frame_len]);
                let Ok((dst, source_addr)) = self.crypto.decrypt_blind_addr(
                    &mut buf[..frame_len],
                    header,
                    &channel_keys,
                ) else {
                    continue;
                };
                let Some(local_id) = self.find_local_identity_for_dst(Some(dst)) else {
                    continue;
                };
                for (peer_id, peer_key) in
                    self.resolve_blind_source_peer_candidates(&buf[..frame_len], source_addr)
                {
                    let Ok(pairwise_keys) = self.ensure_peer_crypto(local_id, peer_id).await
                    else {
                        continue;
                    };
                    let blind_keys =
                        self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
                    let body_range = match self.crypto.open_packet(
                        &mut buf[..frame_len],
                        header,
                        &blind_keys,
                    ) {
                        Ok(range) => range,
                        Err(_) => continue,
                    };
                    let payload = &buf[body_range.clone()];
                    if !Self::payload_is_allowed(header.packet_type(), payload) {
                        continue;
                    }
                    match self.unicast_replay_verdict(
                        local_id,
                        peer_id,
                        header,
                        &buf[..frame_len],
                    ) {
                        Some(ReplayVerdict::Accept) => {
                            let _ = self.accept_unicast_replay(
                                local_id,
                                peer_id,
                                header,
                                &buf[..frame_len],
                            );
                        }
                        Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
                            // Counter is outside the window: either this is the resync
                            // response we were waiting for, or the frame is parked and a
                            // resync is requested from the peer.
                            if self.try_accept_counter_resync_response(
                                local_id,
                                peer_id,
                                header,
                                &buf[..frame_len],
                                payload,
                            ) {
                                replay_target = Some((local_id, peer_id));
                            } else {
                                self.store_deferred_counter_resync_frame(
                                    local_id,
                                    peer_id,
                                    &original[..frame_len],
                                    rx,
                                    received_at_ms,
                                );
                                self.maybe_request_counter_resync(local_id, peer_id, peer_key)
                                    .await;
                                continue;
                            }
                        }
                        Some(ReplayVerdict::Replay) | None => continue,
                    }
                    self.learn_route_for_peer(peer_id, &buf[..frame_len], header);

                    if header.ack_requested()
                        && self.should_emit_destination_ack(&buf[..frame_len], header)
                    {
                        let ack_tag = self.compute_received_ack_tag(
                            &buf[..frame_len],
                            header,
                            body_range.clone(),
                            &blind_keys,
                        );
                        // Best effort: a failure to queue the ACK is ignored.
                        self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
                            .ok();
                    }

                    // Echo requests are answered with a plain unicast using default
                    // options; a send failure is ignored.
                    if let Some(data) = Self::echo_request_data(payload) {
                        let response = Self::build_echo_command_payload(
                            MAC_COMMAND_ECHO_RESPONSE_ID,
                            data,
                        );
                        let _ = self
                            .send_unicast(
                                local_id,
                                &peer_key,
                                response.as_slice(),
                                &SendOptions::default(),
                            )
                            .await;
                    }

                    on_event(
                        local_id,
                        crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
                            &original[..frame_len],
                            &buf[body_range],
                            header.clone(),
                            ParsedOptions::extract(
                                &original[..frame_len],
                                header.options_range.clone(),
                            )
                            .unwrap_or_default(),
                            Some(peer_key),
                            Some(peer_key.hint()),
                            true,
                            Some(crate::ChannelInfoRef {
                                id: channel_id,
                                key: &resolved_channel_key,
                            }),
                            crate::send::RxMetadata::new(
                                Some(rx.rssi),
                                Some(rx.snr),
                                rx.lqi,
                                Some(received_at_ms),
                            ),
                        )),
                    );
                    // First accepted candidate wins.
                    handled = true;
                    break;
                }
                // Stop trying further channels once a peer candidate was accepted.
                if handled {
                    break;
                }
            }
            handled
        }
    } else {
        false
    };
    let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
    (
        handled || forwarding_confirmed || forwarded,
        // Only report a replay target when the frame was actually handled.
        handled.then_some(()).and(replay_target),
    )
}
2404
2405 pub async fn receive_one(
2411 &mut self,
2412 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2413 ) -> Result<bool, MacError<<P::Radio as Radio>::Error>> {
2414 let mut buf = [0u8; FRAME];
2415 let Some(rx) = poll_fn(|cx| match self.radio.poll_receive(cx, &mut buf) {
2416 Poll::Ready(Ok(rx)) => Poll::Ready(Ok(Some(rx))),
2417 Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
2418 Poll::Pending => Poll::Ready(Ok(None)),
2419 })
2420 .await
2421 .map_err(MacError::Radio)?
2422 else {
2423 return Ok(false);
2424 };
2425
2426 let frame_len = rx.len.min(buf.len());
2427 Ok(self
2428 .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
2429 .await)
2430 }
2431
2432 pub fn complete_ack(
2434 &mut self,
2435 peer: &PublicKey,
2436 ack_tag: &[u8; 8],
2437 ) -> Option<(LocalIdentityId, SendReceipt)> {
2438 for (index, slot) in self.identities.iter_mut().enumerate() {
2439 let Some(slot) = slot.as_mut() else {
2440 continue;
2441 };
2442
2443 let receipt = slot.pending_acks.iter().find_map(|(receipt, pending)| {
2444 (pending.peer == *peer && pending.ack_tag == *ack_tag).then_some(*receipt)
2445 });
2446
2447 if let Some(receipt) = receipt {
2448 slot.pending_acks.remove(&receipt);
2449 if self
2450 .post_tx_listen
2451 .as_ref()
2452 .map(|listen| {
2453 listen.identity_id == LocalIdentityId(index as u8)
2454 && listen.receipt == receipt
2455 })
2456 .unwrap_or(false)
2457 {
2458 self.post_tx_listen = None;
2459 }
2460 return Some((LocalIdentityId(index as u8), receipt));
2461 }
2462 }
2463
2464 None
2465 }
2466
/// Processes expired deadlines on all pending ACKs.
///
/// For each identity the work is split in two phases: first the pending ACKs are
/// scanned and the required actions are collected, then the actions are applied
/// (re-queueing frames, rewriting resend records, removing entries, emitting
/// `AckTimeout`).
///
/// - An entry not in the `Queued` state whose ACK deadline has passed becomes either a
///   route retry (when `can_attempt_route_retry` allows it) or a timeout.
/// - An entry in `AwaitingForward` whose confirm deadline has passed, with fewer than
///   `MAX_FORWARD_RETRIES` retries so far, is retried with a random backoff.
///
/// # Errors
/// Returns [`CapacityError`] when the action list or the transmit queue is full.
pub fn service_pending_ack_timeouts(
    &mut self,
    mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
) -> Result<(), CapacityError> {
    self.expire_post_tx_listen_if_needed();

    // Work item collected during the scan phase and applied afterwards.
    #[derive(Clone)]
    enum Action<const FRAME: usize> {
        // Resend the stored frame unchanged.
        Retry {
            receipt: SendReceipt,
            resend: ResendRecord<FRAME>,
            not_before_ms: u64,
        },
        // Try to rewrite the stored frame via `synthesize_route_retry_resend` and resend.
        RouteRetry {
            receipt: SendReceipt,
            peer: PublicKey,
            resend: ResendRecord<FRAME>,
            not_before_ms: u64,
        },
        // Give up: drop the pending ACK and report `AckTimeout`.
        Timeout {
            receipt: SendReceipt,
            peer: PublicKey,
        },
    }

    let now_ms = self.clock.now_ms();
    let t_frame_ms = self.radio.t_frame_ms();

    for index in 0..self.identities.len() {
        let identity_id = LocalIdentityId(index as u8);
        // Phase 1: scan this identity's pending ACKs and collect actions.
        let actions = {
            let Some(slot) = self.identities[index].as_mut() else {
                continue;
            };

            let mut actions: Vec<Action<FRAME>, ACKS> = Vec::new();
            for (receipt, pending) in slot.pending_acks.iter_mut() {
                // Overall ACK deadline passed (entries still `Queued` are exempt).
                if !matches!(pending.state, crate::AckState::Queued { .. })
                    && now_ms >= pending.ack_deadline_ms
                {
                    if Self::can_attempt_route_retry(pending) {
                        let backoff_cap_ms =
                            Self::forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms, 1);
                        let backoff_ms = if backoff_cap_ms == 0 {
                            0
                        } else {
                            u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
                        };
                        actions
                            .push(Action::RouteRetry {
                                receipt: *receipt,
                                peer: pending.peer,
                                resend: pending.resend.clone(),
                                not_before_ms: now_ms.saturating_add(backoff_ms),
                            })
                            .map_err(|_| CapacityError)?;
                    } else {
                        actions
                            .push(Action::Timeout {
                                receipt: *receipt,
                                peer: pending.peer,
                            })
                            .map_err(|_| CapacityError)?;
                    }
                    continue;
                }

                // Forwarding confirmation not seen in time: retry while retries remain.
                if let crate::AckState::AwaitingForward {
                    confirm_deadline_ms,
                } = pending.state
                {
                    if now_ms >= confirm_deadline_ms && pending.retries < MAX_FORWARD_RETRIES {
                        pending.retries = pending.retries.saturating_add(1);
                        // The backoff cap is computed from the updated retry count.
                        let backoff_cap_ms = Self::forward_retry_backoff_cap_ms_for_t_frame(
                            t_frame_ms,
                            pending.retries,
                        );
                        let backoff_ms = if backoff_cap_ms == 0 {
                            0
                        } else {
                            u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
                        };
                        let not_before_ms = now_ms.saturating_add(backoff_ms);
                        pending.state = crate::AckState::RetryQueued;
                        actions
                            .push(Action::Retry {
                                receipt: *receipt,
                                resend: pending.resend.clone(),
                                not_before_ms,
                            })
                            .map_err(|_| CapacityError)?;
                    }
                }
            }
            actions
        };

        // Phase 2: apply the collected actions; the slot borrow from phase 1 has ended.
        for action in actions {
            match action {
                Action::Retry {
                    receipt,
                    resend,
                    not_before_ms,
                } => {
                    self.tx_queue.enqueue_with_state(
                        TxPriority::Retry,
                        resend.frame.as_slice(),
                        Some(receipt),
                        Some(identity_id),
                        not_before_ms,
                        0,
                        0,
                    )?;
                }
                Action::RouteRetry {
                    receipt,
                    peer,
                    resend,
                    not_before_ms,
                } => {
                    if let Some(rewritten) = self.synthesize_route_retry_resend(&peer, &resend)
                    {
                        // Reset the pending entry's bookkeeping and swap in the rewritten
                        // frame before queueing it.
                        if let Some(pending) = self
                            .identity_mut(identity_id)
                            .and_then(|slot| slot.pending_ack_mut(&receipt))
                        {
                            pending.resend = rewritten.clone();
                            pending.retries = 0;
                            pending.sent_ms = 0;
                            pending.ack_deadline_ms = 0;
                            pending.state = crate::AckState::RetryQueued;
                        }
                        self.tx_queue.enqueue_with_state(
                            TxPriority::Retry,
                            rewritten.frame.as_slice(),
                            Some(receipt),
                            Some(identity_id),
                            not_before_ms,
                            0,
                            0,
                        )?;
                    } else {
                        // No rewritten frame could be produced: treat as a timeout.
                        if let Some(slot) = self.identity_mut(identity_id) {
                            slot.pending_acks.remove(&receipt);
                        }
                        on_event(
                            identity_id,
                            crate::MacEventRef::AckTimeout { peer, receipt },
                        );
                    }
                }
                Action::Timeout { receipt, peer } => {
                    if let Some(slot) = self.identity_mut(identity_id) {
                        slot.pending_acks.remove(&receipt);
                    }
                    on_event(
                        identity_id,
                        crate::MacEventRef::AckTimeout { peer, receipt },
                    );
                }
            }
        }
    }

    Ok(())
}
2634
2635 pub fn cancel_pending_ack(
2641 &mut self,
2642 identity_id: LocalIdentityId,
2643 receipt: SendReceipt,
2644 ) -> bool {
2645 let removed = self
2646 .identity_mut(identity_id)
2647 .and_then(|slot| slot.remove_pending_ack(&receipt))
2648 .is_some();
2649
2650 self.tx_queue.remove_first_matching(|entry| {
2652 entry.receipt == Some(receipt) && entry.identity_id == Some(identity_id)
2653 });
2654
2655 if let Some(listen) = &self.post_tx_listen {
2657 if listen.identity_id == identity_id && listen.receipt == receipt {
2658 self.post_tx_listen = None;
2659 }
2660 }
2661
2662 removed
2663 }
2664
2665 fn identity_and_advance(
2666 &mut self,
2667 from: LocalIdentityId,
2668 ) -> Result<(PublicKey, u32), SendError> {
2669 let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2670 if slot.counter_window_exhausted() {
2671 return Err(SendError::CounterPersistenceLag);
2672 }
2673 let source_key = *slot.identity().public_key();
2674 let frame_counter = slot.advance_frame_counter();
2675 slot.schedule_counter_persist_if_needed();
2676 Ok((source_key, frame_counter))
2677 }
2678
2679 fn take_salt(&mut self, options: &SendOptions) -> Option<u16> {
2680 options.salt.then(|| self.rng.next_u32() as u16)
2681 }
2682
2683 fn enforce_send_policy(
2684 &self,
2685 channel_id: Option<ChannelId>,
2686 options: &SendOptions,
2687 blind_unicast: bool,
2688 ) -> Result<(), SendError> {
2689 let _authority = self.classify_send_authority(options, blind_unicast)?;
2690
2691 let Some(channel_id) = channel_id else {
2692 return Ok(());
2693 };
2694 let Some(policy) = self
2695 .operating_policy
2696 .channel_policies
2697 .iter()
2698 .find(|policy| policy.channel_id == channel_id)
2699 else {
2700 return Ok(());
2701 };
2702
2703 if policy.require_unencrypted && options.encrypted {
2704 return Err(SendError::PolicyViolation);
2705 }
2706 if policy.require_full_source && !options.full_source {
2707 return Err(SendError::PolicyViolation);
2708 }
2709 if let Some(max_flood_hops) = policy.max_flood_hops {
2710 if options
2711 .flood_hops
2712 .map(|hops| hops > max_flood_hops)
2713 .unwrap_or(false)
2714 {
2715 return Err(SendError::PolicyViolation);
2716 }
2717 }
2718
2719 Ok(())
2720 }
2721
2722 fn classify_send_authority(
2723 &self,
2724 options: &SendOptions,
2725 _blind_unicast: bool,
2726 ) -> Result<TransmitAuthority, SendError> {
2727 match self.operating_policy.amateur_radio_mode {
2728 AmateurRadioMode::Unlicensed => Ok(TransmitAuthority::Unlicensed),
2729 AmateurRadioMode::LicensedOnly => {
2730 if options.encrypted || self.operating_policy.operator_callsign.is_none() {
2731 return Err(SendError::PolicyViolation);
2732 }
2733 Ok(TransmitAuthority::Amateur)
2734 }
2735 AmateurRadioMode::Hybrid => {
2736 if options.encrypted {
2737 Ok(TransmitAuthority::Unlicensed)
2740 } else if self.operating_policy.operator_callsign.is_some() {
2741 Ok(TransmitAuthority::Amateur)
2742 } else {
2743 Ok(TransmitAuthority::Unlicensed)
2744 }
2745 }
2746 }
2747 }
2748
2749 fn enqueue_packet(
2750 &mut self,
2751 packet: UnsealedPacket<'_>,
2752 receipt: Option<SendReceipt>,
2753 identity_id: Option<LocalIdentityId>,
2754 ) -> Result<(), SendError> {
2755 if packet.total_len() > self.radio.max_frame_size() {
2756 return Err(SendError::Build(BuildError::BufferTooSmall));
2757 }
2758 self.tx_queue
2759 .enqueue(
2760 TxPriority::Application,
2761 packet.as_bytes(),
2762 receipt,
2763 identity_id,
2764 )
2765 .map_err(|_| SendError::QueueFull)?;
2766 Ok(())
2767 }
2768
2769 fn refresh_pending_resend(
2770 &mut self,
2771 from: LocalIdentityId,
2772 receipt: SendReceipt,
2773 frame: &[u8],
2774 source_route: Option<&[RouterHint]>,
2775 ) -> Result<(), SendError> {
2776 let resend =
2777 ResendRecord::try_new(frame, source_route).map_err(|_| SendError::QueueFull)?;
2778 let pending = self
2779 .identity_mut(from)
2780 .ok_or(SendError::IdentityMissing)?
2781 .pending_ack_mut(&receipt)
2782 .ok_or(SendError::IdentityMissing)?;
2783 pending.resend = resend;
2784 Ok(())
2785 }
2786
2787 fn prepare_pending_ack(
2788 &mut self,
2789 from: LocalIdentityId,
2790 peer: PublicKey,
2791 packet: &UnsealedPacket<'_>,
2792 keys: &PairwiseKeys,
2793 options: &SendOptions,
2794 ) -> Result<SendReceipt, SendError> {
2795 let header = PacketHeader::parse(packet.as_bytes())?;
2796 let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2797 feed_aad(&header, packet.as_bytes(), |chunk| cmac.update(chunk));
2798 cmac.update(packet.body());
2799 let full_mac = cmac.finalize();
2800 let ack_tag = self.crypto.compute_ack_tag(&full_mac, &keys.k_enc);
2801 let is_forwarded = options
2802 .source_route
2803 .as_ref()
2804 .map(|route| !route.is_empty())
2805 .unwrap_or(false)
2806 || options.flood_hops.unwrap_or(0) > 0;
2807 let resend = ResendRecord::try_new(
2808 packet.as_bytes(),
2809 options.source_route.as_ref().map(|route| route.as_slice()),
2810 )
2811 .map_err(|_| SendError::QueueFull)?;
2812
2813 let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2814 let receipt = slot.next_receipt();
2815 let pending = if is_forwarded {
2816 PendingAck::forwarded(ack_tag, peer, resend)
2817 } else {
2818 PendingAck::direct(ack_tag, peer, resend)
2819 };
2820 slot.try_insert_pending_ack(receipt, pending)
2821 .map_err(|_| SendError::PendingAckFull)?;
2822 Ok(receipt)
2823 }
2824
2825 async fn derive_pairwise_keys_for_peer(
2826 &self,
2827 local_id: LocalIdentityId,
2828 peer_key: &PublicKey,
2829 ) -> Result<PairwiseKeys, SendError> {
2830 let shared_secret = {
2831 let slot = self.identity(local_id).ok_or(SendError::IdentityMissing)?;
2832 match slot.identity() {
2833 LocalIdentity::LongTerm(identity) => identity
2834 .agree(peer_key)
2835 .await
2836 .map_err(|_| SendError::IdentityAgreementFailed)?,
2837 #[cfg(feature = "software-crypto")]
2838 LocalIdentity::Ephemeral(identity) => identity
2839 .agree(peer_key)
2840 .await
2841 .map_err(|_| SendError::IdentityAgreementFailed)?,
2842 }
2843 };
2844
2845 Ok(self.crypto.derive_pairwise_keys(&shared_secret))
2846 }
2847
2848 fn effective_source_route(
2849 &self,
2850 peer_id: PeerId,
2851 options: &SendOptions,
2852 ) -> Option<Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>> {
2853 if let Some(route) = options.source_route.as_ref() {
2854 return Some(route.clone());
2855 }
2856
2857 let Some(peer) = self.peer_registry.get(peer_id) else {
2858 return None;
2859 };
2860 match peer.route.as_ref() {
2861 Some(CachedRoute::Source(route)) => Some(route.clone()),
2862 _ => None,
2863 }
2864 }
2865
2866 fn cache_peer_crypto(
2867 &mut self,
2868 local_id: LocalIdentityId,
2869 peer_id: PeerId,
2870 pairwise_keys: PairwiseKeys,
2871 ) -> Result<(), SendError> {
2872 let slot = self
2873 .identity_mut(local_id)
2874 .ok_or(SendError::IdentityMissing)?;
2875 if slot.peer_crypto().get(&peer_id).is_some() {
2876 return Ok(());
2877 }
2878 slot.peer_crypto_mut()
2879 .insert(
2880 peer_id,
2881 crate::peers::PeerCryptoState {
2882 pairwise_keys,
2883 replay_window: ReplayWindow::new(),
2884 },
2885 )
2886 .map_err(|_| SendError::QueueFull)?;
2887 Ok(())
2888 }
2889
2890 async fn ensure_peer_crypto(
2891 &mut self,
2892 local_id: LocalIdentityId,
2893 peer_id: PeerId,
2894 ) -> Result<PairwiseKeys, SendError> {
2895 if let Some(keys) = self
2896 .identity(local_id)
2897 .and_then(|slot| slot.peer_crypto().get(&peer_id))
2898 .map(|state| state.pairwise_keys.clone())
2899 {
2900 return Ok(keys);
2901 }
2902
2903 let peer_key = self
2904 .peer_registry
2905 .get(peer_id)
2906 .ok_or(SendError::PeerMissing)?
2907 .public_key;
2908 let pairwise_keys = self
2909 .derive_pairwise_keys_for_peer(local_id, &peer_key)
2910 .await?;
2911 self.cache_peer_crypto(local_id, peer_id, pairwise_keys.clone())?;
2912 Ok(pairwise_keys)
2913 }
2914
2915 fn insert_identity(
2916 &mut self,
2917 identity: LocalIdentity<P::Identity>,
2918 pfs_parent: Option<LocalIdentityId>,
2919 ) -> Result<LocalIdentityId, CapacityError> {
2920 let initial_frame_counter = self.rng.next_u32();
2921
2922 if let Some((index, slot)) = self
2923 .identities
2924 .iter_mut()
2925 .enumerate()
2926 .find(|(_, slot)| slot.is_none())
2927 {
2928 *slot = Some(IdentitySlot::new(
2929 identity,
2930 initial_frame_counter,
2931 pfs_parent,
2932 ));
2933 return Ok(LocalIdentityId(index as u8));
2934 }
2935
2936 let next_id = self.identities.len();
2937 self.identities
2938 .push(Some(IdentitySlot::new(
2939 identity,
2940 initial_frame_counter,
2941 pfs_parent,
2942 )))
2943 .map_err(|_| CapacityError)?;
2944 Ok(LocalIdentityId(next_id as u8))
2945 }
2946
2947 fn compute_received_ack_tag(
2948 &self,
2949 buf: &[u8],
2950 header: &PacketHeader,
2951 body_range: core::ops::Range<usize>,
2952 keys: &PairwiseKeys,
2953 ) -> [u8; 8] {
2954 let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2955 feed_aad(header, buf, |chunk| cmac.update(chunk));
2956 cmac.update(&buf[body_range]);
2957 let full_mac = cmac.finalize();
2958 self.crypto.compute_ack_tag(&full_mac, &keys.k_enc)
2959 }
2960
2961 fn requeue_tx(&mut self, queued: &crate::QueuedTx<FRAME>) -> Result<u32, CapacityError> {
2962 self.tx_queue.enqueue_with_state(
2963 queued.priority,
2964 queued.frame.as_slice(),
2965 queued.receipt,
2966 queued.identity_id,
2967 queued.not_before_ms,
2968 queued.cad_attempts,
2969 queued.forward_deferrals,
2970 )
2971 }
2972
2973 fn accept_unicast_replay(
2974 &mut self,
2975 local_id: LocalIdentityId,
2976 peer_id: PeerId,
2977 header: &PacketHeader,
2978 frame: &[u8],
2979 ) -> bool {
2980 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
2981 return false;
2982 };
2983 let now_ms = self.clock.now_ms();
2984 let Some(window) = self
2985 .identity_mut(local_id)
2986 .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
2987 .map(|state| &mut state.replay_window)
2988 else {
2989 return false;
2990 };
2991
2992 if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
2993 return false;
2994 }
2995
2996 window.accept(counter, mic, now_ms);
2997 true
2998 }
2999
3000 fn unicast_replay_verdict(
3001 &mut self,
3002 local_id: LocalIdentityId,
3003 peer_id: PeerId,
3004 header: &PacketHeader,
3005 frame: &[u8],
3006 ) -> Option<ReplayVerdict> {
3007 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3008 return None;
3009 };
3010 let now_ms = self.clock.now_ms();
3011 self.identity_mut(local_id)
3012 .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3013 .map(|state| state.replay_window.check(counter, mic, now_ms))
3014 }
3015
3016 fn try_accept_counter_resync_response(
3017 &mut self,
3018 local_id: LocalIdentityId,
3019 peer_id: PeerId,
3020 header: &PacketHeader,
3021 frame: &[u8],
3022 payload: &[u8],
3023 ) -> bool {
3024 let Some(nonce) = Self::echo_response_nonce(payload) else {
3025 return false;
3026 };
3027 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3028 return false;
3029 };
3030 let now_ms = self.clock.now_ms();
3031 let Some(slot) = self.identity_mut(local_id) else {
3032 return false;
3033 };
3034 let Some(pending) = slot.pending_counter_resync().get(&peer_id).copied() else {
3035 return false;
3036 };
3037 if pending.nonce != nonce {
3038 return false;
3039 }
3040 let Some(state) = slot.peer_crypto_mut().get_mut(&peer_id) else {
3041 return false;
3042 };
3043 state.replay_window.reset(counter, now_ms);
3044 state.replay_window.accept(counter, mic, now_ms);
3045 let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3046 true
3047 }
3048
3049 async fn maybe_request_counter_resync(
3050 &mut self,
3051 local_id: LocalIdentityId,
3052 peer_id: PeerId,
3053 peer_key: PublicKey,
3054 ) {
3055 let now_ms = self.clock.now_ms();
3056 let should_send = {
3057 let Some(slot) = self.identity(local_id) else {
3058 return;
3059 };
3060 match slot.pending_counter_resync().get(&peer_id).copied() {
3061 Some(pending) => {
3062 now_ms.saturating_sub(pending.requested_ms) >= COUNTER_RESYNC_REQUEST_RETRY_MS
3063 }
3064 None => true,
3065 }
3066 };
3067 if !should_send {
3068 return;
3069 }
3070
3071 let nonce = self.rng.next_u32();
3072 let payload =
3073 Self::build_echo_command_payload(MAC_COMMAND_ECHO_REQUEST_ID, &nonce.to_be_bytes());
3074 let options = SendOptions::default();
3075 if self
3076 .send_unicast(local_id, &peer_key, payload.as_slice(), &options)
3077 .await
3078 .is_ok()
3079 {
3080 if let Some(slot) = self.identity_mut(local_id) {
3081 let _ = slot.pending_counter_resync_mut().insert(
3082 peer_id,
3083 PendingCounterResync {
3084 nonce,
3085 requested_ms: now_ms,
3086 },
3087 );
3088 }
3089 }
3090 }
3091
3092 fn store_deferred_counter_resync_frame(
3093 &mut self,
3094 local_id: LocalIdentityId,
3095 peer_id: PeerId,
3096 frame: &[u8],
3097 rx: &RxInfo,
3098 received_at_ms: u64,
3099 ) {
3100 let mut stored = Vec::new();
3101 stored
3102 .extend_from_slice(frame)
3103 .expect("received frame length must fit configured frame capacity");
3104 self.deferred_counter_resync_frame = Some(DeferredCounterResyncFrame {
3105 local_id,
3106 peer_id,
3107 frame: stored,
3108 rssi: rx.rssi,
3109 snr: rx.snr,
3110 lqi: rx.lqi,
3111 received_at_ms,
3112 });
3113 }
3114
3115 fn take_deferred_counter_resync_frame(
3116 &mut self,
3117 local_id: LocalIdentityId,
3118 peer_id: PeerId,
3119 ) -> Option<DeferredCounterResyncFrame<FRAME>> {
3120 match self.deferred_counter_resync_frame.as_ref() {
3121 Some(deferred) if deferred.local_id == local_id && deferred.peer_id == peer_id => {
3122 self.deferred_counter_resync_frame.take()
3123 }
3124 _ => None,
3125 }
3126 }
3127
3128 fn accept_multicast_replay(
3129 &mut self,
3130 channel_id: ChannelId,
3131 peer_id: PeerId,
3132 header: &PacketHeader,
3133 frame: &[u8],
3134 ) -> bool {
3135 let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3136 return false;
3137 };
3138 let now_ms = self.clock.now_ms();
3139 let Some(channel) = self.channels.get_mut_by_id(&channel_id) else {
3140 return false;
3141 };
3142
3143 if let Some(window) = channel.replay.get_mut(&peer_id) {
3144 if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3145 return false;
3146 }
3147 window.accept(counter, mic, now_ms);
3148 return true;
3149 }
3150
3151 let mut window = ReplayWindow::new();
3152 window.accept(counter, mic, now_ms);
3153 channel.replay.insert(peer_id, window).is_ok()
3154 }
3155
3156 fn clear_peer_slot_state(&mut self, peer_id: PeerId) {
3157 for slot in self.identities.iter_mut().filter_map(|slot| slot.as_mut()) {
3158 let _ = slot.peer_crypto_mut().remove(&peer_id);
3159 let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3160 }
3161 for channel in self.channels.iter_mut() {
3162 let _ = channel.replay.remove(&peer_id);
3163 }
3164 }
3165
3166 fn try_auto_register_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
3167 let now_ms = self.clock.now_ms();
3168 let outcome = self.peer_registry.try_insert_or_update_auto(key, now_ms)?;
3169 if outcome.evicted_key.is_some() {
3170 self.clear_peer_slot_state(outcome.peer_id);
3171 }
3172 Ok(outcome.peer_id)
3173 }
3174
3175 fn replay_metadata<'a>(header: &PacketHeader, frame: &'a [u8]) -> Option<(u32, &'a [u8])> {
3176 let counter = header.sec_info?.frame_counter;
3177 let mic = frame.get(header.mic_range.clone())?;
3178 Some((counter, mic))
3179 }
3180
3181 fn build_echo_command_payload(command_id: u8, data: &[u8]) -> Vec<u8, FRAME> {
3182 let mut payload = Vec::new();
3183 let _ = payload.push(PayloadType::MacCommand as u8);
3184 let _ = payload.push(command_id);
3185 let _ = payload.extend_from_slice(data);
3186 payload
3187 }
3188
3189 fn echo_request_data(payload: &[u8]) -> Option<&[u8]> {
3190 let (&payload_type, rest) = payload.split_first()?;
3191 let (&command_id, data) = rest.split_first()?;
3192 if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3193 || command_id != MAC_COMMAND_ECHO_REQUEST_ID
3194 {
3195 return None;
3196 }
3197 Some(data)
3198 }
3199
3200 fn echo_response_nonce(payload: &[u8]) -> Option<u32> {
3201 let (&payload_type, rest) = payload.split_first()?;
3202 let (&command_id, data) = rest.split_first()?;
3203 if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3204 || command_id != MAC_COMMAND_ECHO_RESPONSE_ID
3205 || data.len() != COUNTER_RESYNC_NONCE_LEN
3206 {
3207 return None;
3208 }
3209 Some(u32::from_be_bytes(data.try_into().ok()?))
3210 }
3211
3212 fn full_key_at(frame: &[u8], offset: usize) -> Option<PublicKey> {
3213 let mut key = [0u8; 32];
3214 key.copy_from_slice(frame.get(offset..offset + 32)?);
3215 Some(PublicKey(key))
3216 }
3217
3218 fn find_local_identity_for_dst(
3219 &self,
3220 dst: Option<umsh_core::NodeHint>,
3221 ) -> Option<LocalIdentityId> {
3222 let dst = dst?;
3223 self.identities
3224 .iter()
3225 .enumerate()
3226 .find(|(_, slot)| {
3227 slot.as_ref()
3228 .map(|slot| slot.identity().public_key().hint() == dst)
3229 .unwrap_or(false)
3230 })
3231 .map(|(index, _)| LocalIdentityId(index as u8))
3232 }
3233
3234 fn resolve_source_peer_candidates(
3235 &mut self,
3236 frame: &[u8],
3237 header: &PacketHeader,
3238 ) -> Vec<(PeerId, PublicKey), PEERS> {
3239 match header.source {
3240 SourceAddrRef::FullKeyAt { offset } => {
3241 let Some(peer_key) = Self::full_key_at(frame, offset) else {
3242 return Vec::new();
3243 };
3244
3245 if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3246 let mut out = Vec::new();
3247 let _ = out.push((peer_id, peer_key));
3248 return out;
3249 }
3250
3251 if self.auto_register_full_key_peers {
3252 if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3253 let mut out = Vec::new();
3254 let _ = out.push((peer_id, peer_key));
3255 return out;
3256 }
3257 }
3258
3259 Vec::new()
3260 }
3261 SourceAddrRef::Hint(hint) => self
3262 .peer_registry
3263 .lookup_by_hint(&hint)
3264 .map(|(peer_id, info)| (peer_id, info.public_key))
3265 .collect(),
3266 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3267 }
3268 }
3269
3270 fn resolve_multicast_source(
3271 &mut self,
3272 frame: &[u8],
3273 header: &PacketHeader,
3274 ) -> Option<ResolvedMulticastSource> {
3275 match header.source {
3276 SourceAddrRef::FullKeyAt { offset } => {
3277 let mut key = [0u8; 32];
3278 key.copy_from_slice(frame.get(offset..offset + 32)?);
3279 let public_key = PublicKey(key);
3280 let peer_id = self
3281 .peer_registry
3282 .lookup_by_key(&public_key)
3283 .map(|(peer_id, _)| peer_id);
3284 Some(ResolvedMulticastSource {
3285 peer_id,
3286 public_key: Some(public_key),
3287 hint: Some(public_key.hint()),
3288 })
3289 }
3290 SourceAddrRef::Hint(hint) => {
3291 let resolved = self.resolve_unique_hint(hint);
3292 Some(ResolvedMulticastSource {
3293 peer_id: resolved.map(|(peer_id, _)| peer_id),
3294 public_key: resolved.map(|(_, key)| key),
3295 hint: Some(hint),
3296 })
3297 }
3298 SourceAddrRef::Encrypted { offset, len } => match len {
3299 32 => {
3300 let mut key = [0u8; 32];
3301 key.copy_from_slice(frame.get(offset..offset + 32)?);
3302 let public_key = PublicKey(key);
3303 let peer_id = self
3304 .peer_registry
3305 .lookup_by_key(&public_key)
3306 .map(|(peer_id, _)| peer_id);
3307 Some(ResolvedMulticastSource {
3308 peer_id,
3309 public_key: Some(public_key),
3310 hint: Some(public_key.hint()),
3311 })
3312 }
3313 3 => {
3314 let hint = umsh_core::NodeHint([
3315 *frame.get(offset)?,
3316 *frame.get(offset + 1)?,
3317 *frame.get(offset + 2)?,
3318 ]);
3319 let resolved = self.resolve_unique_hint(hint);
3320 Some(ResolvedMulticastSource {
3321 peer_id: resolved.map(|(peer_id, _)| peer_id),
3322 public_key: resolved.map(|(_, key)| key),
3323 hint: Some(hint),
3324 })
3325 }
3326 _ => None,
3327 },
3328 SourceAddrRef::None => None,
3329 }
3330 }
3331
3332 fn accept_unknown_multicast_replay(&mut self, header: &PacketHeader, frame: &[u8]) -> bool {
3333 let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3334 return false;
3335 };
3336 if self.multicast_unknown_dup_cache.contains(&cache_key) {
3337 return false;
3338 }
3339 self.multicast_unknown_dup_cache
3340 .insert(cache_key, self.clock.now_ms());
3341 true
3342 }
3343
3344 fn resolve_blind_source_peer_candidates(
3345 &mut self,
3346 frame: &[u8],
3347 source: SourceAddrRef,
3348 ) -> Vec<(PeerId, PublicKey), PEERS> {
3349 match source {
3350 SourceAddrRef::FullKeyAt { offset } => {
3351 let Some(peer_key) = Self::full_key_at(frame, offset) else {
3352 return Vec::new();
3353 };
3354
3355 if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3356 let mut out = Vec::new();
3357 let _ = out.push((peer_id, peer_key));
3358 return out;
3359 }
3360
3361 if self.auto_register_full_key_peers {
3362 if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3363 let mut out = Vec::new();
3364 let _ = out.push((peer_id, peer_key));
3365 return out;
3366 }
3367 }
3368
3369 Vec::new()
3370 }
3371 SourceAddrRef::Hint(hint) => self
3372 .peer_registry
3373 .lookup_by_hint(&hint)
3374 .map(|(peer_id, info)| (peer_id, info.public_key))
3375 .collect(),
3376 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3377 }
3378 }
3379
3380 fn resolve_unique_hint(&self, hint: umsh_core::NodeHint) -> Option<(PeerId, PublicKey)> {
3381 let mut matches = self.peer_registry.lookup_by_hint(&hint);
3382 let (peer_id, info) = matches.next()?;
3383 if matches.next().is_some() {
3384 return None;
3385 }
3386 Some((peer_id, info.public_key))
3387 }
3388
3389 fn learn_route_for_peer(&mut self, peer_id: PeerId, frame: &[u8], header: &PacketHeader) {
3390 let now_ms = self.clock.now_ms();
3391 self.peer_registry.touch(peer_id, now_ms);
3392
3393 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3394 return;
3395 };
3396
3397 if let Some(trace_range) = options.trace_route {
3398 if let Some(route) = self.source_route_from_trace(frame.get(trace_range).unwrap_or(&[]))
3399 {
3400 self.peer_registry
3401 .update_route(peer_id, crate::CachedRoute::Source(route));
3402 return;
3403 }
3404 }
3405
3406 if let Some(flood_hops) = header.flood_hops {
3407 self.peer_registry.update_route(
3408 peer_id,
3409 crate::CachedRoute::Flood {
3410 hops: flood_hops.accumulated(),
3411 },
3412 );
3413 }
3414 }
3415
3416 fn resolve_broadcast_source(
3417 frame: &[u8],
3418 header: &PacketHeader,
3419 ) -> Option<(umsh_core::NodeHint, Option<PublicKey>)> {
3420 match header.source {
3421 SourceAddrRef::Hint(hint) => Some((hint, None)),
3422 SourceAddrRef::FullKeyAt { offset } => {
3423 let mut key = [0u8; 32];
3424 key.copy_from_slice(frame.get(offset..offset + 32)?);
3425 let public_key = PublicKey(key);
3426 Some((public_key.hint(), Some(public_key)))
3427 }
3428 SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => None,
3429 }
3430 }
3431
3432 fn payload_is_allowed(packet_type: PacketType, payload: &[u8]) -> bool {
3433 if payload.is_empty() {
3434 return true;
3435 }
3436
3437 PayloadType::from_byte(payload[0])
3438 .unwrap_or(PayloadType::Empty)
3439 .allowed_for(packet_type)
3440 }
3441
3442 fn source_route_from_trace(
3443 &self,
3444 trace_bytes: &[u8],
3445 ) -> Option<heapless::Vec<RouterHint, { crate::MAX_SOURCE_ROUTE_HOPS }>> {
3446 if trace_bytes.len() % 2 != 0 {
3447 return None;
3448 }
3449
3450 let mut route = heapless::Vec::new();
3451 for chunk in trace_bytes.chunks_exact(2) {
3452 route.push(RouterHint([chunk[0], chunk[1]])).ok()?;
3453 }
3454 Some(route)
3455 }
3456
3457 fn should_emit_destination_ack(&self, frame: &[u8], header: &PacketHeader) -> bool {
3458 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3459 return false;
3460 };
3461
3462 options
3467 .source_route
3468 .map(|range| range.is_empty())
3469 .unwrap_or(true)
3470 }
3471
3472 fn maybe_forward_received(
3473 &mut self,
3474 frame: &[u8],
3475 header: &PacketHeader,
3476 rx: &RxInfo,
3477 locally_handled_unicast: bool,
3478 ) -> bool {
3479 if !self.repeater.enabled {
3480 return false;
3481 }
3482 if !header.packet_type().is_routable() {
3483 return false;
3484 }
3485 if locally_handled_unicast
3489 && matches!(
3490 header.packet_type(),
3491 PacketType::Unicast
3492 | PacketType::UnicastAckReq
3493 | PacketType::BlindUnicast
3494 | PacketType::BlindUnicastAckReq
3495 )
3496 {
3497 return false;
3498 }
3499
3500 let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3501 return false;
3502 };
3503 let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3504 return false;
3505 };
3506 if self.dup_cache.contains(&cache_key) {
3507 self.defer_pending_forward(&cache_key, rx, &options);
3508 return false;
3509 }
3510
3511 let Some(plan) = self.plan_forwarding(frame, header, &options, rx) else {
3512 return false;
3513 };
3514
3515 let mut rewritten = [0u8; FRAME];
3516 let Ok(total_len) =
3517 self.rewrite_forwarded_frame(frame, header, &options, plan, &mut rewritten)
3518 else {
3519 return false;
3520 };
3521 if total_len > self.radio.max_frame_size() {
3522 return false;
3523 }
3524
3525 let now_ms = self.clock.now_ms();
3526 if self
3527 .tx_queue
3528 .enqueue_with_state(
3529 TxPriority::Forward,
3530 &rewritten[..total_len],
3531 None,
3532 None,
3533 now_ms.saturating_add(plan.delay_ms),
3534 0,
3535 0,
3536 )
3537 .is_err()
3538 {
3539 return false;
3540 }
3541 self.dup_cache.insert(cache_key, now_ms);
3542 true
3543 }
3544
3545 fn plan_forwarding(
3546 &mut self,
3547 frame: &[u8],
3548 header: &PacketHeader,
3549 options: &ParsedOptions,
3550 rx: &RxInfo,
3551 ) -> Option<ForwardPlan> {
3552 if options.has_unknown_critical {
3553 return None;
3554 }
3555
3556 let router_hint = self.repeater_router_hint()?;
3557 let station_action = self.classify_forward_station_action(frame, header)?;
3558
3559 let source_route_bytes = options
3560 .source_route
3561 .as_ref()
3562 .and_then(|range| frame.get(range.clone()))
3563 .unwrap_or(&[]);
3564 if source_route_bytes.len() % 2 != 0 {
3565 return None;
3566 }
3567
3568 let mut consume_source_route = false;
3569 let mut decrement_flood_hops = false;
3570 let mut insert_region_code = None;
3571 let mut delay_ms = 0u64;
3572
3573 if !source_route_bytes.is_empty() {
3574 if source_route_bytes[..2] != router_hint.0 {
3575 return None;
3576 }
3577 consume_source_route = true;
3578 if source_route_bytes.len() == 2 {
3579 decrement_flood_hops = header.flood_hops.is_some();
3580 }
3581 } else {
3582 decrement_flood_hops = true;
3583 }
3584
3585 if decrement_flood_hops {
3586 let flood_hops = header.flood_hops?;
3587 if flood_hops.remaining() == 0 {
3588 return None;
3589 }
3590 if let Some(min_rssi) = Self::effective_min_rssi(options, &self.repeater) {
3593 if rx.rssi < min_rssi {
3594 return None;
3595 }
3596 }
3597 if let Some(min_snr) = Self::effective_min_snr(options, &self.repeater) {
3598 if rx.snr < Snr::from_decibels(min_snr) {
3599 return None;
3600 }
3601 }
3602 let mut saw_region_code = false;
3603 let mut matched_region_code = false;
3604 if !header.options_range.is_empty() {
3605 for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
3606 let (number, value) = entry.ok()?;
3607 if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
3608 continue;
3609 }
3610 saw_region_code = true;
3611 let region_code = [value[0], value[1]];
3612 if self
3613 .repeater
3614 .regions
3615 .iter()
3616 .any(|configured| *configured == region_code)
3617 {
3618 matched_region_code = true;
3619 }
3620 }
3621 }
3622 if saw_region_code {
3623 if !matched_region_code {
3624 return None;
3625 }
3626 } else {
3627 insert_region_code = self.repeater.regions.first().copied();
3628 }
3629 delay_ms = self.sample_flood_contention_delay_ms(rx, options);
3630 }
3631
3632 Some(ForwardPlan {
3633 router_hint,
3634 consume_source_route,
3635 decrement_flood_hops,
3636 insert_region_code,
3637 delay_ms,
3638 station_action,
3639 })
3640 }
3641
3642 fn classify_forward_station_action(
3643 &self,
3644 frame: &[u8],
3645 header: &PacketHeader,
3646 ) -> Option<ForwardStationAction> {
3647 let has_operator_callsign = if header.options_range.is_empty() {
3648 false
3649 } else {
3650 umsh_core::iter_options(frame, header.options_range.clone())
3651 .filter_map(Result::ok)
3652 .any(|(number, _)| OptionNumber::from(number) == OptionNumber::OperatorCallsign)
3653 };
3654 let encrypted = header
3655 .sec_info
3656 .map(|sec| sec.scf.encrypted())
3657 .unwrap_or(false);
3658
3659 match self.repeater.amateur_radio_mode {
3660 AmateurRadioMode::Unlicensed => Some(ForwardStationAction::Remove),
3661 AmateurRadioMode::LicensedOnly => {
3662 if encrypted || !has_operator_callsign || self.repeater.station_callsign.is_none() {
3663 None
3664 } else {
3665 Some(ForwardStationAction::Replace)
3666 }
3667 }
3668 AmateurRadioMode::Hybrid => {
3669 if !encrypted && has_operator_callsign {
3670 self.repeater
3671 .station_callsign
3672 .as_ref()
3673 .map(|_| ForwardStationAction::Replace)
3674 } else {
3675 Some(ForwardStationAction::Remove)
3676 }
3677 }
3678 }
3679 }
3680
3681 fn rewrite_forwarded_frame(
3682 &self,
3683 src: &[u8],
3684 header: &PacketHeader,
3685 options: &ParsedOptions,
3686 plan: ForwardPlan,
3687 dst: &mut [u8],
3688 ) -> Result<usize, CapacityError> {
3689 if dst.is_empty() {
3690 return Err(CapacityError);
3691 }
3692
3693 let options_len =
3694 self.encode_forwarded_options(src, header, options, plan, &mut dst[1..])?;
3695 let mut cursor = 1 + options_len;
3696 let has_options = options_len > 0;
3697 dst[0] = umsh_core::Fcf::new(
3698 header.packet_type(),
3699 header.fcf.full_source(),
3700 has_options,
3701 header.flood_hops.is_some(),
3702 )
3703 .0;
3704
3705 if let Some(flood_hops) = header.flood_hops {
3706 let next = if plan.decrement_flood_hops {
3707 flood_hops.decremented().0
3708 } else {
3709 flood_hops.0
3710 };
3711 *dst.get_mut(cursor).ok_or(CapacityError)? = next;
3712 cursor += 1;
3713 }
3714
3715 let fixed_start = header.options_range.end + usize::from(header.flood_hops.is_some());
3716 let tail = src.get(fixed_start..).ok_or(CapacityError)?;
3717 let end = cursor + tail.len();
3718 dst.get_mut(cursor..end)
3719 .ok_or(CapacityError)?
3720 .copy_from_slice(tail);
3721 Ok(end)
3722 }
3723
3724 fn encode_forwarded_options(
3725 &self,
3726 src: &[u8],
3727 header: &PacketHeader,
3728 _options: &ParsedOptions,
3729 plan: ForwardPlan,
3730 dst: &mut [u8],
3731 ) -> Result<usize, CapacityError> {
3732 let mut encoder = OptionEncoder::new(dst);
3733 let mut inserted_region = false;
3734 let mut inserted_station = false;
3735 let mut saw_station = false;
3736 let mut wrote_any = false;
3737
3738 if !header.options_range.is_empty() {
3739 for entry in umsh_core::iter_options(src, header.options_range.clone()) {
3740 let (number, value) = entry.map_err(|_| CapacityError)?;
3741 if !inserted_region {
3742 if let Some(region_code) = plan.insert_region_code {
3743 if number > OptionNumber::RegionCode.as_u16() {
3744 encoder
3745 .put(OptionNumber::RegionCode.as_u16(), ®ion_code)
3746 .map_err(|_| CapacityError)?;
3747 inserted_region = true;
3748 wrote_any = true;
3749 }
3750 }
3751 }
3752 if !inserted_station
3753 && matches!(plan.station_action, ForwardStationAction::Replace)
3754 && number > OptionNumber::StationCallsign.as_u16()
3755 {
3756 encoder
3757 .put(
3758 OptionNumber::StationCallsign.as_u16(),
3759 self.repeater
3760 .station_callsign
3761 .as_ref()
3762 .ok_or(CapacityError)?
3763 .as_trimmed_slice(),
3764 )
3765 .map_err(|_| CapacityError)?;
3766 inserted_station = true;
3767 wrote_any = true;
3768 }
3769
3770 match OptionNumber::from(number) {
3771 OptionNumber::RegionCode => {
3772 inserted_region = true;
3773 encoder.put(number, value).map_err(|_| CapacityError)?;
3774 wrote_any = true;
3775 }
3776 OptionNumber::TraceRoute => {
3777 let mut trace = [0u8; crate::MAX_SOURCE_ROUTE_HOPS * 2 + 2];
3778 trace[..2].copy_from_slice(&plan.router_hint.0);
3779 trace[2..2 + value.len()].copy_from_slice(value);
3780 encoder
3781 .put(number, &trace[..2 + value.len()])
3782 .map_err(|_| CapacityError)?;
3783 wrote_any = true;
3784 }
3785 OptionNumber::SourceRoute if plan.consume_source_route => {
3786 if value.len() < 2 || value.len() % 2 != 0 {
3787 return Err(CapacityError);
3788 }
3789 let remaining = if value.len() > 2 { &value[2..] } else { &[] };
3790 encoder.put(number, remaining).map_err(|_| CapacityError)?;
3791 wrote_any = true;
3792 }
3793 OptionNumber::StationCallsign => {
3794 saw_station = true;
3795 match plan.station_action {
3796 ForwardStationAction::Remove => {}
3797 ForwardStationAction::Replace => {
3798 encoder
3799 .put(
3800 number,
3801 self.repeater
3802 .station_callsign
3803 .as_ref()
3804 .ok_or(CapacityError)?
3805 .as_trimmed_slice(),
3806 )
3807 .map_err(|_| CapacityError)?;
3808 inserted_station = true;
3809 wrote_any = true;
3810 }
3811 }
3812 }
3813 _ => {
3814 encoder.put(number, value).map_err(|_| CapacityError)?;
3815 wrote_any = true;
3816 }
3817 }
3818 }
3819 }
3820
3821 if matches!(plan.station_action, ForwardStationAction::Replace)
3822 && !inserted_station
3823 && !saw_station
3824 {
3825 encoder
3826 .put(
3827 OptionNumber::StationCallsign.as_u16(),
3828 self.repeater
3829 .station_callsign
3830 .as_ref()
3831 .ok_or(CapacityError)?
3832 .as_trimmed_slice(),
3833 )
3834 .map_err(|_| CapacityError)?;
3835 wrote_any = true;
3836 }
3837 if let Some(region_code) = plan.insert_region_code {
3838 if !inserted_region {
3839 encoder
3840 .put(OptionNumber::RegionCode.as_u16(), ®ion_code)
3841 .map_err(|_| CapacityError)?;
3842 wrote_any = true;
3843 }
3844 }
3845 if wrote_any {
3846 encoder.end_marker().map_err(|_| CapacityError)?;
3847 Ok(encoder.finish())
3848 } else {
3849 Ok(0)
3850 }
3851 }
3852
3853 fn synthesize_route_retry_resend(
3854 &self,
3855 peer: &PublicKey,
3856 resend: &ResendRecord<FRAME>,
3857 ) -> Option<ResendRecord<FRAME>> {
3858 let header = PacketHeader::parse(resend.frame.as_slice()).ok()?;
3859 let options =
3860 ParsedOptions::extract(resend.frame.as_slice(), header.options_range.clone()).ok()?;
3861 if options.route_retry {
3862 return None;
3863 }
3864 let source_route = resend.source_route.as_ref()?;
3865 if source_route.is_empty() {
3866 return None;
3867 }
3868
3869 let flood_hops = self.route_retry_flood_hops(peer, &header, source_route)?;
3870 let mut rewritten = [0u8; FRAME];
3871 let options_len = self
3872 .encode_route_retry_options(
3873 resend.frame.as_slice(),
3874 header.options_range.clone(),
3875 &options,
3876 &mut rewritten[1..],
3877 )
3878 .ok()?;
3879 let mut cursor = 1 + options_len;
3880 let has_options = options_len > 0;
3881 let has_flood_hops = flood_hops > 0;
3882 rewritten[0] = umsh_core::Fcf::new(
3883 header.packet_type(),
3884 header.fcf.full_source(),
3885 has_options,
3886 has_flood_hops,
3887 )
3888 .0;
3889
3890 if has_flood_hops {
3891 *rewritten.get_mut(cursor)? = FloodHops::new(flood_hops, 0)?.0;
3892 cursor += 1;
3893 }
3894
3895 let fixed_start = header.options_range.end + usize::from(header.flood_hops.is_some());
3896 let tail = resend.frame.get(fixed_start..)?;
3897 let end = cursor + tail.len();
3898 rewritten.get_mut(cursor..end)?.copy_from_slice(tail);
3899
3900 ResendRecord::try_new(&rewritten[..end], None).ok()
3901 }
3902
3903 fn encode_route_retry_options(
3904 &self,
3905 src: &[u8],
3906 options_range: core::ops::Range<usize>,
3907 _options: &ParsedOptions,
3908 dst: &mut [u8],
3909 ) -> Result<usize, CapacityError> {
3910 let mut encoder = OptionEncoder::new(dst);
3911 let mut wrote_any = false;
3912 let mut inserted_trace_route = false;
3913 let mut inserted_route_retry = false;
3914
3915 if !options_range.is_empty() {
3916 for entry in umsh_core::iter_options(src, options_range) {
3917 let (number, value) = entry.map_err(|_| CapacityError)?;
3918 if !inserted_trace_route && number > OptionNumber::TraceRoute.as_u16() {
3919 encoder
3920 .put(OptionNumber::TraceRoute.as_u16(), &[])
3921 .map_err(|_| CapacityError)?;
3922 wrote_any = true;
3923 inserted_trace_route = true;
3924 }
3925 if !inserted_route_retry && number > OptionNumber::RouteRetry.as_u16() {
3926 encoder
3927 .put(OptionNumber::RouteRetry.as_u16(), &[])
3928 .map_err(|_| CapacityError)?;
3929 wrote_any = true;
3930 inserted_route_retry = true;
3931 }
3932 match OptionNumber::from(number) {
3933 OptionNumber::SourceRoute => {}
3934 OptionNumber::TraceRoute => {
3935 encoder.put(number, value).map_err(|_| CapacityError)?;
3936 wrote_any = true;
3937 inserted_trace_route = true;
3938 }
3939 OptionNumber::RouteRetry => {}
3940 _ => {
3941 encoder.put(number, value).map_err(|_| CapacityError)?;
3942 wrote_any = true;
3943 }
3944 }
3945 }
3946 }
3947
3948 if !inserted_trace_route {
3949 encoder
3950 .put(OptionNumber::TraceRoute.as_u16(), &[])
3951 .map_err(|_| CapacityError)?;
3952 wrote_any = true;
3953 }
3954 if !inserted_route_retry {
3955 encoder
3956 .put(OptionNumber::RouteRetry.as_u16(), &[])
3957 .map_err(|_| CapacityError)?;
3958 wrote_any = true;
3959 }
3960
3961 if wrote_any {
3962 encoder.end_marker().map_err(|_| CapacityError)?;
3963 Ok(encoder.finish())
3964 } else {
3965 Ok(0)
3966 }
3967 }
3968
3969 fn route_retry_flood_hops(
3970 &self,
3971 peer: &PublicKey,
3972 header: &PacketHeader,
3973 source_route: &heapless::Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>,
3974 ) -> Option<u8> {
3975 let existing = header
3976 .flood_hops
3977 .map(|hops| hops.remaining())
3978 .filter(|hops| *hops > 0);
3979 let cached = self
3980 .peer_registry
3981 .lookup_by_key(peer)
3982 .and_then(|(_, info)| match info.route.as_ref() {
3983 Some(crate::CachedRoute::Flood { hops }) => Some((*hops).clamp(1, 15)),
3984 _ => None,
3985 });
3986 let route_len = u8::try_from(source_route.len())
3987 .ok()
3988 .map(|hops| hops.clamp(1, 15));
3989
3990 existing.or(cached).or(route_len).or(Some(5))
3991 }
3992
3993 fn repeater_router_hint(&self) -> Option<RouterHint> {
3994 self.identities
3995 .iter()
3996 .filter_map(|slot| slot.as_ref())
3997 .next()
3998 .map(|slot| slot.identity().public_key().router_hint())
3999 }
4000
4001 fn effective_min_rssi(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i16> {
4002 match (options.min_rssi, repeater.min_rssi) {
4003 (Some(packet), Some(local)) => Some(packet.max(local)),
4004 (Some(packet), None) => Some(packet),
4005 (None, Some(local)) => Some(local),
4006 (None, None) => None,
4007 }
4008 }
4009
4010 fn effective_min_snr(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i8> {
4011 match (options.min_snr, repeater.min_snr) {
4012 (Some(packet), Some(local)) => Some(packet.max(local)),
4013 (Some(packet), None) => Some(packet),
4014 (None, Some(local)) => Some(local),
4015 (None, None) => None,
4016 }
4017 }
4018
4019 fn sample_flood_contention_delay_ms(&mut self, rx: &RxInfo, options: &ParsedOptions) -> u64 {
4020 let effective_threshold_db =
4021 Self::effective_min_snr(options, &self.repeater).unwrap_or(i8::MIN);
4022 let low_db = self
4023 .repeater
4024 .flood_contention_snr_low_db
4025 .max(effective_threshold_db);
4026 let high_db = self
4027 .repeater
4028 .flood_contention_snr_high_db
4029 .max(low_db.saturating_add(1));
4030 let low = i32::from(Snr::from_decibels(low_db).as_centibels());
4031 let high = i32::from(Snr::from_decibels(high_db).as_centibels());
4032 let received = i32::from(rx.snr.as_centibels());
4033 let clamped = (received - low).clamp(0, high - low) as u32;
4034 let range = (high - low) as u32;
4035 let t_frame_ms = u64::from(self.radio.t_frame_ms());
4036 let min_window_ms = t_frame_ms
4037 .saturating_mul(u64::from(self.repeater.flood_contention_min_window_percent))
4038 / 100;
4039 let max_window_ms = t_frame_ms
4040 .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4041 .max(min_window_ms);
4042 let window_span_ms = max_window_ms.saturating_sub(min_window_ms);
4043 let window_ms = if range == 0 {
4044 max_window_ms
4045 } else {
4046 max_window_ms.saturating_sub(
4047 window_span_ms.saturating_mul(u64::from(clamped)) / u64::from(range),
4048 )
4049 };
4050 if window_ms == 0 {
4051 0
4052 } else {
4053 self.rng.random_range(..window_ms.saturating_add(1))
4054 }
4055 }
4056
/// Computes the duplicate-cache key for `frame`, given its already parsed `header`.
///
/// This is a direct delegate to `routable_packet_identity` and returns `None` in
/// exactly the cases that helper does.
pub(crate) fn forward_dup_key(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
    Self::routable_packet_identity(header, frame)
}
4070
4071 fn routable_packet_identity(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4072 if !header.packet_type().is_secure() {
4073 return Some(DupCacheKey::Hash32(Self::normalized_routable_hash32(
4074 header, frame,
4075 )));
4076 }
4077 let options = ParsedOptions::extract(frame, header.options_range.clone()).ok()?;
4078 let mic = frame.get(header.mic_range.clone())?;
4079 if mic.is_empty() || mic.len() > 16 {
4080 return None;
4081 }
4082 let mut bytes = [0u8; 16];
4083 bytes[..mic.len()].copy_from_slice(mic);
4084 Some(DupCacheKey::Mic {
4085 bytes,
4086 len: mic.len() as u8,
4087 route_retry: options.route_retry,
4088 })
4089 }
4090
4091 fn defer_pending_forward(&mut self, key: &DupCacheKey, rx: &RxInfo, options: &ParsedOptions) {
4092 let Some(queued) = self.tx_queue.remove_first_matching(|entry| {
4093 entry.priority == TxPriority::Forward
4094 && Self::confirmation_key(entry.frame.as_slice())
4095 .map(|entry_key| &entry_key == key)
4096 .unwrap_or(false)
4097 }) else {
4098 return;
4099 };
4100
4101 if queued.forward_deferrals >= self.repeater.flood_contention_max_deferrals {
4102 return;
4103 }
4104
4105 let now_ms = self.clock.now_ms();
4106 let delay_ms = self.sample_flood_contention_delay_ms(rx, options);
4107 let _ = self.tx_queue.enqueue_with_state(
4108 queued.priority,
4109 queued.frame.as_slice(),
4110 queued.receipt,
4111 queued.identity_id,
4112 now_ms.saturating_add(delay_ms),
4113 queued.cad_attempts,
4114 queued.forward_deferrals.saturating_add(1),
4115 );
4116 }
4117
4118 async fn service_post_tx_listen(
4119 &mut self,
4120 mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
4121 ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
4122 loop {
4123 self.expire_post_tx_listen_if_needed();
4124 if self.post_tx_listen.is_none() {
4125 return Ok(());
4126 }
4127
4128 let handled = self.receive_one(&mut on_event).await?;
4129 if !handled {
4130 return Ok(());
4131 }
4132 }
4133 }
4134
4135 fn note_transmitted_ack_requested(&mut self, receipt: SendReceipt, frame: &[u8]) {
4136 let sent_ms = self.clock.now_ms();
4137 let direct_ack_deadline_ms = sent_ms.saturating_add(self.direct_ack_timeout_ms());
4138 let forwarded_ack_deadline_ms = sent_ms.saturating_add(self.forwarded_ack_timeout_ms());
4139 let confirm_timeout_ms = self.forward_confirm_timeout_ms();
4140 let confirm_key = Self::confirmation_key(frame);
4141
4142 let post_tx_listen = {
4143 let Some((identity_id, pending)) = self.pending_ack_mut(receipt) else {
4144 return;
4145 };
4146
4147 let needs_forward_confirmation = match pending.state {
4148 crate::AckState::Queued {
4149 needs_forward_confirmation,
4150 } => needs_forward_confirmation,
4151 crate::AckState::RetryQueued => true,
4152 _ => return,
4153 };
4154
4155 pending.sent_ms = sent_ms;
4156 if pending.ack_deadline_ms == 0 {
4157 pending.ack_deadline_ms = if needs_forward_confirmation {
4158 forwarded_ack_deadline_ms
4159 } else {
4160 direct_ack_deadline_ms
4161 };
4162 }
4163
4164 if needs_forward_confirmation {
4165 let deadline_ms = sent_ms.saturating_add(confirm_timeout_ms);
4166 pending.state = crate::AckState::AwaitingForward {
4167 confirm_deadline_ms: deadline_ms,
4168 };
4169 confirm_key.map(|confirm_key| PostTxListen {
4170 identity_id,
4171 receipt,
4172 confirm_key,
4173 deadline_ms,
4174 })
4175 } else {
4176 pending.state = crate::AckState::AwaitingAck;
4177 None
4178 }
4179 };
4180
4181 self.post_tx_listen = post_tx_listen;
4182 }
4183
4184 fn expire_post_tx_listen_if_needed(&mut self) {
4185 let should_clear = self
4186 .post_tx_listen
4187 .as_ref()
4188 .map(|listen| self.clock.now_ms() >= listen.deadline_ms)
4189 .unwrap_or(false);
4190 if should_clear {
4191 self.post_tx_listen = None;
4192 }
4193 }
4194
4195 fn forward_confirm_timeout_ms(&self) -> u64 {
4196 let t_frame_ms = u64::from(self.radio.t_frame_ms());
4197 t_frame_ms
4198 .saturating_add(self.max_forward_contention_delay_ms())
4199 .saturating_add(t_frame_ms)
4200 }
4201
4202 fn max_forward_contention_delay_ms(&self) -> u64 {
4203 u64::from(self.radio.t_frame_ms())
4204 .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4205 }
4206
4207 fn forward_retry_backoff_cap_ms(&self, retry_number: u8) -> u32 {
4208 Self::forward_retry_backoff_cap_ms_for_t_frame(self.radio.t_frame_ms(), retry_number)
4209 }
4210
/// Returns the backoff cap for a forward retry as a multiple of `t_frame_ms`.
///
/// The multiplier doubles with each retry and is capped at four frame times:
/// retries `0` and `1` use `1x`, retry `2` uses `2x`, and every later retry uses `4x`.
/// The multiplication saturates at `u32::MAX`.
fn forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms: u32, retry_number: u8) -> u32 {
    let multiplier: u32 = match retry_number {
        0 | 1 => 1,
        2 => 2,
        _ => 4,
    };
    t_frame_ms.saturating_mul(multiplier)
}
4217
4218 fn can_attempt_route_retry(pending: &PendingAck<FRAME>) -> bool {
4219 let Ok(header) = PacketHeader::parse(pending.resend.frame.as_slice()) else {
4220 return false;
4221 };
4222 let Ok(options) = ParsedOptions::extract(
4223 pending.resend.frame.as_slice(),
4224 header.options_range.clone(),
4225 ) else {
4226 return false;
4227 };
4228 !options.route_retry
4229 && pending
4230 .resend
4231 .source_route
4232 .as_ref()
4233 .map(|route| !route.is_empty())
4234 .unwrap_or(false)
4235 }
4236
4237 fn direct_ack_timeout_ms(&self) -> u64 {
4238 u64::from(self.radio.t_frame_ms()).saturating_mul(10)
4239 }
4240
4241 fn forwarded_ack_timeout_ms(&self) -> u64 {
4242 let mut total = self.forward_confirm_timeout_ms();
4243 for retry_number in 1..=MAX_FORWARD_RETRIES {
4244 total = total
4245 .saturating_add(u64::from(self.forward_retry_backoff_cap_ms(retry_number)))
4246 .saturating_add(self.forward_confirm_timeout_ms());
4247 }
4248 total.saturating_add(u64::from(self.radio.t_frame_ms()))
4249 }
4250
4251 fn pending_ack_mut(
4252 &mut self,
4253 receipt: SendReceipt,
4254 ) -> Option<(LocalIdentityId, &mut PendingAck<FRAME>)> {
4255 for (index, slot) in self.identities.iter_mut().enumerate() {
4256 let Some(slot) = slot.as_mut() else {
4257 continue;
4258 };
4259 if let Some(pending) = slot.pending_ack_mut(&receipt) {
4260 return Some((LocalIdentityId(index as u8), pending));
4261 }
4262 }
4263 None
4264 }
4265
4266 pub(crate) fn confirmation_key(frame: &[u8]) -> Option<DupCacheKey> {
4267 let header = PacketHeader::parse(frame).ok()?;
4268 Self::routable_packet_identity(&header, frame)
4269 }
4270
4271 fn normalized_routable_hash32(header: &PacketHeader, frame: &[u8]) -> u32 {
4272 let mut hash = 0x811C_9DC5u32;
4273
4274 Self::hash_u8(&mut hash, header.packet_type() as u8);
4275 Self::hash_u8(&mut hash, header.fcf.full_source() as u8);
4276
4277 if !header.options_range.is_empty() {
4278 for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
4279 let Ok((number, value)) = entry else {
4280 continue;
4281 };
4282 let option = OptionNumber::from(number);
4283 if option.is_dynamic() {
4284 continue;
4285 }
4286 Self::hash_u16(&mut hash, number);
4287 Self::hash_u16(&mut hash, value.len() as u16);
4288 Self::hash_bytes(&mut hash, value);
4289 }
4290 }
4291
4292 match header.packet_type() {
4293 PacketType::Broadcast => {
4294 match header.source {
4295 umsh_core::SourceAddrRef::Hint(hint) => Self::hash_bytes(&mut hash, &hint.0),
4296 umsh_core::SourceAddrRef::FullKeyAt { offset } => {
4297 if let Some(key) = frame.get(offset..offset + 32) {
4298 Self::hash_bytes(&mut hash, key);
4299 }
4300 }
4301 umsh_core::SourceAddrRef::Encrypted { offset, len } => {
4302 if let Some(src) = frame.get(offset..offset + len) {
4303 Self::hash_bytes(&mut hash, src);
4304 }
4305 }
4306 umsh_core::SourceAddrRef::None => {}
4307 }
4308 if let Some(payload) = frame.get(header.body_range.clone()) {
4309 Self::hash_bytes(&mut hash, payload);
4310 }
4311 }
4312 PacketType::MacAck => {
4313 if let Some(dst) = header.ack_dst {
4314 Self::hash_bytes(&mut hash, &dst.0);
4315 }
4316 if let Some(tag) = frame.get(header.mic_range.clone()) {
4317 Self::hash_bytes(&mut hash, tag);
4318 }
4319 }
4320 _ => {
4321 if let Some(bytes) = frame.get(header.body_range.clone()) {
4322 Self::hash_bytes(&mut hash, bytes);
4323 }
4324 }
4325 }
4326 hash
4327 }
4328
/// Mixes one byte into `hash`: XOR the byte in, then multiply (wrapping) by
/// `0x0100_0193`. This is the per-byte step of 32-bit FNV-1a.
fn hash_u8(hash: &mut u32, value: u8) {
    let mixed = *hash ^ u32::from(value);
    *hash = mixed.wrapping_mul(0x0100_0193);
}
4333
4334 fn hash_u16(hash: &mut u32, value: u16) {
4335 Self::hash_bytes(hash, &value.to_be_bytes());
4336 }
4337
4338 fn hash_bytes(hash: &mut u32, bytes: &[u8]) {
4339 for byte in bytes {
4340 Self::hash_u8(hash, *byte);
4341 }
4342 }
4343
4344 fn observe_forwarding_confirmation(
4349 &mut self,
4350 frame: &[u8],
4351 ) -> Option<(LocalIdentityId, SendReceipt)> {
4352 self.expire_post_tx_listen_if_needed();
4353 let listen = self.post_tx_listen.clone()?;
4354
4355 let received_key = Self::confirmation_key(frame)?;
4356 if received_key != listen.confirm_key {
4357 return None;
4358 }
4359
4360 let Some(slot) = self.identity_mut(listen.identity_id) else {
4361 self.post_tx_listen = None;
4362 return None;
4363 };
4364 let Some(pending) = slot.pending_ack_mut(&listen.receipt) else {
4365 self.post_tx_listen = None;
4366 return None;
4367 };
4368 if !matches!(pending.state, crate::AckState::AwaitingForward { .. }) {
4369 self.post_tx_listen = None;
4370 return None;
4371 }
4372
4373 pending.state = crate::AckState::AwaitingAck;
4374 self.post_tx_listen = None;
4375 Some((listen.identity_id, listen.receipt))
4376 }
4377
4378 fn match_pending_peer_for_ack(
4379 &self,
4380 slot: &IdentitySlot<P::Identity, PEERS, ACKS, FRAME>,
4381 ack_tag_bytes: &[u8],
4382 ) -> Option<PublicKey> {
4383 if ack_tag_bytes.len() != 8 {
4384 return None;
4385 }
4386
4387 slot.pending_acks
4388 .iter()
4389 .find_map(|(_, pending)| (pending.ack_tag == ack_tag_bytes).then_some(pending.peer))
4390 }
4391}
4392
4393fn align_counter_boundary(value: u32) -> u32 {
4394 value & !COUNTER_PERSIST_BLOCK_MASK
4395}
4396
4397fn next_counter_persist_target(next_counter: u32) -> u32 {
4398 next_counter.wrapping_add(COUNTER_PERSIST_BLOCK_SIZE) & !COUNTER_PERSIST_BLOCK_MASK
4399}