umsh_mac/
coordinator.rs

1use core::num::NonZeroU8;
2use core::{future::poll_fn, task::Poll};
3
4use hamaddr::HamAddr;
5use heapless::{LinearMap, Vec};
6use rand::{Rng, RngExt as _};
7use umsh_core::{
8    BuildError, ChannelId, ChannelKey, FloodHops, NodeHint, OptionNumber, PacketBuilder,
9    PacketHeader, PacketType, ParseError, ParsedOptions, PayloadType, PublicKey, RouterHint,
10    SourceAddrRef, UnsealedPacket, feed_aad, options::OptionEncoder,
11};
12use umsh_crypto::{
13    CmacState, CryptoEngine, CryptoError, DerivedChannelKeys, NodeIdentity, PairwiseKeys,
14};
15use umsh_hal::{Clock, CounterStore, Radio, RxInfo, Snr, TxError, TxOptions};
16
17use crate::{
18    CapacityError, DEFAULT_ACKS, DEFAULT_CHANNELS, DEFAULT_DUP, DEFAULT_IDENTITIES, DEFAULT_PEERS,
19    DEFAULT_TX, MAX_CAD_ATTEMPTS, MAX_FORWARD_RETRIES, MAX_RESEND_FRAME_LEN, MAX_SOURCE_ROUTE_HOPS,
20    Platform, ReplayVerdict, ReplayWindow,
21    cache::{DupCacheKey, DuplicateCache},
22    peers::CachedRoute,
23    peers::{ChannelTable, PeerCryptoMap, PeerId, PeerRegistry},
24    send::{
25        PendingAck, PendingAckError, ResendRecord, SendOptions, SendReceipt, TxPriority, TxQueue,
26    },
27};
28
/// Number of frame-counter values in one persistence block.
///
/// Must remain a power of two: `COUNTER_PERSIST_BLOCK_MASK` is derived as `SIZE - 1` and is
/// used as a low-bit mask.
const COUNTER_PERSIST_BLOCK_SIZE: u32 = 128;
/// Bit mask that extracts a counter's offset within its persistence block.
const COUNTER_PERSIST_BLOCK_MASK: u32 = COUNTER_PERSIST_BLOCK_SIZE - 1;
/// In-block offset at which `schedule_counter_persist_if_needed` schedules the next persist.
const COUNTER_PERSIST_SCHEDULE_OFFSET: u32 = 100;
/// MAC command identifier for an echo request.
// NOTE(review): presumably fixed by the wire protocol — confirm before changing.
const MAC_COMMAND_ECHO_REQUEST_ID: u8 = 4;
/// MAC command identifier for an echo response.
// NOTE(review): presumably fixed by the wire protocol — confirm before changing.
const MAC_COMMAND_ECHO_RESPONSE_ID: u8 = 5;
/// Length in bytes of a counter-resync nonce.
// NOTE(review): matches the 4-byte `u32` nonce held in `PendingCounterResync` — confirm at the use site.
const COUNTER_RESYNC_NONCE_LEN: usize = 4;
/// Retry interval, in milliseconds, for counter-resync requests.
// NOTE(review): exact retry semantics are not visible in this part of the file — confirm.
const COUNTER_RESYNC_REQUEST_RETRY_MS: u64 = 5_000;
36
/// Bookkeeping for one outstanding counter-resynchronization exchange.
///
/// Stored per peer in `IdentitySlot::pending_counter_resync`.
// NOTE(review): field meanings are inferred from their names; confirm against the resync handler.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PendingCounterResync {
    /// Nonce associated with the request (presumably matched against the peer's reply — confirm).
    nonce: u32,
    /// Millisecond timestamp recorded for the request (presumably when it was issued — confirm).
    requested_ms: u64,
}
42
/// A received frame held back together with its reception metadata.
///
/// `FRAME` bounds the stored frame length in bytes.
// NOTE(review): presumably the frame is parked until a counter resync with `peer_id`
// completes and is then reprocessed — confirm against the code that creates this value.
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeferredCounterResyncFrame<const FRAME: usize> {
    /// Local identity the frame is associated with.
    local_id: LocalIdentityId,
    /// Peer the frame is associated with.
    peer_id: PeerId,
    /// Raw frame bytes.
    frame: Vec<u8, FRAME>,
    /// RSSI reported for the reception (units as reported by the radio — TODO confirm).
    rssi: i16,
    /// SNR reported for the reception.
    snr: Snr,
    /// Link-quality indicator, when one was reported.
    lqi: Option<NonZeroU8>,
    /// Millisecond timestamp of the reception.
    received_at_ms: u64,
}
53
/// What is known about the sender of a multicast frame.
///
/// Every field is optional, so a value may carry any subset of the three.
// NOTE(review): which combinations actually occur depends on the resolver, which is not
// visible in this part of the file — confirm there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ResolvedMulticastSource {
    /// Registry id of the sender, when known.
    peer_id: Option<PeerId>,
    /// Full public key of the sender, when known.
    public_key: Option<PublicKey>,
    /// Node hint of the sender, when known.
    hint: Option<NodeHint>,
}
60
/// State for a listen window that follows a transmission.
// NOTE(review): presumably the coordinator listens until `deadline_ms` for a frame matching
// `confirm_key` that confirms the send identified by `receipt` — confirm at the use site.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostTxListen {
    /// Local identity that originated the send.
    identity_id: LocalIdentityId,
    /// Receipt of the send being tracked.
    receipt: SendReceipt,
    /// Duplicate-cache key used to recognise the confirming frame.
    confirm_key: DupCacheKey,
    /// Millisecond deadline of the listen window.
    deadline_ms: u64,
}
68
/// Handle that identifies a locally registered identity within the [`Mac`] coordinator.
///
/// Every UMSH node presents one or more Ed25519 public keys to the network. When a key is
/// registered via [`Mac::add_identity`] (or [`Mac::register_ephemeral`] for PFS sessions),
/// the coordinator allocates a slot and returns a `LocalIdentityId` that names it.
///
/// The inner `u8` is a stable zero-based slot index — slot `0` is the first identity
/// registered, slot `1` the second, and so on. All per-identity coordinator operations
/// (`queue_unicast`, `queue_multicast`, ACK tracking, key installation, frame-counter
/// persistence) accept a `LocalIdentityId` to select which local keypair to use, allowing a
/// single coordinator instance to operate multiple identities simultaneously — for example, a
/// persistent long-term identity alongside an ephemeral PFS session identity.
///
/// The inner field is public, so callers can construct arbitrary ids; an id that does not
/// refer to an occupied slot is reported as [`SendError::IdentityMissing`] by the queue calls.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LocalIdentityId(pub u8);
83
/// A local node identity that the [`Mac`] coordinator owns and acts on behalf of.
///
/// UMSH nodes are identified by Ed25519 public keys. An identity provides the public key and
/// the ability to derive pairwise keys via ECDH with the corresponding private key.
/// Two variants are supported:
///
/// - **`LongTerm(I)`** — wraps the platform-supplied `I: NodeIdentity`, which is typically
///   backed by secure-element storage, an HSM, or a platform keystore. Long-term identities
///   persist across reboots; their frame counters are saved to the [`umsh_hal::CounterStore`]
///   so that replay protection remains valid after a power cycle.
///
/// - **`Ephemeral`** — wraps an in-memory [`SoftwareIdentity`](umsh_crypto::software::SoftwareIdentity)
///   generated fresh at runtime for Perfect Forward Secrecy sessions. Because the key material
///   itself vanishes on power loss, ephemeral identities do not persist their frame counters;
///   replay protection is meaningful only within a single session. Requires the
///   `software-crypto` crate feature; without it the variant is not compiled in.
///
/// Use [`LocalIdentity::public_key`] or [`LocalIdentity::hint`] to inspect the address
/// presented to the network without matching on the variant. A bare `I` converts into
/// `LongTerm(I)` through the `From<I>` implementation.
pub enum LocalIdentity<I: NodeIdentity> {
    /// Long-term platform identity.
    LongTerm(I),
    /// Software ephemeral identity used for PFS sessions.
    #[cfg(feature = "software-crypto")]
    Ephemeral(umsh_crypto::software::SoftwareIdentity),
}
110
111impl<I: NodeIdentity> LocalIdentity<I> {
112    /// Return the public key for this identity.
113    pub fn public_key(&self) -> &PublicKey {
114        match self {
115            Self::LongTerm(identity) => identity.public_key(),
116            #[cfg(feature = "software-crypto")]
117            Self::Ephemeral(identity) => identity.public_key(),
118        }
119    }
120
121    /// Return the derived node hint for this identity.
122    pub fn hint(&self) -> umsh_core::NodeHint {
123        self.public_key().hint()
124    }
125
126    /// Return whether this identity is ephemeral.
127    pub fn is_ephemeral(&self) -> bool {
128        match self {
129            Self::LongTerm(_) => false,
130            #[cfg(feature = "software-crypto")]
131            Self::Ephemeral(_) => true,
132        }
133    }
134}
135
136impl<I: NodeIdentity> From<I> for LocalIdentity<I> {
137    fn from(value: I) -> Self {
138        Self::LongTerm(value)
139    }
140}
141
/// Per-identity runtime state owned by the [`Mac`] coordinator.
///
/// There is exactly one `IdentitySlot` per registered local identity ([`LocalIdentityId`]).
/// The slot bundles everything the coordinator needs to send, receive, and authenticate
/// on behalf of a single local keypair:
///
/// - The [`LocalIdentity`] (public key + ECDH capability).
/// - A [`PeerCryptoMap`](crate::peers::PeerCryptoMap) mapping each known remote peer to its
///   established [`umsh_crypto::PairwiseKeys`] and replay window. Entries are populated on
///   first secure contact, or through the advanced manual-install escape hatch.
/// - A monotonically increasing **frame counter** stamped into SECINFO of every sealed
///   packet, plus the bookkeeping needed to persist it safely (see below).
/// - A [`LinearMap`] of in-flight [`PendingAck`](crate::send::PendingAck) records keyed by
///   [`SendReceipt`](crate::send::SendReceipt), one entry per ACK-requested send awaiting
///   either forwarding confirmation or a final transport ACK.
/// - An internal `next_receipt` counter used to issue unique
///   [`SendReceipt`](crate::send::SendReceipt) values without allocation.
///
/// ## Frame-counter persistence
///
/// UMSH uses a monotonic frame counter instead of a timestamp for replay protection.
/// If the counter resets to a previously-seen value after a reboot, replayed old frames
/// might be accepted. To prevent this, the coordinator "reserves" counter ranges by writing
/// boundary values to the [`umsh_hal::CounterStore`] *before* using them. The slot tracks
/// three values:
///
/// - `frame_counter` — the live in-use value, advanced on every secured send.
/// - `persisted_counter` — the last boundary safely committed to the store.
/// - `pending_persist_target` — a scheduled future boundary written on the next call to
///   [`Mac::service_counter_persistence`].
///
/// If the live counter reaches `persisted_counter + COUNTER_PERSIST_BLOCK_SIZE` without a
/// successful flush, secure sends on that identity are blocked
/// ([`SendError::CounterPersistenceLag`]) until the store catches up. Ephemeral identities
/// opt out of this mechanism entirely.
///
/// ## Const parameters
///
/// - `PEERS` — capacity of the per-peer crypto map and of the pending counter-resync table.
/// - `ACKS` — capacity of the pending-ACK table.
/// - `FRAME` — frame-buffer length carried by each [`PendingAck`](crate::send::PendingAck);
///   defaults to `MAX_RESEND_FRAME_LEN`.
pub struct IdentitySlot<
    I: NodeIdentity,
    const PEERS: usize,
    const ACKS: usize,
    const FRAME: usize = MAX_RESEND_FRAME_LEN,
> {
    /// The local identity this slot acts for.
    identity: LocalIdentity<I>,
    /// Per-peer secure state for this identity.
    peer_crypto: PeerCryptoMap<PEERS>,
    /// Live frame counter; `advance_frame_counter` returns it and then increments it (wrapping).
    frame_counter: u32,
    /// Boundary last recorded by `load_persisted_counter` or `mark_counter_persisted`.
    persisted_counter: u32,
    /// Boundary that has been scheduled for persistence but not yet marked as persisted.
    pending_persist_target: Option<u32>,
    /// Whether a persist has been scheduled since construction or the last
    /// `load_persisted_counter` call.
    save_scheduled_since_boot: bool,
    /// Set once in `new`: `true` unless the identity is ephemeral.
    counter_persistence_enabled: bool,
    /// In-flight ACK-requested sends, keyed by receipt.
    pending_acks: LinearMap<SendReceipt, PendingAck<FRAME>, ACKS>,
    /// Value the next issued `SendReceipt` will carry; wraps on overflow.
    next_receipt: u32,
    /// Parent long-term identity, as supplied to `new`.
    pfs_parent: Option<LocalIdentityId>,
    /// Outstanding counter-resynchronization state, keyed by peer.
    pending_counter_resync: LinearMap<PeerId, PendingCounterResync, PEERS>,
}
195
196impl<I: NodeIdentity, const PEERS: usize, const ACKS: usize, const FRAME: usize>
197    IdentitySlot<I, PEERS, ACKS, FRAME>
198{
199    /// Create a new identity slot.
200    pub fn new(
201        identity: LocalIdentity<I>,
202        frame_counter: u32,
203        pfs_parent: Option<LocalIdentityId>,
204    ) -> Self {
205        let counter_persistence_enabled = !identity.is_ephemeral();
206        Self {
207            identity,
208            peer_crypto: PeerCryptoMap::new(),
209            frame_counter,
210            persisted_counter: frame_counter,
211            pending_persist_target: None,
212            save_scheduled_since_boot: false,
213            counter_persistence_enabled,
214            pending_acks: LinearMap::new(),
215            next_receipt: 0,
216            pfs_parent,
217            pending_counter_resync: LinearMap::new(),
218        }
219    }
220
221    /// Borrow the underlying identity.
222    pub fn identity(&self) -> &LocalIdentity<I> {
223        &self.identity
224    }
225    /// Borrow the per-peer secure-state map.
226    pub fn peer_crypto(&self) -> &PeerCryptoMap<PEERS> {
227        &self.peer_crypto
228    }
229    /// Mutably borrow the per-peer secure-state map.
230    pub fn peer_crypto_mut(&mut self) -> &mut PeerCryptoMap<PEERS> {
231        &mut self.peer_crypto
232    }
233    /// Return the current frame counter.
234    pub fn frame_counter(&self) -> u32 {
235        self.frame_counter
236    }
237    /// Return the persisted frame-counter reservation boundary.
238    pub fn persisted_counter(&self) -> u32 {
239        self.persisted_counter
240    }
241    /// Overwrite the current frame counter.
242    ///
243    /// # Safety (logical)
244    /// Misuse can break replay protection.
245    #[cfg(test)]
246    pub(crate) fn set_frame_counter(&mut self, value: u32) {
247        self.frame_counter = value;
248    }
249    /// Return the next scheduled persist target, if any.
250    pub fn pending_persist_target(&self) -> Option<u32> {
251        self.pending_persist_target
252    }
253
254    /// Return whether counter persistence is enabled for this identity.
255    pub fn counter_persistence_enabled(&self) -> bool {
256        self.counter_persistence_enabled
257    }
258
259    /// Return the current frame counter and advance it with wrapping semantics.
260    pub(crate) fn advance_frame_counter(&mut self) -> u32 {
261        let current = self.frame_counter;
262        self.frame_counter = self.frame_counter.wrapping_add(1);
263        current
264    }
265
266    /// Load a persisted counter boundary for this identity.
267    pub fn load_persisted_counter(&mut self, value: u32) {
268        let aligned = align_counter_boundary(value);
269        self.frame_counter = aligned;
270        self.persisted_counter = aligned;
271        self.pending_persist_target = None;
272        self.save_scheduled_since_boot = false;
273    }
274
275    fn schedule_counter_persist_if_needed(&mut self) {
276        if !self.counter_persistence_enabled {
277            return;
278        }
279
280        let should_schedule = !self.save_scheduled_since_boot
281            || (self.frame_counter & COUNTER_PERSIST_BLOCK_MASK) == COUNTER_PERSIST_SCHEDULE_OFFSET;
282        if !should_schedule {
283            return;
284        }
285
286        let target = next_counter_persist_target(self.frame_counter);
287        self.pending_persist_target = Some(
288            self.pending_persist_target
289                .map(|existing| existing.max(target))
290                .unwrap_or(target),
291        );
292        self.save_scheduled_since_boot = true;
293    }
294
295    fn mark_counter_persisted(&mut self, value: u32) {
296        let aligned = align_counter_boundary(value);
297        self.persisted_counter = aligned;
298        if self.pending_persist_target == Some(aligned) {
299            self.pending_persist_target = None;
300        }
301    }
302
303    fn counter_window_exhausted(&self) -> bool {
304        if !self.counter_persistence_enabled {
305            return false;
306        }
307
308        let ahead = self.persisted_counter.wrapping_sub(self.frame_counter);
309        if ahead > 0 && ahead <= COUNTER_PERSIST_BLOCK_SIZE {
310            return false;
311        }
312
313        if ahead == 0 {
314            return self.save_scheduled_since_boot;
315        }
316
317        self.frame_counter.wrapping_sub(self.persisted_counter) >= COUNTER_PERSIST_BLOCK_SIZE
318    }
319
320    /// Allocate the next send receipt.
321    pub fn next_receipt(&mut self) -> SendReceipt {
322        let receipt = SendReceipt(self.next_receipt);
323        self.next_receipt = self.next_receipt.wrapping_add(1);
324        receipt
325    }
326
327    /// Overrides the next send receipt value in tests that exercise wraparound behavior.
328    #[cfg(test)]
329    pub(crate) fn set_next_receipt_for_test(&mut self, value: u32) {
330        self.next_receipt = value;
331    }
332
333    /// Insert or replace pending-ACK state for a send receipt.
334    pub fn try_insert_pending_ack(
335        &mut self,
336        receipt: SendReceipt,
337        pending: PendingAck<FRAME>,
338    ) -> Result<Option<PendingAck<FRAME>>, PendingAckError> {
339        self.pending_acks
340            .insert(receipt, pending)
341            .map_err(|_| PendingAckError::TableFull)
342    }
343
344    /// Borrow pending-ACK state by receipt.
345    pub fn pending_ack(&self, receipt: &SendReceipt) -> Option<&PendingAck<FRAME>> {
346        self.pending_acks.get(receipt)
347    }
348
349    /// Mutably borrow pending-ACK state by receipt.
350    pub fn pending_ack_mut(&mut self, receipt: &SendReceipt) -> Option<&mut PendingAck<FRAME>> {
351        self.pending_acks.get_mut(receipt)
352    }
353
354    /// Remove pending-ACK state by receipt.
355    pub fn remove_pending_ack(&mut self, receipt: &SendReceipt) -> Option<PendingAck<FRAME>> {
356        self.pending_acks.remove(receipt)
357    }
358
359    /// Return the parent long-term identity if this slot is ephemeral.
360    pub fn pfs_parent(&self) -> Option<LocalIdentityId> {
361        self.pfs_parent
362    }
363
364    /// Borrow the pending counter-resynchronization table.
365    fn pending_counter_resync(&self) -> &LinearMap<PeerId, PendingCounterResync, PEERS> {
366        &self.pending_counter_resync
367    }
368
369    /// Mutably borrow the pending counter-resynchronization table.
370    fn pending_counter_resync_mut(
371        &mut self,
372    ) -> &mut LinearMap<PeerId, PendingCounterResync, PEERS> {
373        &mut self.pending_counter_resync
374    }
375}
376
/// Per-channel operating-policy overrides enforced on outgoing traffic.
///
/// [`OperatingPolicy`] holds a small list of `ChannelPolicy` entries, one per channel that
/// requires non-default behavior. When the coordinator builds a multicast or blind-unicast
/// frame, it checks whether the target `channel_id` appears in this list and applies any
/// overrides before sealing the packet.
///
/// Typical use cases:
/// - **Open (unencrypted) operation** — force `require_unencrypted = true` for channels
///   whose operator has chosen to run openly, even under Part 15 / ISM-band rules where
///   encryption would be permissible.
/// - **First-contact sender resolution** — force `require_full_source = true` when receiving
///   nodes need to resolve the sender without a prior key-exchange round-trip (e.g., a public
///   beacon channel where all senders are first-contact).
/// - **Propagation budget** — set `max_flood_hops` for high-density channels where
///   uncontrolled flooding would waste airtime.
///
/// Channels absent from the policy list use the permissive defaults inherited from
/// [`SendOptions`](crate::send::SendOptions).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelPolicy {
    /// Channel to which this policy applies.
    pub channel_id: ChannelId,
    /// Whether traffic on this channel must be sent unencrypted.
    pub require_unencrypted: bool,
    /// Whether traffic on this channel must carry the full source public key.
    pub require_full_source: bool,
    /// Optional maximum flood-hop budget; `None` imposes no per-channel limit.
    pub max_flood_hops: Option<u8>,
}
407
/// Controls how the coordinator and optional repeater handle amateur-radio legal requirements.
///
/// Amateur (ham) radio law in most jurisdictions prohibits encrypted transmissions and requires
/// station identification on all transmitted frames. UMSH supports three operating modes to
/// accommodate networks that mix licensed and unlicensed nodes, or that operate exclusively
/// under one regulatory regime.
///
/// | Mode | Encryption | Operator callsign | Repeater station callsign |
/// |------|------------|-------------------|---------------------------|
/// | `Unlicensed` | Allowed | Optional | Not added |
/// | `LicensedOnly` | Prohibited | Required | Required |
/// | `Hybrid` | Allowed (local) | Optional | Added to forwarded frames |
///
/// The mode appears on both [`OperatingPolicy`] (for locally-originated traffic) and
/// [`RepeaterConfig`] (for forwarding decisions), and the two settings are independent — a
/// node might transmit its own encrypted application traffic (`Unlicensed`) while acting as a
/// licensed-identified repeater (`LicensedOnly`) for third-party frames it forwards.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AmateurRadioMode {
    /// Treat traffic as unlicensed operation only.
    ///
    /// Local transmit policy does not require operator callsigns or amateur-only
    /// restrictions. Repeaters operating in this mode must not add a station
    /// callsign when forwarding and should only retransmit packets that can be
    /// handled under unlicensed rules.
    Unlicensed,
    /// Treat forwarded and locally originated traffic as amateur-only.
    ///
    /// Encryption and blind unicast are disallowed, operator callsigns are
    /// required on originated packets, and repeaters must identify themselves
    /// with a station callsign on forwarded traffic.
    LicensedOnly,
    /// Permit both unlicensed and amateur-qualified forwarding behavior.
    ///
    /// Local transmit policy remains permissive, but repeaters identify
    /// forwarded packets with their station callsign and may still forward
    /// packets lacking an operator callsign when they can do so under
    /// unlicensed rules.
    Hybrid,
}
448
/// Regulatory authority under which a frame is transmitted.
// NOTE(review): presumably chosen per frame from the configured `AmateurRadioMode` and the
// frame's contents — the selection logic is not visible in this part of the file; confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransmitAuthority {
    /// Transmit under unlicensed rules.
    Unlicensed,
    /// Transmit under amateur-radio rules.
    Amateur,
}
454
/// What to do with the station callsign of a frame being forwarded.
// NOTE(review): presumably `Replace` writes the repeater's own `station_callsign` and
// `Remove` strips any existing one — confirm against the forwarding code.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ForwardStationAction {
    /// Remove the station callsign.
    Remove,
    /// Replace the station callsign.
    Replace,
}
460
/// Decisions describing how one received frame is to be forwarded.
// NOTE(review): field meanings are inferred from names and from the `RepeaterConfig`
// documentation; confirm against the code that builds and applies the plan.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ForwardPlan {
    /// Router hint associated with the forward.
    router_hint: RouterHint,
    /// Whether a source-route hop is consumed when forwarding.
    consume_source_route: bool,
    /// Whether the flood-hop budget is decremented when forwarding.
    decrement_flood_hops: bool,
    /// Region code to insert into the forwarded frame, if any.
    insert_region_code: Option<[u8; 2]>,
    /// Delay, in milliseconds, applied to the forward.
    delay_ms: u64,
    /// How the station callsign is handled on the forwarded frame.
    station_action: ForwardStationAction,
}
470
/// Local transmission policy enforced by the [`Mac`] coordinator on all outgoing frames.
///
/// `OperatingPolicy` governs what the coordinator is *allowed to send*, independent of what
/// the application requests. It is consulted at the start of every `queue_*` call via an
/// internal policy check, which returns [`SendError::PolicyViolation`] if the requested send
/// would violate it. This policy applies only to locally-originated frames; forwarding
/// decisions are governed separately by [`RepeaterConfig`].
///
/// - **`amateur_radio_mode`** — determines whether encryption and blind-unicast are permitted
///   and whether an operator callsign must be appended to originated frames.
///   See [`AmateurRadioMode`].
/// - **`operator_callsign`** — the ARNCE/HAM-64 callsign automatically appended to every
///   locally-originated frame when set. Required in `LicensedOnly` mode; optional otherwise.
/// - **`channel_policies`** — a small list of per-channel overrides for multicast and
///   blind-unicast traffic. Channels absent from the list use permissive defaults.
///
/// The default configuration (via [`Default`]) sets `Unlicensed` mode with no callsign and
/// no per-channel overrides.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OperatingPolicy {
    /// Amateur-radio operating mode.
    pub amateur_radio_mode: AmateurRadioMode,
    /// Optional local operator callsign.
    pub operator_callsign: Option<HamAddr>,
    /// Per-channel overrides; the fixed-capacity list holds at most 4 entries.
    pub channel_policies: Vec<ChannelPolicy, 4>,
}
498
499impl Default for OperatingPolicy {
500    fn default() -> Self {
501        Self {
502            amateur_radio_mode: AmateurRadioMode::Unlicensed,
503            operator_callsign: None,
504            channel_policies: Vec::new(),
505        }
506    }
507}
508
/// Configuration governing whether and how the node forwards received frames.
///
/// The UMSH MAC layer includes an optional built-in repeater that forwards packets it
/// successfully receives, extending the effective range of the network without requiring
/// dedicated infrastructure. `RepeaterConfig` controls every facet of that behavior:
///
/// - **`enabled`** — master on/off switch. When `false`, all inbound forwarding logic is
///   skipped even if the other fields are populated.
/// - **`regions`** — a local list of 2-byte ARNCE region codes used both for flood-forwarding
///   eligibility checks and, when a flood-forwarded packet is untagged, as the local policy
///   source for inserting a region code. When non-empty, packets carrying a non-matching region
///   code are not flood-forwarded; when empty, forwarding does not impose a region check and the
///   repeater has no local region to insert.
/// - **`min_rssi` / `min_snr`** — signal-quality thresholds for flood forwarding. Packets
///   received below these values are not flood-forwarded; this prevents marginal receptions
///   from being re-injected into the network at full power, which would degrade SNR for
///   nearby nodes rather than help. These thresholds do not apply to source-routed hops.
/// - **Flood contention tuning** (the `flood_contention_*` fields) — controls the
///   SNR-to-delay mapping used when several eligible repeaters contend to flood-forward the
///   same frame. These values should usually remain aligned across the mesh.
/// - **`amateur_radio_mode`** — determines whether the repeater may forward encrypted or
///   blind-unicast frames, and whether it must inject a station callsign. See
///   [`AmateurRadioMode`].
/// - **`station_callsign`** — the ARNCE/HAM-64 callsign injected into the options block of
///   every forwarded frame when operating in `LicensedOnly` or `Hybrid` mode, satisfying the
///   third-party identification requirements of FCC §97.119 and equivalent regulations.
///
/// The default configuration has `enabled: false`; repeating must be explicitly opted in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepeaterConfig {
    /// Whether repeater forwarding is enabled. Default: `false`.
    pub enabled: bool,
    /// Allowed repeater region codes; the fixed-capacity list holds at most 8 entries.
    /// Default: empty.
    pub regions: Vec<[u8; 2], 8>,
    /// Minimum RSSI threshold for flood forwarding. Default: `None` (no threshold).
    pub min_rssi: Option<i16>,
    /// Minimum SNR threshold for flood forwarding. Default: `None` (no threshold).
    pub min_snr: Option<i8>,
    /// Lower clamp bound for SNR-based flood forwarding contention. Default: `-6`.
    pub flood_contention_snr_low_db: i8,
    /// Upper clamp bound for SNR-based flood forwarding contention. Default: `15`.
    pub flood_contention_snr_high_db: i8,
    /// Minimum forwarding contention window as a percentage of `T_frame`. Default: `20`.
    pub flood_contention_min_window_percent: u8,
    /// Maximum forwarding contention window as a multiple of `T_frame`. Default: `2`.
    pub flood_contention_max_window_frames: u8,
    /// Maximum number of overheard-repeat deferrals before abandoning a pending forward.
    /// Default: `3`.
    pub flood_contention_max_deferrals: u8,
    /// Amateur-radio operating mode for forwarding. Default: `Unlicensed`.
    pub amateur_radio_mode: AmateurRadioMode,
    /// Optional station callsign injected on forwarded traffic. Default: `None`.
    pub station_callsign: Option<HamAddr>,
}
562
563impl Default for RepeaterConfig {
564    fn default() -> Self {
565        Self {
566            enabled: false,
567            regions: Vec::new(),
568            min_rssi: None,
569            min_snr: None,
570            flood_contention_snr_low_db: -6,
571            flood_contention_snr_high_db: 15,
572            flood_contention_min_window_percent: 20,
573            flood_contention_max_window_frames: 2,
574            flood_contention_max_deferrals: 3,
575            amateur_radio_mode: AmateurRadioMode::Unlicensed,
576            station_callsign: None,
577        }
578    }
579}
580
/// Errors returned by the [`Mac`] coordinator when queueing an outbound send.
///
/// Returned synchronously by `queue_broadcast`, `queue_unicast`, `queue_multicast`, and
/// related methods. An error here means the send could not be *enqueued* — it says nothing
/// about the fate of frames already in the transmit queue.
///
/// [`BuildError`], [`ParseError`], and [`CryptoError`] convert into the matching variant via
/// `From`, so they can be propagated with `?`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SendError {
    /// The [`LocalIdentityId`] passed to the queue call does not correspond to an occupied
    /// identity slot. This indicates the identity was never registered or was removed.
    IdentityMissing,
    /// The destination [`umsh_core::PublicKey`] is not present in the peer registry.
    /// Register the peer first via [`Mac::add_peer`].
    PeerMissing,
    /// No cached pairwise session keys exist for the target peer on this identity.
    /// This is only returned by the low-level `queue_*` APIs; the public async send APIs
    /// derive and cache peer state automatically.
    PairwiseKeysMissing,
    /// The local identity failed to derive a shared secret for this peer.
    IdentityAgreementFailed,
    /// The target [`umsh_core::ChannelId`] is not present in the channel table.
    /// Register the channel first via [`Mac::add_channel`] or [`Mac::add_named_channel`].
    ChannelMissing,
    /// The [`OperatingPolicy`] rejected this send — for example, attempting to send an
    /// encrypted frame while operating in [`AmateurRadioMode::LicensedOnly`] mode.
    PolicyViolation,
    /// The requested packet type does not support transport ACKs (e.g., broadcast).
    AckUnsupported,
    /// The requested encryption mode is not valid for this packet type.
    EncryptionUnsupported,
    /// The requested salt option is not valid for this packet type.
    SaltUnsupported,
    /// The low-level packet builder failed, typically because the frame buffer is too small
    /// for the requested options and payload.
    Build(BuildError),
    /// Packet parsing failed while reprocessing a freshly-built frame, indicating an
    /// internal inconsistency in the packet construction logic.
    Parse(ParseError),
    /// The cryptographic seal operation failed. This typically indicates a mismatched key
    /// length or an internal crypto engine error.
    Crypto(CryptoError),
    /// The transmit queue is at the configured `TX` capacity. Back off and retry after
    /// the event loop has drained some entries.
    QueueFull,
    /// The in-flight ACK table for this identity is at the configured `ACKS` capacity.
    /// Wait for an existing ACK-requested send to complete or time out before sending another.
    PendingAckFull,
    /// Secure sends are blocked because the live frame counter has reached the persisted
    /// reservation boundary. Call [`Mac::service_counter_persistence`] to flush a new
    /// boundary to the counter store before retrying.
    CounterPersistenceLag,
}
632
633impl From<BuildError> for SendError {
634    fn from(value: BuildError) -> Self {
635        Self::Build(value)
636    }
637}
638
639impl From<ParseError> for SendError {
640    fn from(value: ParseError) -> Self {
641        Self::Parse(value)
642    }
643}
644
645impl From<CryptoError> for SendError {
646    fn from(value: CryptoError) -> Self {
647        Self::Crypto(value)
648    }
649}
650
/// Runtime errors produced by the [`Mac`] coordinator's async event loop.
///
/// Unlike [`SendError`], which is returned synchronously when *enqueueing* a send,
/// `MacError` surfaces from the async methods (`next_event`,
/// `service_counter_persistence`) that actually drive the coordinator forward.
///
/// Both a bare `RadioError` and a [`TxError<RadioError>`](TxError) convert into the matching
/// variant via `From`, so either can be propagated with `?`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MacError<RadioError> {
    /// The underlying [`umsh_hal::Radio`] driver returned an error during a receive or
    /// channel-sense operation. The inner type is platform-specific (e.g., SPI fault on
    /// embedded hardware, socket error on a UDP transport).
    Radio(RadioError),
    /// A transmit-phase error from the radio, such as channel-activity-detection (CAD)
    /// exhaustion after [`MAX_CAD_ATTEMPTS`] retries. The frame was not sent.
    Transmit(TxError<RadioError>),
    /// An internal capacity invariant was violated: the coordinator needed to enqueue a
    /// control frame (MAC ACK, forwarded packet) but the transmit queue was full.
    /// Increase the `TX` const generic on [`Mac`] to give the queue more headroom.
    QueueFull,
}
670
/// Errors returned while loading persisted frame-counter boundaries via [`Mac::load_persisted_counter`].
///
/// On startup, applications should call [`Mac::load_persisted_counter`] for each registered
/// long-term identity to restore the safe starting point for the frame counter from
/// non-volatile storage.
///
/// `StoreError` is the error type carried by [`CounterPersistenceError::Store`].
// NOTE(review): presumably the platform `CounterStore`'s own error type — confirm at the call site.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CounterPersistenceError<StoreError> {
    /// The [`LocalIdentityId`] supplied does not correspond to an occupied identity slot.
    IdentityMissing,
    /// The underlying [`umsh_hal::CounterStore`] read operation failed. The application
    /// should decide whether to halt, retry, or start the counter at a conservatively high
    /// value to avoid replay-window collisions with pre-reset traffic.
    Store(StoreError),
}
685
686impl<RadioError> From<RadioError> for MacError<RadioError> {
687    fn from(value: RadioError) -> Self {
688        Self::Radio(value)
689    }
690}
691
692impl<RadioError> From<TxError<RadioError>> for MacError<RadioError> {
693    fn from(value: TxError<RadioError>) -> Self {
694        Self::Transmit(value)
695    }
696}
697
698/// Central MAC coordinator that owns and drives the full UMSH radio-facing state machine.
699///
700/// `Mac` is the top-level entry point for UMSH protocol operation. It combines a radio driver,
701/// cryptographic engine, clock, RNG, counter store, and all protocol state into a single
702/// fully-typed, allocation-free structure. All const-generic capacity parameters are enforced
703/// at compile time via `heapless` collections — there are no heap allocations inside `Mac`.
704///
705/// ## Generic parameters
706///
707/// - **`P: Platform`** — a trait bundle supplying the concrete driver types for `Radio`,
708///   `Aes`/`Sha` (crypto), `Clock`, `Rng`, and `CounterStore`. Implement [`Platform`] once
709///   per deployment target to swap in real hardware drivers, software stubs, or test doubles.
710/// - **`IDENTITIES`** — maximum simultaneously active local identities (default
711///   [`DEFAULT_IDENTITIES`]).
712/// - **`PEERS`** — maximum known remote peers and their per-identity pairwise key entries
713///   (default [`DEFAULT_PEERS`]).
714/// - **`CHANNELS`** — maximum registered multicast channel keys (default
715///   [`DEFAULT_CHANNELS`]).
716/// - **`ACKS`** — maximum simultaneously in-flight ACK-requested sends per identity
717///   (default [`DEFAULT_ACKS`]).
718/// - **`TX`** — depth of the transmit queue (default [`DEFAULT_TX`]). Must be large enough
719///   to absorb a burst of control frames (MAC ACKs + forwarded frames) alongside any
720///   backlogged application sends.
721/// - **`FRAME`** — maximum byte length of a stored frame buffer for retransmission
722///   (default [`MAX_RESEND_FRAME_LEN`]).
723/// - **`DUP`** — capacity of the duplicate-detection cache (default [`DEFAULT_DUP`]).
724///
725/// ## Lifecycle
726///
727/// 1. **Construct** with [`Mac::new`], supplying concrete driver instances and policy.
728/// 2. **Register identities** via [`Mac::add_identity`]; call
729///    [`Mac::load_persisted_counter`] on each long-term identity to restore the safe
730///    frame-counter start point from non-volatile storage.
731/// 3. **Register peers** via [`Mac::add_peer`]. Secure unicast and blind-unicast state is
732///    derived lazily from the local private key and peer public key on first use.
733/// 4. **Register channels** via [`Mac::add_channel`] or [`Mac::add_named_channel`].
734/// 5. **Drive the event loop** via [`Mac::run`] / [`Mac::run_quiet`] for long-lived tasks,
735///    or by awaiting [`Mac::next_event`] when you need to multiplex MAC progress with other
736///    async work. The coordinator handles incoming frames, outgoing transmits, forwarding,
737///    ACK matching, retransmission scheduling, and timer deadlines — no external polling
738///    required.
739/// 6. **Send traffic** by calling `queue_broadcast`, `queue_unicast`, `queue_multicast`,
740///    etc. from application code between (or concurrent with) event-loop iterations.
741/// 7. **Persist counters** by calling [`Mac::service_counter_persistence`] whenever
742///    `next_event` signals that pending persistence work is ready to flush.
743///
744/// ## Example (pseudo-code)
745///
746/// ```rust,ignore
747/// let mut mac = Mac::<MyPlatform>::new(
748///     radio, crypto, clock, rng, counter_store,
749///     RepeaterConfig::default(), OperatingPolicy::default(),
750/// );
751/// let id = mac.add_identity(my_identity)?;
752/// mac.load_persisted_counter(id).await?;
753///
754/// mac.run(|id, event| {
755///     let _ = (id, event);
756///     // handle deliveries / ACKs here and schedule persistence work as needed
757/// }).await?;
758/// ```
759pub struct Mac<
760    P: Platform,
761    const IDENTITIES: usize = DEFAULT_IDENTITIES,
762    const PEERS: usize = DEFAULT_PEERS,
763    const CHANNELS: usize = DEFAULT_CHANNELS,
764    const ACKS: usize = DEFAULT_ACKS,
765    const TX: usize = DEFAULT_TX,
766    const FRAME: usize = MAX_RESEND_FRAME_LEN,
767    const DUP: usize = DEFAULT_DUP,
768> {
769    radio: P::Radio,
770    crypto: CryptoEngine<P::Aes, P::Sha>,
771    clock: P::Clock,
772    rng: P::Rng,
773    counter_store: P::CounterStore,
774    identities: Vec<Option<IdentitySlot<P::Identity, PEERS, ACKS, FRAME>>, IDENTITIES>,
775    peer_registry: PeerRegistry<PEERS>,
776    channels: ChannelTable<CHANNELS>,
777    dup_cache: DuplicateCache<DUP>,
778    multicast_unknown_dup_cache: DuplicateCache<DUP>,
779    tx_queue: TxQueue<TX, FRAME>,
780    post_tx_listen: Option<PostTxListen>,
781    repeater: RepeaterConfig,
782    operating_policy: OperatingPolicy,
783    auto_register_full_key_peers: bool,
784    deferred_counter_resync_frame: Option<DeferredCounterResyncFrame<FRAME>>,
785}
786
787impl<
788    P: Platform,
789    const IDENTITIES: usize,
790    const PEERS: usize,
791    const CHANNELS: usize,
792    const ACKS: usize,
793    const TX: usize,
794    const FRAME: usize,
795    const DUP: usize,
796> Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
797{
798    /// Creates a MAC coordinator with the supplied radio, crypto, timing, and policy state.
799    pub fn new(
800        radio: P::Radio,
801        crypto: CryptoEngine<P::Aes, P::Sha>,
802        clock: P::Clock,
803        rng: P::Rng,
804        counter_store: P::CounterStore,
805        repeater: RepeaterConfig,
806        operating_policy: OperatingPolicy,
807    ) -> Self {
808        Self {
809            radio,
810            crypto,
811            clock,
812            rng,
813            counter_store,
814            identities: Vec::new(),
815            peer_registry: PeerRegistry::new(),
816            channels: ChannelTable::new(),
817            dup_cache: DuplicateCache::new(),
818            multicast_unknown_dup_cache: DuplicateCache::new(),
819            tx_queue: TxQueue::new(),
820            post_tx_listen: None,
821            repeater,
822            operating_policy,
823            auto_register_full_key_peers: false,
824            deferred_counter_resync_frame: None,
825        }
826    }
827
828    /// Borrow the underlying radio.
829    pub fn radio(&self) -> &P::Radio {
830        &self.radio
831    }
832
833    /// Mutably borrow the underlying radio.
834    pub fn radio_mut(&mut self) -> &mut P::Radio {
835        &mut self.radio
836    }
837
838    /// Borrow the crypto engine.
839    pub fn crypto(&self) -> &CryptoEngine<P::Aes, P::Sha> {
840        &self.crypto
841    }
842
843    /// Borrow the monotonic clock.
844    pub fn clock(&self) -> &P::Clock {
845        &self.clock
846    }
847
848    /// Borrow the RNG.
849    pub fn rng(&self) -> &P::Rng {
850        &self.rng
851    }
852
853    /// Mutably borrow the RNG.
854    pub fn rng_mut(&mut self) -> &mut P::Rng {
855        &mut self.rng
856    }
857
858    /// Borrow the counter store.
859    pub fn counter_store(&self) -> &P::CounterStore {
860        &self.counter_store
861    }
862    /// Borrow the transmit queue.
863    pub fn tx_queue(&self) -> &TxQueue<TX, FRAME> {
864        &self.tx_queue
865    }
866    /// Mutably borrow the transmit queue.
867    pub fn tx_queue_mut(&mut self) -> &mut TxQueue<TX, FRAME> {
868        &mut self.tx_queue
869    }
870    /// Borrow the duplicate cache.
871    pub fn dup_cache(&self) -> &DuplicateCache<DUP> {
872        &self.dup_cache
873    }
874    /// Borrow the peer registry.
875    pub fn peer_registry(&self) -> &PeerRegistry<PEERS> {
876        &self.peer_registry
877    }
878    /// Mutably borrow the peer registry.
879    pub fn peer_registry_mut(&mut self) -> &mut PeerRegistry<PEERS> {
880        &mut self.peer_registry
881    }
882    /// Borrow the channel table.
883    pub fn channels(&self) -> &ChannelTable<CHANNELS> {
884        &self.channels
885    }
886    /// Mutably borrow the channel table.
887    pub fn channels_mut(&mut self) -> &mut ChannelTable<CHANNELS> {
888        &mut self.channels
889    }
890    /// Borrow repeater configuration.
891    pub fn repeater_config(&self) -> &RepeaterConfig {
892        &self.repeater
893    }
894    /// Mutably borrow repeater configuration.
895    pub fn repeater_config_mut(&mut self) -> &mut RepeaterConfig {
896        &mut self.repeater
897    }
898    /// Borrow the local operating policy.
899    pub fn operating_policy(&self) -> &OperatingPolicy {
900        &self.operating_policy
901    }
902    /// Mutably borrow the local operating policy.
903    pub fn operating_policy_mut(&mut self) -> &mut OperatingPolicy {
904        &mut self.operating_policy
905    }
906
907    /// Return whether inbound secure packets carrying a full source key may auto-register peers.
908    pub fn auto_register_full_key_peers(&self) -> bool {
909        self.auto_register_full_key_peers
910    }
911
912    /// Enable or disable inbound full-key peer auto-registration.
913    pub fn set_auto_register_full_key_peers(&mut self, enabled: bool) {
914        self.auto_register_full_key_peers = enabled;
915    }
916
917    /// Register one long-term local identity.
918    pub fn add_identity(
919        &mut self,
920        identity: P::Identity,
921    ) -> Result<LocalIdentityId, CapacityError> {
922        self.insert_identity(LocalIdentity::LongTerm(identity), None)
923    }
924
925    /// Load the persisted frame-counter boundary for `id` from the counter store.
926    pub async fn load_persisted_counter(
927        &mut self,
928        id: LocalIdentityId,
929    ) -> Result<u32, CounterPersistenceError<<P::CounterStore as CounterStore>::Error>> {
930        let context = {
931            let slot = self
932                .identity(id)
933                .ok_or(CounterPersistenceError::IdentityMissing)?;
934            if !slot.counter_persistence_enabled() {
935                return Ok(slot.frame_counter());
936            }
937            *slot.identity().public_key()
938        };
939        let loaded = self
940            .counter_store
941            .load(&context.0)
942            .await
943            .map_err(CounterPersistenceError::Store)?;
944        let aligned = align_counter_boundary(loaded);
945        let slot = self
946            .identity_mut(id)
947            .ok_or(CounterPersistenceError::IdentityMissing)?;
948        slot.load_persisted_counter(aligned);
949        Ok(aligned)
950    }
951
952    /// Persist all currently scheduled frame-counter reservations.
953    pub async fn service_counter_persistence(
954        &mut self,
955    ) -> Result<usize, <P::CounterStore as CounterStore>::Error> {
956        let mut pending = Vec::<(LocalIdentityId, [u8; 32], u32), IDENTITIES>::new();
957        for (index, slot) in self.identities.iter().enumerate() {
958            let Some(slot) = slot.as_ref() else {
959                continue;
960            };
961            let Some(target) = slot.pending_persist_target() else {
962                continue;
963            };
964            if !slot.counter_persistence_enabled() {
965                continue;
966            }
967            pending
968                .push((
969                    LocalIdentityId(index as u8),
970                    slot.identity().public_key().0,
971                    target,
972                ))
973                .expect("identity enumeration must fit configured identity capacity");
974        }
975
976        let mut wrote = 0usize;
977        for (_, context, target) in pending.iter() {
978            self.counter_store
979                .store(context, align_counter_boundary(*target))
980                .await?;
981            wrote += 1;
982        }
983        if wrote > 0 {
984            self.counter_store.flush().await?;
985            for (id, _, target) in pending {
986                if let Some(slot) = self.identity_mut(id) {
987                    slot.mark_counter_persisted(target);
988                }
989            }
990        }
991        Ok(wrote)
992    }
993
994    #[cfg(feature = "software-crypto")]
995    /// Register an ephemeral software identity linked to `parent`.
996    pub fn register_ephemeral(
997        &mut self,
998        parent: LocalIdentityId,
999        identity: umsh_crypto::software::SoftwareIdentity,
1000    ) -> Result<LocalIdentityId, CapacityError> {
1001        self.insert_identity(LocalIdentity::Ephemeral(identity), Some(parent))
1002    }
1003
1004    #[cfg(feature = "software-crypto")]
1005    /// Remove an ephemeral identity slot if one exists at `id`.
1006    pub fn remove_ephemeral(&mut self, id: LocalIdentityId) -> bool {
1007        if let Some(slot) = self.identities.get_mut(id.0 as usize) {
1008            let should_remove = slot
1009                .as_ref()
1010                .map(|identity_slot| identity_slot.identity().is_ephemeral())
1011                .unwrap_or(false);
1012            if should_remove {
1013                *slot = None;
1014                return true;
1015            }
1016        }
1017        false
1018    }
1019
1020    /// Borrow an identity slot by identifier.
1021    pub fn identity(
1022        &self,
1023        id: LocalIdentityId,
1024    ) -> Option<&IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1025        self.identities.get(id.0 as usize)?.as_ref()
1026    }
1027
1028    /// Mutably borrow an identity slot by identifier.
1029    pub fn identity_mut(
1030        &mut self,
1031        id: LocalIdentityId,
1032    ) -> Option<&mut IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1033        self.identities.get_mut(id.0 as usize)?.as_mut()
1034    }
1035
1036    /// Registers or refreshes a known remote peer in the shared registry.
1037    pub fn add_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
1038        self.peer_registry.try_insert_or_update(key)
1039    }
1040
1041    /// Adds or updates a shared channel and derives its multicast keys.
1042    pub fn add_channel(&mut self, key: ChannelKey) -> Result<(), CapacityError> {
1043        let derived = self.crypto.derive_channel_keys(&key);
1044        self.channels.try_add(key, derived)
1045    }
1046
1047    /// Adds or updates a named channel using the coordinator's channel-key derivation.
1048    pub fn add_named_channel(&mut self, name: &str) -> Result<(), CapacityError> {
1049        let key = self.crypto.derive_named_channel_key(name);
1050        self.add_channel(key)
1051    }
1052
1053    /// Return the number of occupied identity slots.
1054    pub fn identity_count(&self) -> usize {
1055        self.identities.iter().filter(|slot| slot.is_some()).count()
1056    }
1057
1058    /// Installs pairwise transport keys for one local identity and remote peer.
1059    ///
1060    /// # Safety (logical)
1061    /// Installing wrong keys will silently corrupt the session. This method
1062    /// is crate-internal; external callers should use the `unsafe-advanced`
1063    /// feature or go through the node-layer PFS session manager.
1064    #[cfg(any(feature = "unsafe-advanced", test))]
1065    pub(crate) fn install_pairwise_keys(
1066        &mut self,
1067        identity_id: LocalIdentityId,
1068        peer_id: PeerId,
1069        pairwise_keys: PairwiseKeys,
1070    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1071        let slot = self
1072            .identity_mut(identity_id)
1073            .ok_or(SendError::IdentityMissing)?;
1074        slot.peer_crypto_mut()
1075            .insert(
1076                peer_id,
1077                crate::peers::PeerCryptoState {
1078                    pairwise_keys,
1079                    replay_window: ReplayWindow::new(),
1080                },
1081            )
1082            .map_err(|_| SendError::QueueFull)
1083    }
1084
1085    /// Installs pairwise transport keys for one local identity and remote peer.
1086    ///
1087    /// # Safety (logical)
1088    /// Installing wrong keys will silently corrupt the session. This method
1089    /// is deliberately gated behind the `unsafe-advanced` feature. Prefer
1090    /// going through the node-layer PFS session manager instead.
1091    #[cfg(feature = "unsafe-advanced")]
1092    pub fn install_pairwise_keys_advanced(
1093        &mut self,
1094        identity_id: LocalIdentityId,
1095        peer_id: PeerId,
1096        pairwise_keys: PairwiseKeys,
1097    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1098        self.install_pairwise_keys(identity_id, peer_id, pairwise_keys)
1099    }
1100
1101    /// Enqueues a broadcast frame for transmission.
1102    pub fn queue_broadcast(
1103        &mut self,
1104        from: LocalIdentityId,
1105        payload: &[u8],
1106        options: &SendOptions,
1107    ) -> Result<SendReceipt, SendError> {
1108        self.enforce_send_policy(None, options, false)?;
1109        if options.encrypted {
1110            return Err(SendError::EncryptionUnsupported);
1111        }
1112        if options.ack_requested {
1113            return Err(SendError::AckUnsupported);
1114        }
1115        if options.salt {
1116            return Err(SendError::SaltUnsupported);
1117        }
1118
1119        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
1120        let source_key = *slot.identity().public_key();
1121        let receipt = slot.next_receipt();
1122        let mut buf = [0u8; FRAME];
1123        let builder = PacketBuilder::new(&mut buf).broadcast();
1124        let mut builder = if options.full_source {
1125            builder.source_full(&source_key)
1126        } else {
1127            builder.source_hint(source_key.hint())
1128        };
1129        if let Some(hops) = options.flood_hops {
1130            builder = builder.flood_hops(hops);
1131        }
1132        if options.trace_route {
1133            builder = builder.trace_route();
1134        }
1135        if let Some(route) = options.source_route.as_ref() {
1136            builder = builder.source_route(route.as_slice());
1137        }
1138        if let Some(region_code) = options.region_code {
1139            builder = builder.region_code(region_code);
1140        }
1141        if let Some(callsign) = self.operating_policy.operator_callsign {
1142            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1143        }
1144        let frame = builder.payload(payload).build()?;
1145        if frame.len() > self.radio.max_frame_size() {
1146            return Err(SendError::Build(BuildError::BufferTooSmall));
1147        }
1148        self.tx_queue
1149            .enqueue(TxPriority::Application, frame, Some(receipt), Some(from))
1150            .map_err(|_| SendError::QueueFull)?;
1151        Ok(receipt)
1152    }
1153
1154    /// Enqueue a broadcast frame for transmission.
1155    pub async fn send_broadcast(
1156        &mut self,
1157        from: LocalIdentityId,
1158        payload: &[u8],
1159        options: &SendOptions,
1160    ) -> Result<SendReceipt, SendError> {
1161        self.queue_broadcast(from, payload, options)
1162    }
1163
1164    /// Enqueues a multicast frame using the configured channel keys.
1165    pub fn queue_multicast(
1166        &mut self,
1167        from: LocalIdentityId,
1168        channel_id: &ChannelId,
1169        payload: &[u8],
1170        options: &SendOptions,
1171    ) -> Result<SendReceipt, SendError> {
1172        self.enforce_send_policy(Some(*channel_id), options, false)?;
1173        if options.ack_requested {
1174            return Err(SendError::AckUnsupported);
1175        }
1176
1177        let derived = self
1178            .channels
1179            .lookup_by_id(channel_id)
1180            .next()
1181            .ok_or(SendError::ChannelMissing)?
1182            .derived
1183            .clone();
1184        let keys = PairwiseKeys {
1185            k_enc: derived.k_enc,
1186            k_mic: derived.k_mic,
1187        };
1188        let receipt = self
1189            .identity_mut(from)
1190            .ok_or(SendError::IdentityMissing)?
1191            .next_receipt();
1192        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1193        let salt = self.take_salt(options);
1194        let mut buf = [0u8; FRAME];
1195        let builder = PacketBuilder::new(&mut buf).multicast(*channel_id);
1196        let builder = if options.full_source {
1197            builder.source_full(&source_key)
1198        } else {
1199            builder.source_hint(source_key.hint())
1200        };
1201        let mut builder = builder.frame_counter(frame_counter);
1202        if options.encrypted {
1203            builder = builder.encrypted();
1204        }
1205        builder = builder.mic_size(options.mic_size);
1206        if let Some(salt) = salt {
1207            builder = builder.salt(salt);
1208        }
1209        if let Some(hops) = options.flood_hops {
1210            builder = builder.flood_hops(hops);
1211        }
1212        if options.trace_route {
1213            builder = builder.trace_route();
1214        }
1215        if let Some(route) = options.source_route.as_ref() {
1216            builder = builder.source_route(route.as_slice());
1217        }
1218        if let Some(region_code) = options.region_code {
1219            builder = builder.region_code(region_code);
1220        }
1221        if let Some(callsign) = self.operating_policy.operator_callsign {
1222            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1223        }
1224        let mut packet = builder.payload(payload).build()?;
1225        self.crypto.seal_packet(&mut packet, &keys)?;
1226        self.enqueue_packet(packet, Some(receipt), Some(from))?;
1227        Ok(receipt)
1228    }
1229
1230    /// Enqueue a multicast frame for transmission.
1231    pub async fn send_multicast(
1232        &mut self,
1233        from: LocalIdentityId,
1234        channel_id: &ChannelId,
1235        payload: &[u8],
1236        options: &SendOptions,
1237    ) -> Result<SendReceipt, SendError> {
1238        self.queue_multicast(from, channel_id, payload, options)
1239    }
1240
1241    /// Enqueues a MAC ACK frame, using any cached route to `peer_id` when available.
1242    pub fn queue_mac_ack_for_peer(
1243        &mut self,
1244        peer_id: PeerId,
1245        dst: NodeHint,
1246        ack_tag: [u8; 8],
1247    ) -> Result<(), SendError> {
1248        let mut buf = [0u8; FRAME];
1249        let mut builder = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag);
1250        if let Some(peer) = self.peer_registry.get(peer_id) {
1251            match peer.route.as_ref() {
1252                Some(CachedRoute::Source(route)) => {
1253                    builder = builder.source_route(route.as_slice());
1254                }
1255                Some(CachedRoute::Flood { hops }) => {
1256                    builder = builder.flood_hops((*hops).clamp(1, 15));
1257                }
1258                None => {}
1259            }
1260        }
1261        let frame = builder.build()?;
1262        if frame.len() > self.radio.max_frame_size() {
1263            return Err(SendError::Build(BuildError::BufferTooSmall));
1264        }
1265        self.tx_queue
1266            .enqueue(TxPriority::ImmediateAck, frame, None, None)
1267            .map_err(|_| SendError::QueueFull)?;
1268        Ok(())
1269    }
1270
1271    /// Enqueues an immediate direct MAC ACK frame.
1272    pub fn queue_mac_ack(&mut self, dst: NodeHint, ack_tag: [u8; 8]) -> Result<(), SendError> {
1273        let mut buf = [0u8; FRAME];
1274        let frame = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag).build()?;
1275        if frame.len() > self.radio.max_frame_size() {
1276            return Err(SendError::Build(BuildError::BufferTooSmall));
1277        }
1278        self.tx_queue
1279            .enqueue(TxPriority::ImmediateAck, frame, None, None)
1280            .map_err(|_| SendError::QueueFull)?;
1281        Ok(())
1282    }
1283
1284    /// Enqueues a unicast frame and optional pending-ACK state.
1285    pub fn queue_unicast(
1286        &mut self,
1287        from: LocalIdentityId,
1288        peer: &PublicKey,
1289        payload: &[u8],
1290        options: &SendOptions,
1291    ) -> Result<Option<SendReceipt>, SendError> {
1292        self.enforce_send_policy(None, options, false)?;
1293        let (peer_id, _) = self
1294            .peer_registry
1295            .lookup_by_key(peer)
1296            .ok_or(SendError::PeerMissing)?;
1297        let pairwise_keys = self
1298            .identity(from)
1299            .ok_or(SendError::IdentityMissing)?
1300            .peer_crypto()
1301            .get(&peer_id)
1302            .ok_or(SendError::PairwiseKeysMissing)?
1303            .pairwise_keys
1304            .clone();
1305        let effective_source_route = self.effective_source_route(peer_id, options);
1306
1307        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1308        let salt = self.take_salt(options);
1309        let mut buf = [0u8; FRAME];
1310        let builder = PacketBuilder::new(&mut buf).unicast(peer.hint());
1311        let builder = if options.full_source {
1312            builder.source_full(&source_key)
1313        } else {
1314            builder.source_hint(source_key.hint())
1315        };
1316        let mut builder = builder.frame_counter(frame_counter);
1317        if options.ack_requested {
1318            builder = builder.ack_requested();
1319        }
1320        if options.encrypted {
1321            builder = builder.encrypted();
1322        }
1323        builder = builder.mic_size(options.mic_size);
1324        if let Some(salt) = salt {
1325            builder = builder.salt(salt);
1326        }
1327        if let Some(hops) = options.flood_hops {
1328            builder = builder.flood_hops(hops);
1329        }
1330        if options.trace_route {
1331            builder = builder.trace_route();
1332        }
1333        if let Some(route) = effective_source_route.as_ref() {
1334            builder = builder.source_route(route.as_slice());
1335        }
1336        if let Some(region_code) = options.region_code {
1337            builder = builder.region_code(region_code);
1338        }
1339        if let Some(callsign) = self.operating_policy.operator_callsign {
1340            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1341        }
1342        let mut packet = builder.payload(payload).build()?;
1343
1344        let receipt = if options.ack_requested {
1345            Some(self.prepare_pending_ack(from, *peer, &packet, &pairwise_keys, options)?)
1346        } else {
1347            None
1348        };
1349
1350        self.crypto.seal_packet(&mut packet, &pairwise_keys)?;
1351        if let Some(receipt) = receipt {
1352            self.refresh_pending_resend(
1353                from,
1354                receipt,
1355                packet.as_bytes(),
1356                effective_source_route
1357                    .as_ref()
1358                    .map(|route| route.as_slice()),
1359            )?;
1360        }
1361        if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1362            if let Some(receipt) = receipt {
1363                let _ = self
1364                    .identity_mut(from)
1365                    .and_then(|slot| slot.remove_pending_ack(&receipt));
1366            }
1367            return Err(err);
1368        }
1369        Ok(receipt)
1370    }
1371
1372    /// Enqueue a unicast frame for transmission, deriving secure peer state on first use.
1373    pub async fn send_unicast(
1374        &mut self,
1375        from: LocalIdentityId,
1376        peer: &PublicKey,
1377        payload: &[u8],
1378        options: &SendOptions,
1379    ) -> Result<Option<SendReceipt>, SendError> {
1380        let (peer_id, _) = self
1381            .peer_registry
1382            .lookup_by_key(peer)
1383            .ok_or(SendError::PeerMissing)?;
1384        let _ = self.ensure_peer_crypto(from, peer_id).await?;
1385        self.queue_unicast(from, peer, payload, options)
1386    }
1387
1388    /// Enqueues a blind-unicast frame and optional pending-ACK state.
1389    pub fn queue_blind_unicast(
1390        &mut self,
1391        from: LocalIdentityId,
1392        peer: &PublicKey,
1393        channel_id: &ChannelId,
1394        payload: &[u8],
1395        options: &SendOptions,
1396    ) -> Result<Option<SendReceipt>, SendError> {
1397        self.enforce_send_policy(Some(*channel_id), options, true)?;
1398        let (peer_id, _) = self
1399            .peer_registry
1400            .lookup_by_key(peer)
1401            .ok_or(SendError::PeerMissing)?;
1402        let pairwise_keys = self
1403            .identity(from)
1404            .ok_or(SendError::IdentityMissing)?
1405            .peer_crypto()
1406            .get(&peer_id)
1407            .ok_or(SendError::PairwiseKeysMissing)?
1408            .pairwise_keys
1409            .clone();
1410        let channel_keys = self
1411            .channels
1412            .lookup_by_id(channel_id)
1413            .next()
1414            .ok_or(SendError::ChannelMissing)?
1415            .derived
1416            .clone();
1417        let blind_keys = self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
1418        let effective_source_route = self.effective_source_route(peer_id, options);
1419
1420        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1421        let salt = self.take_salt(options);
1422        let mut buf = [0u8; FRAME];
1423        let builder = PacketBuilder::new(&mut buf).blind_unicast(*channel_id, peer.hint());
1424        let builder = if options.full_source {
1425            builder.source_full(&source_key)
1426        } else {
1427            builder.source_hint(source_key.hint())
1428        };
1429        let mut builder = builder.frame_counter(frame_counter);
1430        if options.ack_requested {
1431            builder = builder.ack_requested();
1432        }
1433        if !options.encrypted {
1434            builder = builder.unencrypted();
1435        }
1436        builder = builder.mic_size(options.mic_size);
1437        if let Some(salt) = salt {
1438            builder = builder.salt(salt);
1439        }
1440        if let Some(hops) = options.flood_hops {
1441            builder = builder.flood_hops(hops);
1442        }
1443        if options.trace_route {
1444            builder = builder.trace_route();
1445        }
1446        if let Some(route) = effective_source_route.as_ref() {
1447            builder = builder.source_route(route.as_slice());
1448        }
1449        if let Some(region_code) = options.region_code {
1450            builder = builder.region_code(region_code);
1451        }
1452        if let Some(callsign) = self.operating_policy.operator_callsign {
1453            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1454        }
1455        let mut packet = builder.payload(payload).build()?;
1456
1457        let receipt = if options.ack_requested {
1458            Some(self.prepare_pending_ack(from, *peer, &packet, &blind_keys, options)?)
1459        } else {
1460            None
1461        };
1462
1463        self.crypto
1464            .seal_blind_packet(&mut packet, &blind_keys, &channel_keys)
1465            .map_err(SendError::Crypto)?;
1466        if let Some(receipt) = receipt {
1467            self.refresh_pending_resend(
1468                from,
1469                receipt,
1470                packet.as_bytes(),
1471                effective_source_route
1472                    .as_ref()
1473                    .map(|route| route.as_slice()),
1474            )?;
1475        }
1476        if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1477            if let Some(receipt) = receipt {
1478                let _ = self
1479                    .identity_mut(from)
1480                    .and_then(|slot| slot.remove_pending_ack(&receipt));
1481            }
1482            return Err(err);
1483        }
1484        Ok(receipt)
1485    }
1486
1487    /// Enqueue a blind-unicast frame for transmission, deriving secure peer state on first use.
1488    pub async fn send_blind_unicast(
1489        &mut self,
1490        from: LocalIdentityId,
1491        peer: &PublicKey,
1492        channel_id: &ChannelId,
1493        payload: &[u8],
1494        options: &SendOptions,
1495    ) -> Result<Option<SendReceipt>, SendError> {
1496        let (peer_id, _) = self
1497            .peer_registry
1498            .lookup_by_key(peer)
1499            .ok_or(SendError::PeerMissing)?;
1500        let _ = self.ensure_peer_crypto(from, peer_id).await?;
1501        self.queue_blind_unicast(from, peer, channel_id, payload, options)
1502    }
1503
1504    /// Transmit the next eligible queued frame, if any.
1505    ///
1506    /// While a post-transmit forwarding listen window is active, only immediate MAC
1507    /// ACK traffic is permitted to bypass the listen state. Forwarded sends arm a new
1508    /// listen window after the radio transmit completes. Non-immediate traffic honors
1509    /// queued CAD backoff state and gives up after the configured maximum number of
1510    /// CAD attempts.
1511    pub async fn transmit_next(
1512        &mut self,
1513        on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1514    ) -> Result<Option<SendReceipt>, MacError<<P::Radio as Radio>::Error>> {
1515        self.expire_post_tx_listen_if_needed();
1516        let Some(queued) = self.tx_queue.pop_next() else {
1517            return Ok(None);
1518        };
1519        let now_ms = self.clock.now_ms();
1520
1521        if queued.not_before_ms > now_ms {
1522            self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1523            return Ok(None);
1524        }
1525
1526        if self.post_tx_listen.is_some() && queued.priority != TxPriority::ImmediateAck {
1527            self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1528            return Ok(None);
1529        }
1530
1531        let receipt = queued.receipt;
1532        let identity_id = queued.identity_id;
1533        let tx_options = if queued.priority == TxPriority::ImmediateAck {
1534            TxOptions::default()
1535        } else {
1536            TxOptions {
1537                cad_timeout_ms: Some(0),
1538            }
1539        };
1540        match self
1541            .radio
1542            .transmit(queued.frame.as_slice(), tx_options)
1543            .await
1544        {
1545            Ok(()) => {}
1546            Err(TxError::CadTimeout) => {
1547                let next_attempt = queued.cad_attempts.saturating_add(1);
1548                if next_attempt >= MAX_CAD_ATTEMPTS {
1549                    return Ok(None);
1550                }
1551                let backoff_ms = u64::from(
1552                    self.rng
1553                        .random_range(..self.radio.t_frame_ms().saturating_add(1)),
1554                );
1555                self.tx_queue
1556                    .enqueue_with_state(
1557                        queued.priority,
1558                        queued.frame.as_slice(),
1559                        queued.receipt,
1560                        queued.identity_id,
1561                        now_ms.saturating_add(backoff_ms),
1562                        next_attempt,
1563                        queued.forward_deferrals,
1564                    )
1565                    .map_err(|_| MacError::QueueFull)?;
1566                return Ok(None);
1567            }
1568            Err(error) => return Err(MacError::Transmit(error)),
1569        }
1570        if let Some(identity_id) = identity_id {
1571            on_event(
1572                identity_id,
1573                crate::MacEventRef::Transmitted {
1574                    identity_id,
1575                    receipt,
1576                },
1577            );
1578        }
1579        if let Some(receipt) = receipt {
1580            self.note_transmitted_ack_requested(receipt, queued.frame.as_slice());
1581        }
1582        Ok(receipt)
1583    }
1584
1585    /// Keep transmitting until the queue is empty.
1586    ///
1587    /// Progress stops when CAD keeps reporting busy, when a post-transmit listen window blocks
1588    /// normal traffic, or when the queue is otherwise unable to shrink further in the current cycle.
1589    pub async fn drain_tx_queue(
1590        &mut self,
1591        on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1592    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1593        while !self.tx_queue.is_empty() {
1594            let queue_len = self.tx_queue.len();
1595            let _ = self.transmit_next(on_event).await?;
1596            if self.tx_queue.len() >= queue_len {
1597                break;
1598            }
1599        }
1600        Ok(())
1601    }
1602
1603    /// Runs one coordinator cycle over the current MAC state.
1604    ///
1605    /// The cycle performs four ordered phases:
1606    ///
1607    /// 1. Drain any queued transmit work.
1608    /// 2. Receive and process at most one inbound frame.
1609    /// 3. Drain any immediate ACK generated during receive handling.
1610    /// 4. Service pending ACK timers and emit timeout events.
1611    ///
1612    /// The callback may be invoked zero or more times depending on what the
1613    /// receive and timeout phases accept or resolve.
1614    /// Service one MAC coordinator cycle.
1615    pub async fn poll_cycle(
1616        &mut self,
1617        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1618    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1619        self.drain_tx_queue(&mut on_event).await?;
1620        if self.post_tx_listen.is_some() {
1621            self.service_post_tx_listen(&mut on_event).await?;
1622        } else {
1623            let _ = self.receive_one(&mut on_event).await?;
1624        }
1625        self.drain_tx_queue(&mut on_event).await?;
1626        self.service_pending_ack_timeouts(&mut on_event)
1627            .map_err(|_| MacError::QueueFull)?;
1628        Ok(())
1629    }
1630
1631    /// Compute the earliest deadline across all coordinator timers.
1632    ///
1633    /// Returns `None` when there are no pending timers.  The returned value
1634    /// covers pending ACK deadlines (both `ack_deadline_ms` and forwarding
1635    /// `confirm_deadline_ms`), the post-transmit listen window, and deferred
1636    /// transmit-queue entries.
1637    pub fn earliest_deadline_ms(&self) -> Option<u64> {
1638        let mut earliest: Option<u64> = None;
1639
1640        if let Some(listen) = &self.post_tx_listen {
1641            earliest =
1642                Some(earliest.map_or(listen.deadline_ms, |e: u64| e.min(listen.deadline_ms)));
1643        }
1644
1645        for slot in self.identities.iter().filter_map(|s| s.as_ref()) {
1646            for (_, pending) in slot.pending_acks.iter() {
1647                if !matches!(pending.state, crate::AckState::Queued { .. }) {
1648                    earliest = Some(earliest.map_or(pending.ack_deadline_ms, |e: u64| {
1649                        e.min(pending.ack_deadline_ms)
1650                    }));
1651                }
1652                if let crate::AckState::AwaitingForward {
1653                    confirm_deadline_ms,
1654                } = pending.state
1655                {
1656                    earliest = Some(
1657                        earliest.map_or(confirm_deadline_ms, |e: u64| e.min(confirm_deadline_ms)),
1658                    );
1659                }
1660            }
1661        }
1662
1663        if let Some(nb) = self.tx_queue.earliest_not_before_ms() {
1664            earliest = Some(earliest.map_or(nb, |e: u64| e.min(nb)));
1665        }
1666
1667        earliest
1668    }
1669
    /// Run the coordinator's event loop until one wake-up (a received frame or an
    /// expired timer) has been serviced and the transmit queue is empty again.
    ///
    /// Unlike [`poll_cycle`](Self::poll_cycle), this method properly awaits the
    /// radio and timer deadlines instead of returning immediately when nothing
    /// is ready.  Callers can use `tokio::select!` (or equivalent) to multiplex
    /// user input alongside MAC events:
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         line = stdin.next_line() => { /* handle input */ }
    ///         result = mac.next_event(|id, event| { /* handle event */ }) => {
    ///             result?;
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// Errors from draining the transmit queue are propagated unchanged, a radio
    /// receive error is returned as `MacError::Radio`, and a failure while
    /// servicing pending ACK timers is returned as `MacError::QueueFull`.
    pub async fn next_event(
        &mut self,
        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
        loop {
            // Phase 1: Drain ready transmit work.
            self.drain_tx_queue(&mut on_event).await?;

            // Phase 2: Wait for a radio frame or the earliest timer deadline.
            let mut buf = [0u8; FRAME];
            // Why the wait below finished: either a frame arrived or some
            // deadline / ready queue entry needs attention.
            enum WakeReason {
                Received(RxInfo),
                TimerExpired,
            }
            let reason = poll_fn(|cx| {
                // Try to receive a frame (non-blocking poll).
                match self.radio.poll_receive(cx, &mut buf) {
                    Poll::Ready(Ok(rx)) => return Poll::Ready(Ok(WakeReason::Received(rx))),
                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                    Poll::Pending => {}
                }

                // Check whether any timer has expired.
                let now_ms = self.clock.now_ms();
                if let Some(deadline) = self.earliest_deadline_ms() {
                    if now_ms >= deadline {
                        return Poll::Ready(Ok(WakeReason::TimerExpired));
                    }
                    // Register for wakeup at the earliest deadline.
                    // NOTE(review): the result is discarded; if the clock can
                    // report the deadline as already reached here, this poll
                    // still returns `Pending` — confirm the clock then
                    // guarantees a wake-up.
                    let _ = self.clock.poll_delay_until(cx, deadline);
                }

                // Check for tx_queue items that became ready.
                if self.tx_queue.has_ready(now_ms) {
                    return Poll::Ready(Ok(WakeReason::TimerExpired));
                }

                Poll::Pending
            })
            .await
            .map_err(MacError::Radio)?;

            // Phase 3: Process what woke us up.
            match reason {
                WakeReason::Received(rx) => {
                    // Clamp the reported length so it never exceeds the buffer.
                    let frame_len = rx.len.min(buf.len());
                    // The "handled" flag is intentionally not used here; events
                    // have already been delivered through `on_event`.
                    let _ = self
                        .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
                        .await;
                }
                WakeReason::TimerExpired => {
                    // Timers are handled in phase 5 below.
                }
            }

            // Phase 4: Drain any immediate ACKs generated during receive.
            self.drain_tx_queue(&mut on_event).await?;

            // Phase 5: Service pending ACK timers.
            self.service_pending_ack_timeouts(&mut on_event)
                .map_err(|_| MacError::QueueFull)?;

            // If the tx_queue has new work (e.g. retransmits just enqueued),
            // loop back to drain it before waiting again.
            if !self.tx_queue.is_empty() {
                continue;
            }

            return Ok(());
        }
    }
1759
1760    /// Drive the coordinator forever, invoking `on_event` for each delivered event.
1761    ///
1762    /// This is the preferred long-lived run loop for standalone MAC-driven tasks such as
1763    /// repeaters or dedicated radio services. Unlike manually calling
1764    /// [`poll_cycle`](Self::poll_cycle) in a loop, `run` keeps the wake/sleep policy inside
1765    /// the coordinator by delegating to [`next_event`](Self::next_event), which already
1766    /// waits for radio activity and protocol deadlines.
1767    pub async fn run(
1768        &mut self,
1769        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1770    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1771        loop {
1772            self.next_event(&mut on_event).await?;
1773        }
1774    }
1775
1776    /// Drive the coordinator forever while ignoring emitted events.
1777    ///
1778    /// Useful for standalone repeaters or bridge tasks that do not need to observe inbound
1779    /// deliveries directly but still need the coordinator to service forwarding, ACKs, and
1780    /// retransmissions without an app-owned polling loop.
1781    pub async fn run_quiet(&mut self) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1782        self.run(|_, _| {}).await
1783    }
1784
1785    /// Process a received frame, dispatching events through `on_event`.
1786    ///
1787    /// This is the shared implementation used by both [`receive_one`](Self::receive_one)
1788    /// and [`next_event`](Self::next_event).  Returns `true` when the frame
1789    /// produced at least one event or side-effect.
1790    async fn process_received_frame(
1791        &mut self,
1792        buf: &mut [u8; FRAME],
1793        frame_len: usize,
1794        rx: &RxInfo,
1795        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1796    ) -> bool {
1797        let received_at_ms = self.clock.now_ms();
1798        let mut current_len = frame_len;
1799        let mut current_rx = RxInfo {
1800            len: frame_len,
1801            rssi: rx.rssi,
1802            snr: rx.snr,
1803            lqi: rx.lqi,
1804        };
1805        let mut current_received_at_ms = received_at_ms;
1806        let mut handled_any = false;
1807
1808        loop {
1809            let Ok(header) = PacketHeader::parse(&buf[..current_len]) else {
1810                return handled_any;
1811            };
1812            let forwarding_confirmed = if let Some((identity_id, receipt)) =
1813                self.observe_forwarding_confirmation(&buf[..current_len])
1814            {
1815                let hint = match header.source {
1816                    SourceAddrRef::Hint(h) => Some(RouterHint([h.0[0], h.0[1]])),
1817                    SourceAddrRef::FullKeyAt { offset } => {
1818                        let mut key_bytes = [0u8; 32];
1819                        key_bytes.copy_from_slice(&buf[offset..offset + 32]);
1820                        let h = PublicKey(key_bytes).hint();
1821                        Some(RouterHint([h.0[0], h.0[1]]))
1822                    }
1823                    _ => None,
1824                };
1825                on_event(
1826                    identity_id,
1827                    crate::MacEventRef::Forwarded {
1828                        identity_id,
1829                        receipt,
1830                        hint,
1831                    },
1832                );
1833                true
1834            } else {
1835                false
1836            };
1837
1838            let (handled, replay_target) = match header.packet_type() {
1839                PacketType::Broadcast => (
1840                    self.process_broadcast(
1841                        buf,
1842                        current_len,
1843                        &header,
1844                        &current_rx,
1845                        current_received_at_ms,
1846                        &mut on_event,
1847                    ),
1848                    None,
1849                ),
1850                PacketType::MacAck => (
1851                    self.process_mac_ack(
1852                        buf,
1853                        current_len,
1854                        &header,
1855                        &current_rx,
1856                        forwarding_confirmed,
1857                        &mut on_event,
1858                    ),
1859                    None,
1860                ),
1861                PacketType::Unicast | PacketType::UnicastAckReq => {
1862                    self.process_unicast(
1863                        buf,
1864                        current_len,
1865                        &header,
1866                        &current_rx,
1867                        current_received_at_ms,
1868                        forwarding_confirmed,
1869                        &mut on_event,
1870                    )
1871                    .await
1872                }
1873                PacketType::Multicast => (
1874                    self.process_multicast(
1875                        buf,
1876                        current_len,
1877                        &header,
1878                        &current_rx,
1879                        current_received_at_ms,
1880                        forwarding_confirmed,
1881                        &mut on_event,
1882                    ),
1883                    None,
1884                ),
1885                PacketType::BlindUnicast | PacketType::BlindUnicastAckReq => {
1886                    self.process_blind_unicast(
1887                        buf,
1888                        current_len,
1889                        &header,
1890                        &current_rx,
1891                        current_received_at_ms,
1892                        forwarding_confirmed,
1893                        &mut on_event,
1894                    )
1895                    .await
1896                }
1897                PacketType::Reserved5 => (false, None),
1898            };
1899            handled_any |= handled;
1900
1901            let Some((local_id, peer_id)) = replay_target else {
1902                return handled_any;
1903            };
1904            let Some(deferred) = self.take_deferred_counter_resync_frame(local_id, peer_id) else {
1905                return handled_any;
1906            };
1907            current_len = deferred.frame.len();
1908            buf[..current_len].copy_from_slice(deferred.frame.as_slice());
1909            current_rx = RxInfo {
1910                len: current_len,
1911                rssi: deferred.rssi,
1912                snr: deferred.snr,
1913                lqi: deferred.lqi,
1914            };
1915            current_received_at_ms = deferred.received_at_ms;
1916        }
1917    }
1918
1919    fn process_broadcast(
1920        &mut self,
1921        buf: &[u8; FRAME],
1922        frame_len: usize,
1923        header: &PacketHeader,
1924        rx: &RxInfo,
1925        received_at_ms: u64,
1926        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1927    ) -> bool {
1928        let Some((from_hint, from_key)) = Self::resolve_broadcast_source(&buf[..frame_len], header)
1929        else {
1930            return false;
1931        };
1932        if !Self::payload_is_allowed(header.packet_type(), &buf[header.body_range.clone()]) {
1933            return false;
1934        }
1935        let mut delivered = false;
1936        for (index, slot) in self.identities.iter().enumerate() {
1937            if slot.is_none() {
1938                continue;
1939            }
1940            delivered = true;
1941            on_event(
1942                LocalIdentityId(index as u8),
1943                crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
1944                    &buf[..frame_len],
1945                    &buf[header.body_range.clone()],
1946                    header.clone(),
1947                    ParsedOptions::extract(&buf[..frame_len], header.options_range.clone())
1948                        .unwrap_or_default(),
1949                    from_key,
1950                    Some(from_hint),
1951                    false,
1952                    None,
1953                    crate::send::RxMetadata::new(
1954                        Some(rx.rssi),
1955                        Some(rx.snr),
1956                        rx.lqi,
1957                        Some(received_at_ms),
1958                    ),
1959                )),
1960            );
1961        }
1962        // Broadcast delivery does not consume the packet. Broadcast remains a
1963        // routable mesh packet and may still be forwarded by a repeater after
1964        // local delivery.
1965        let forwarded = self.maybe_forward_received(&buf[..frame_len], header, rx, false);
1966        delivered || forwarded
1967    }
1968
1969    fn process_mac_ack(
1970        &mut self,
1971        buf: &[u8; FRAME],
1972        frame_len: usize,
1973        header: &PacketHeader,
1974        rx: &RxInfo,
1975        forwarding_confirmed: bool,
1976        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1977    ) -> bool {
1978        let Some(ack_dst) = header.ack_dst else {
1979            return false;
1980        };
1981        let target_peer = self
1982            .identities
1983            .iter()
1984            .filter_map(|slot| slot.as_ref())
1985            .find(|slot| slot.identity().public_key().hint() == ack_dst)
1986            .and_then(|slot| self.match_pending_peer_for_ack(slot, &buf[header.mic_range.clone()]));
1987        if let Some(target_peer) = target_peer {
1988            let mut ack_tag = [0u8; 8];
1989            ack_tag.copy_from_slice(&buf[header.mic_range.clone()]);
1990            if let Some((identity_id, receipt)) = self.complete_ack(&target_peer, &ack_tag) {
1991                on_event(
1992                    identity_id,
1993                    crate::MacEventRef::AckReceived {
1994                        peer: target_peer,
1995                        receipt,
1996                    },
1997                );
1998                return true;
1999            }
2000        }
2001        forwarding_confirmed || self.maybe_forward_received(&buf[..frame_len], header, rx, false)
2002    }
2003
2004    async fn process_unicast(
2005        &mut self,
2006        buf: &mut [u8; FRAME],
2007        frame_len: usize,
2008        header: &PacketHeader,
2009        rx: &RxInfo,
2010        received_at_ms: u64,
2011        forwarding_confirmed: bool,
2012        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2013    ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
2014        let mut original = [0u8; FRAME];
2015        original[..frame_len].copy_from_slice(&buf[..frame_len]);
2016        let mut replay_target = None;
2017        let handled = if let Some(local_id) = self.find_local_identity_for_dst(header.dst) {
2018            let mut handled = false;
2019            for (peer_id, peer_key) in
2020                self.resolve_source_peer_candidates(&buf[..frame_len], header)
2021            {
2022                let Ok(keys) = self.ensure_peer_crypto(local_id, peer_id).await else {
2023                    continue;
2024                };
2025                let Ok(body_range) = self
2026                    .crypto
2027                    .open_packet(&mut buf[..frame_len], header, &keys)
2028                else {
2029                    continue;
2030                };
2031                let payload = &buf[body_range.clone()];
2032                if !Self::payload_is_allowed(header.packet_type(), payload) {
2033                    continue;
2034                }
2035                match self.unicast_replay_verdict(local_id, peer_id, header, &buf[..frame_len]) {
2036                    Some(ReplayVerdict::Accept) => {
2037                        let _ = self.accept_unicast_replay(
2038                            local_id,
2039                            peer_id,
2040                            header,
2041                            &buf[..frame_len],
2042                        );
2043                    }
2044                    Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
2045                        if self.try_accept_counter_resync_response(
2046                            local_id,
2047                            peer_id,
2048                            header,
2049                            &buf[..frame_len],
2050                            payload,
2051                        ) {
2052                            replay_target = Some((local_id, peer_id));
2053                        } else {
2054                            self.store_deferred_counter_resync_frame(
2055                                local_id,
2056                                peer_id,
2057                                &original[..frame_len],
2058                                rx,
2059                                received_at_ms,
2060                            );
2061                            self.maybe_request_counter_resync(local_id, peer_id, peer_key)
2062                                .await;
2063                            continue;
2064                        }
2065                    }
2066                    Some(ReplayVerdict::Replay) | None => continue,
2067                }
2068                self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2069
2070                if header.ack_requested()
2071                    && self.should_emit_destination_ack(&buf[..frame_len], header)
2072                {
2073                    let ack_tag = self.compute_received_ack_tag(
2074                        &buf[..frame_len],
2075                        header,
2076                        body_range.clone(),
2077                        &keys,
2078                    );
2079                    self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
2080                        .ok();
2081                }
2082
2083                if let Some(data) = Self::echo_request_data(payload) {
2084                    let response =
2085                        Self::build_echo_command_payload(MAC_COMMAND_ECHO_RESPONSE_ID, data);
2086                    let _ = self
2087                        .send_unicast(
2088                            local_id,
2089                            &peer_key,
2090                            response.as_slice(),
2091                            &SendOptions::default(),
2092                        )
2093                        .await;
2094                }
2095
2096                on_event(
2097                    local_id,
2098                    crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2099                        &original[..frame_len],
2100                        &buf[body_range],
2101                        header.clone(),
2102                        ParsedOptions::extract(
2103                            &original[..frame_len],
2104                            header.options_range.clone(),
2105                        )
2106                        .unwrap_or_default(),
2107                        Some(peer_key),
2108                        Some(peer_key.hint()),
2109                        true,
2110                        None,
2111                        crate::send::RxMetadata::new(
2112                            Some(rx.rssi),
2113                            Some(rx.snr),
2114                            rx.lqi,
2115                            Some(received_at_ms),
2116                        ),
2117                    )),
2118                );
2119                handled = true;
2120                break;
2121            }
2122            handled
2123        } else {
2124            false
2125        };
2126        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
2127        (
2128            handled || forwarding_confirmed || forwarded,
2129            handled.then_some(()).and(replay_target),
2130        )
2131    }
2132
2133    fn process_multicast(
2134        &mut self,
2135        buf: &mut [u8; FRAME],
2136        frame_len: usize,
2137        header: &PacketHeader,
2138        rx: &RxInfo,
2139        received_at_ms: u64,
2140        forwarding_confirmed: bool,
2141        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2142    ) -> bool {
2143        let mut original = [0u8; FRAME];
2144        original[..frame_len].copy_from_slice(&buf[..frame_len]);
2145        let delivered = if let Some(channel_id) = header.channel {
2146            let channel_info = {
2147                self.channels
2148                    .lookup_by_id(&channel_id)
2149                    .next()
2150                    .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
2151            };
2152            if let Some((channel_key, derived)) = channel_info {
2153                let keys = PairwiseKeys {
2154                    k_enc: derived.k_enc,
2155                    k_mic: derived.k_mic,
2156                };
2157                if let Ok(body_range) =
2158                    self.crypto
2159                        .open_packet(&mut buf[..frame_len], header, &keys)
2160                {
2161                    if !Self::payload_is_allowed(header.packet_type(), &buf[body_range.clone()]) {
2162                        false
2163                    } else if let Some(source) =
2164                        self.resolve_multicast_source(&buf[..frame_len], header)
2165                    {
2166                        let accepted = if let Some(peer_id) = source.peer_id {
2167                            let accepted = self.accept_multicast_replay(
2168                                channel_id,
2169                                peer_id,
2170                                header,
2171                                &buf[..frame_len],
2172                            );
2173                            if accepted {
2174                                self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2175                            }
2176                            accepted
2177                        } else {
2178                            self.accept_unknown_multicast_replay(header, &buf[..frame_len])
2179                        };
2180                        if accepted {
2181                            let mut delivered = false;
2182                            for (index, slot) in self.identities.iter().enumerate() {
2183                                if slot.is_none() {
2184                                    continue;
2185                                }
2186                                delivered = true;
2187                                on_event(
2188                                    LocalIdentityId(index as u8),
2189                                    crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2190                                        &original[..frame_len],
2191                                        &buf[body_range.clone()],
2192                                        header.clone(),
2193                                        ParsedOptions::extract(
2194                                            &original[..frame_len],
2195                                            header.options_range.clone(),
2196                                        )
2197                                        .unwrap_or_default(),
2198                                        source.public_key,
2199                                        source
2200                                            .hint
2201                                            .or_else(|| source.public_key.map(|key| key.hint())),
2202                                        true,
2203                                        Some(crate::ChannelInfoRef {
2204                                            id: channel_id,
2205                                            key: &channel_key,
2206                                        }),
2207                                        crate::send::RxMetadata::new(
2208                                            Some(rx.rssi),
2209                                            Some(rx.snr),
2210                                            rx.lqi,
2211                                            Some(received_at_ms),
2212                                        ),
2213                                    )),
2214                                );
2215                            }
2216                            delivered
2217                        } else {
2218                            false
2219                        }
2220                    } else {
2221                        false
2222                    }
2223                } else {
2224                    false
2225                }
2226            } else {
2227                false
2228            }
2229        } else {
2230            false
2231        };
2232        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, false);
2233        delivered || forwarding_confirmed || forwarded
2234    }
2235
    /// Try to open a received blind-unicast frame and deliver it to a local identity.
    ///
    /// Every channel registered under `header.channel` is tried in turn; for each one
    /// the blinded addresses are decrypted, the destination is matched against the
    /// local identities, and each candidate source peer is tried until one opens the
    /// packet and passes the replay check.
    ///
    /// * `buf` - working copy of the frame; it is decrypted in place.
    /// * `frame_len` - number of valid bytes at the start of `buf`.
    /// * `header` - already-parsed header of the frame.
    /// * `rx` - radio metadata (RSSI/SNR/LQI) reported alongside the frame.
    /// * `received_at_ms` - receive timestamp passed through to the event metadata.
    /// * `forwarding_confirmed` - OR-ed into the first element of the return value.
    /// * `on_event` - callback that receives the `Received` event on delivery.
    ///
    /// Returns `(handled || forwarding_confirmed || forwarded, replay_target)`, where
    /// `replay_target` is `Some((local_id, peer_id))` only when the frame was delivered
    /// after `try_accept_counter_resync_response` accepted it.
    async fn process_blind_unicast(
        &mut self,
        buf: &mut [u8; FRAME],
        frame_len: usize,
        header: &PacketHeader,
        rx: &RxInfo,
        received_at_ms: u64,
        forwarding_confirmed: bool,
        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
    ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
        // Keep an untouched copy of the frame: `buf` is mutated by decryption, while
        // the pristine bytes are needed to reset `buf`, for the event, and for forwarding.
        let mut original = [0u8; FRAME];
        original[..frame_len].copy_from_slice(&buf[..frame_len]);
        let mut replay_target = None;
        let handled = if let Some(channel_id) = header.channel {
            // Clone the key material of every channel sharing this id so the channel
            // table is no longer borrowed while `self` is used mutably below.
            let channel_candidates: Vec<(ChannelKey, DerivedChannelKeys), CHANNELS> = self
                .channels
                .lookup_by_id(&channel_id)
                .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
                .collect();
            if channel_candidates.is_empty() {
                false
            } else {
                let mut handled = false;
                for (resolved_channel_key, channel_keys) in channel_candidates {
                    // Start each channel attempt from the pristine frame bytes.
                    buf[..frame_len].copy_from_slice(&original[..frame_len]);
                    let Ok((dst, source_addr)) = self.crypto.decrypt_blind_addr(
                        &mut buf[..frame_len],
                        header,
                        &channel_keys,
                    ) else {
                        continue;
                    };
                    let Some(local_id) = self.find_local_identity_for_dst(Some(dst)) else {
                        continue;
                    };
                    // NOTE(review): `buf` is not reset between peer candidates. If
                    // `open_packet` can leave `buf` modified when it fails, later
                    // candidates would operate on altered bytes - confirm.
                    for (peer_id, peer_key) in
                        self.resolve_blind_source_peer_candidates(&buf[..frame_len], source_addr)
                    {
                        let Ok(pairwise_keys) = self.ensure_peer_crypto(local_id, peer_id).await
                        else {
                            continue;
                        };
                        // Blind-unicast keys are derived from both the pairwise keys
                        // and the channel keys.
                        let blind_keys =
                            self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
                        let body_range = match self.crypto.open_packet(
                            &mut buf[..frame_len],
                            header,
                            &blind_keys,
                        ) {
                            Ok(range) => range,
                            Err(_) => continue,
                        };
                        let payload = &buf[body_range.clone()];
                        if !Self::payload_is_allowed(header.packet_type(), payload) {
                            continue;
                        }
                        match self.unicast_replay_verdict(
                            local_id,
                            peer_id,
                            header,
                            &buf[..frame_len],
                        ) {
                            Some(ReplayVerdict::Accept) => {
                                // Record the frame in the replay window; the result is
                                // intentionally ignored.
                                let _ = self.accept_unicast_replay(
                                    local_id,
                                    peer_id,
                                    header,
                                    &buf[..frame_len],
                                );
                            }
                            Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
                                if self.try_accept_counter_resync_response(
                                    local_id,
                                    peer_id,
                                    header,
                                    &buf[..frame_len],
                                    payload,
                                ) {
                                    // Accepted through the resync path; report the pair
                                    // to the caller via the return value.
                                    replay_target = Some((local_id, peer_id));
                                } else {
                                    // Not deliverable now: keep the pristine frame as a
                                    // deferred frame and ask the peer for a counter resync.
                                    self.store_deferred_counter_resync_frame(
                                        local_id,
                                        peer_id,
                                        &original[..frame_len],
                                        rx,
                                        received_at_ms,
                                    );
                                    self.maybe_request_counter_resync(local_id, peer_id, peer_key)
                                        .await;
                                    continue;
                                }
                            }
                            // A replayed frame, or no replay verdict available: skip.
                            Some(ReplayVerdict::Replay) | None => continue,
                        }
                        self.learn_route_for_peer(peer_id, &buf[..frame_len], header);

                        if header.ack_requested()
                            && self.should_emit_destination_ack(&buf[..frame_len], header)
                        {
                            let ack_tag = self.compute_received_ack_tag(
                                &buf[..frame_len],
                                header,
                                body_range.clone(),
                                &blind_keys,
                            );
                            // Best effort: a failure to queue the ACK is dropped.
                            self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
                                .ok();
                        }

                        // Answer echo requests with an echo response carrying the same
                        // data; a send failure is ignored.
                        if let Some(data) = Self::echo_request_data(payload) {
                            let response = Self::build_echo_command_payload(
                                MAC_COMMAND_ECHO_RESPONSE_ID,
                                data,
                            );
                            let _ = self
                                .send_unicast(
                                    local_id,
                                    &peer_key,
                                    response.as_slice(),
                                    &SendOptions::default(),
                                )
                                .await;
                        }

                        // Deliver: the event carries the pristine wire bytes plus the
                        // decrypted body taken from `buf`.
                        on_event(
                            local_id,
                            crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
                                &original[..frame_len],
                                &buf[body_range],
                                header.clone(),
                                ParsedOptions::extract(
                                    &original[..frame_len],
                                    header.options_range.clone(),
                                )
                                .unwrap_or_default(),
                                Some(peer_key),
                                Some(peer_key.hint()),
                                true,
                                Some(crate::ChannelInfoRef {
                                    id: channel_id,
                                    key: &resolved_channel_key,
                                }),
                                crate::send::RxMetadata::new(
                                    Some(rx.rssi),
                                    Some(rx.snr),
                                    rx.lqi,
                                    Some(received_at_ms),
                                ),
                            )),
                        );
                        handled = true;
                        break;
                    }
                    // Stop trying further channels once one peer candidate delivered.
                    if handled {
                        break;
                    }
                }
                handled
            }
        } else {
            // No channel id in the header: nothing to try.
            false
        };
        // Forwarding always sees the pristine frame and is told whether it was handled.
        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
        (
            handled || forwarding_confirmed || forwarded,
            // Only report a replay target when the frame was actually delivered.
            handled.then_some(()).and(replay_target),
        )
    }
2404
2405    /// Non-blocking receive: polls the radio once and processes a frame if available.
2406    ///
2407    /// This is the legacy non-blocking API used by [`poll_cycle`](Self::poll_cycle).
2408    /// For new code, prefer [`next_event`](Self::next_event) which properly awaits
2409    /// the radio and timer deadlines.
2410    pub async fn receive_one(
2411        &mut self,
2412        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2413    ) -> Result<bool, MacError<<P::Radio as Radio>::Error>> {
2414        let mut buf = [0u8; FRAME];
2415        let Some(rx) = poll_fn(|cx| match self.radio.poll_receive(cx, &mut buf) {
2416            Poll::Ready(Ok(rx)) => Poll::Ready(Ok(Some(rx))),
2417            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
2418            Poll::Pending => Poll::Ready(Ok(None)),
2419        })
2420        .await
2421        .map_err(MacError::Radio)?
2422        else {
2423            return Ok(false);
2424        };
2425
2426        let frame_len = rx.len.min(buf.len());
2427        Ok(self
2428            .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
2429            .await)
2430    }
2431
2432    /// Mark a pending receipt as acknowledged and emit an event through `on_event`.
2433    pub fn complete_ack(
2434        &mut self,
2435        peer: &PublicKey,
2436        ack_tag: &[u8; 8],
2437    ) -> Option<(LocalIdentityId, SendReceipt)> {
2438        for (index, slot) in self.identities.iter_mut().enumerate() {
2439            let Some(slot) = slot.as_mut() else {
2440                continue;
2441            };
2442
2443            let receipt = slot.pending_acks.iter().find_map(|(receipt, pending)| {
2444                (pending.peer == *peer && pending.ack_tag == *ack_tag).then_some(*receipt)
2445            });
2446
2447            if let Some(receipt) = receipt {
2448                slot.pending_acks.remove(&receipt);
2449                if self
2450                    .post_tx_listen
2451                    .as_ref()
2452                    .map(|listen| {
2453                        listen.identity_id == LocalIdentityId(index as u8)
2454                            && listen.receipt == receipt
2455                    })
2456                    .unwrap_or(false)
2457                {
2458                    self.post_tx_listen = None;
2459                }
2460                return Some((LocalIdentityId(index as u8), receipt));
2461            }
2462        }
2463
2464        None
2465    }
2466
    /// Expire or retry pending ACK state based on the current clock time.
    ///
    /// After expiring the post-transmit listen window if needed, every identity
    /// slot is processed in two phases: first the pending ACKs are scanned and the
    /// required actions are collected, then the actions are applied (frames are
    /// re-enqueued, entries are removed, timeouts are reported).
    ///
    /// `on_event` receives a `MacEventRef::AckTimeout` for every pending ACK that is
    /// given up on.
    ///
    /// Returns `CapacityError` when the action list or the transmit queue is full;
    /// actions not yet applied at that point are dropped.
    pub fn service_pending_ack_timeouts(
        &mut self,
        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
    ) -> Result<(), CapacityError> {
        self.expire_post_tx_listen_if_needed();

        // Work item recorded during the scan phase and executed afterwards.
        #[derive(Clone)]
        enum Action<const FRAME: usize> {
            // Re-send the stored frame unchanged after a missed forward confirmation.
            Retry {
                receipt: SendReceipt,
                resend: ResendRecord<FRAME>,
                not_before_ms: u64,
            },
            // ACK deadline passed, but a rewritten resend may still be attempted.
            RouteRetry {
                receipt: SendReceipt,
                peer: PublicKey,
                resend: ResendRecord<FRAME>,
                not_before_ms: u64,
            },
            // ACK deadline passed and no further attempt is possible.
            Timeout {
                receipt: SendReceipt,
                peer: PublicKey,
            },
        }

        let now_ms = self.clock.now_ms();
        let t_frame_ms = self.radio.t_frame_ms();

        for index in 0..self.identities.len() {
            let identity_id = LocalIdentityId(index as u8);
            // Phase 1: scan this slot's pending ACKs while it is mutably borrowed.
            let actions = {
                let Some(slot) = self.identities[index].as_mut() else {
                    continue;
                };

                let mut actions: Vec<Action<FRAME>, ACKS> = Vec::new();
                for (receipt, pending) in slot.pending_acks.iter_mut() {
                    // Entries in any state other than `Queued` are subject to the
                    // ACK deadline.
                    if !matches!(pending.state, crate::AckState::Queued { .. })
                        && now_ms >= pending.ack_deadline_ms
                    {
                        if Self::can_attempt_route_retry(pending) {
                            let backoff_cap_ms =
                                Self::forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms, 1);
                            // Random backoff drawn from `0..=backoff_cap_ms`.
                            let backoff_ms = if backoff_cap_ms == 0 {
                                0
                            } else {
                                u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
                            };
                            actions
                                .push(Action::RouteRetry {
                                    receipt: *receipt,
                                    peer: pending.peer,
                                    resend: pending.resend.clone(),
                                    not_before_ms: now_ms.saturating_add(backoff_ms),
                                })
                                .map_err(|_| CapacityError)?;
                        } else {
                            actions
                                .push(Action::Timeout {
                                    receipt: *receipt,
                                    peer: pending.peer,
                                })
                                .map_err(|_| CapacityError)?;
                        }
                        continue;
                    }

                    // ACK deadline not reached: check whether the forward
                    // confirmation window has elapsed instead.
                    if let crate::AckState::AwaitingForward {
                        confirm_deadline_ms,
                    } = pending.state
                    {
                        if now_ms >= confirm_deadline_ms && pending.retries < MAX_FORWARD_RETRIES {
                            pending.retries = pending.retries.saturating_add(1);
                            // The backoff cap is a function of the retry count.
                            let backoff_cap_ms = Self::forward_retry_backoff_cap_ms_for_t_frame(
                                t_frame_ms,
                                pending.retries,
                            );
                            let backoff_ms = if backoff_cap_ms == 0 {
                                0
                            } else {
                                u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
                            };
                            let not_before_ms = now_ms.saturating_add(backoff_ms);
                            pending.state = crate::AckState::RetryQueued;
                            actions
                                .push(Action::Retry {
                                    receipt: *receipt,
                                    resend: pending.resend.clone(),
                                    not_before_ms,
                                })
                                .map_err(|_| CapacityError)?;
                        }
                    }
                }
                actions
            };

            // Phase 2: apply the collected actions now that the slot borrow has ended.
            for action in actions {
                match action {
                    Action::Retry {
                        receipt,
                        resend,
                        not_before_ms,
                    } => {
                        // The two trailing zeros are the CAD-attempt and
                        // forward-deferral counters of the new queue entry.
                        self.tx_queue.enqueue_with_state(
                            TxPriority::Retry,
                            resend.frame.as_slice(),
                            Some(receipt),
                            Some(identity_id),
                            not_before_ms,
                            0,
                            0,
                        )?;
                    }
                    Action::RouteRetry {
                        receipt,
                        peer,
                        resend,
                        not_before_ms,
                    } => {
                        if let Some(rewritten) = self.synthesize_route_retry_resend(&peer, &resend)
                        {
                            // Restart the pending entry around the rewritten frame.
                            // NOTE(review): `ack_deadline_ms` is reset to 0 here, which
                            // the deadline check above treats as already expired;
                            // presumably the transmit path refreshes it before the next
                            // service pass - confirm.
                            if let Some(pending) = self
                                .identity_mut(identity_id)
                                .and_then(|slot| slot.pending_ack_mut(&receipt))
                            {
                                pending.resend = rewritten.clone();
                                pending.retries = 0;
                                pending.sent_ms = 0;
                                pending.ack_deadline_ms = 0;
                                pending.state = crate::AckState::RetryQueued;
                            }
                            self.tx_queue.enqueue_with_state(
                                TxPriority::Retry,
                                rewritten.frame.as_slice(),
                                Some(receipt),
                                Some(identity_id),
                                not_before_ms,
                                0,
                                0,
                            )?;
                        } else {
                            // No rewritten frame could be produced: give up on this send.
                            if let Some(slot) = self.identity_mut(identity_id) {
                                slot.pending_acks.remove(&receipt);
                            }
                            on_event(
                                identity_id,
                                crate::MacEventRef::AckTimeout { peer, receipt },
                            );
                        }
                    }
                    Action::Timeout { receipt, peer } => {
                        // Drop the entry and tell the application the ACK never came.
                        if let Some(slot) = self.identity_mut(identity_id) {
                            slot.pending_acks.remove(&receipt);
                        }
                        on_event(
                            identity_id,
                            crate::MacEventRef::AckTimeout { peer, receipt },
                        );
                    }
                }
            }
        }

        Ok(())
    }
2634
2635    /// Cancel a pending ACK-requested send, stopping retransmissions.
2636    ///
2637    /// Removes the pending ACK entry for the given identity slot and receipt,
2638    /// and removes any matching entry from the transmit queue. Returns `true`
2639    /// if a pending ACK was found and removed.
2640    pub fn cancel_pending_ack(
2641        &mut self,
2642        identity_id: LocalIdentityId,
2643        receipt: SendReceipt,
2644    ) -> bool {
2645        let removed = self
2646            .identity_mut(identity_id)
2647            .and_then(|slot| slot.remove_pending_ack(&receipt))
2648            .is_some();
2649
2650        // Also remove any queued retransmission for this receipt.
2651        self.tx_queue.remove_first_matching(|entry| {
2652            entry.receipt == Some(receipt) && entry.identity_id == Some(identity_id)
2653        });
2654
2655        // Clear the post-tx listen if it was tracking this receipt.
2656        if let Some(listen) = &self.post_tx_listen {
2657            if listen.identity_id == identity_id && listen.receipt == receipt {
2658                self.post_tx_listen = None;
2659            }
2660        }
2661
2662        removed
2663    }
2664
2665    fn identity_and_advance(
2666        &mut self,
2667        from: LocalIdentityId,
2668    ) -> Result<(PublicKey, u32), SendError> {
2669        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2670        if slot.counter_window_exhausted() {
2671            return Err(SendError::CounterPersistenceLag);
2672        }
2673        let source_key = *slot.identity().public_key();
2674        let frame_counter = slot.advance_frame_counter();
2675        slot.schedule_counter_persist_if_needed();
2676        Ok((source_key, frame_counter))
2677    }
2678
2679    fn take_salt(&mut self, options: &SendOptions) -> Option<u16> {
2680        options.salt.then(|| self.rng.next_u32() as u16)
2681    }
2682
2683    fn enforce_send_policy(
2684        &self,
2685        channel_id: Option<ChannelId>,
2686        options: &SendOptions,
2687        blind_unicast: bool,
2688    ) -> Result<(), SendError> {
2689        let _authority = self.classify_send_authority(options, blind_unicast)?;
2690
2691        let Some(channel_id) = channel_id else {
2692            return Ok(());
2693        };
2694        let Some(policy) = self
2695            .operating_policy
2696            .channel_policies
2697            .iter()
2698            .find(|policy| policy.channel_id == channel_id)
2699        else {
2700            return Ok(());
2701        };
2702
2703        if policy.require_unencrypted && options.encrypted {
2704            return Err(SendError::PolicyViolation);
2705        }
2706        if policy.require_full_source && !options.full_source {
2707            return Err(SendError::PolicyViolation);
2708        }
2709        if let Some(max_flood_hops) = policy.max_flood_hops {
2710            if options
2711                .flood_hops
2712                .map(|hops| hops > max_flood_hops)
2713                .unwrap_or(false)
2714            {
2715                return Err(SendError::PolicyViolation);
2716            }
2717        }
2718
2719        Ok(())
2720    }
2721
2722    fn classify_send_authority(
2723        &self,
2724        options: &SendOptions,
2725        _blind_unicast: bool,
2726    ) -> Result<TransmitAuthority, SendError> {
2727        match self.operating_policy.amateur_radio_mode {
2728            AmateurRadioMode::Unlicensed => Ok(TransmitAuthority::Unlicensed),
2729            AmateurRadioMode::LicensedOnly => {
2730                if options.encrypted || self.operating_policy.operator_callsign.is_none() {
2731                    return Err(SendError::PolicyViolation);
2732                }
2733                Ok(TransmitAuthority::Amateur)
2734            }
2735            AmateurRadioMode::Hybrid => {
2736                if options.encrypted {
2737                    // Hybrid encrypted traffic must be treated as unlicensed
2738                    // traffic for downstream transmit-power and duty-cycle policy.
2739                    Ok(TransmitAuthority::Unlicensed)
2740                } else if self.operating_policy.operator_callsign.is_some() {
2741                    Ok(TransmitAuthority::Amateur)
2742                } else {
2743                    Ok(TransmitAuthority::Unlicensed)
2744                }
2745            }
2746        }
2747    }
2748
2749    fn enqueue_packet(
2750        &mut self,
2751        packet: UnsealedPacket<'_>,
2752        receipt: Option<SendReceipt>,
2753        identity_id: Option<LocalIdentityId>,
2754    ) -> Result<(), SendError> {
2755        if packet.total_len() > self.radio.max_frame_size() {
2756            return Err(SendError::Build(BuildError::BufferTooSmall));
2757        }
2758        self.tx_queue
2759            .enqueue(
2760                TxPriority::Application,
2761                packet.as_bytes(),
2762                receipt,
2763                identity_id,
2764            )
2765            .map_err(|_| SendError::QueueFull)?;
2766        Ok(())
2767    }
2768
2769    fn refresh_pending_resend(
2770        &mut self,
2771        from: LocalIdentityId,
2772        receipt: SendReceipt,
2773        frame: &[u8],
2774        source_route: Option<&[RouterHint]>,
2775    ) -> Result<(), SendError> {
2776        let resend =
2777            ResendRecord::try_new(frame, source_route).map_err(|_| SendError::QueueFull)?;
2778        let pending = self
2779            .identity_mut(from)
2780            .ok_or(SendError::IdentityMissing)?
2781            .pending_ack_mut(&receipt)
2782            .ok_or(SendError::IdentityMissing)?;
2783        pending.resend = resend;
2784        Ok(())
2785    }
2786
2787    fn prepare_pending_ack(
2788        &mut self,
2789        from: LocalIdentityId,
2790        peer: PublicKey,
2791        packet: &UnsealedPacket<'_>,
2792        keys: &PairwiseKeys,
2793        options: &SendOptions,
2794    ) -> Result<SendReceipt, SendError> {
2795        let header = PacketHeader::parse(packet.as_bytes())?;
2796        let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2797        feed_aad(&header, packet.as_bytes(), |chunk| cmac.update(chunk));
2798        cmac.update(packet.body());
2799        let full_mac = cmac.finalize();
2800        let ack_tag = self.crypto.compute_ack_tag(&full_mac, &keys.k_enc);
2801        let is_forwarded = options
2802            .source_route
2803            .as_ref()
2804            .map(|route| !route.is_empty())
2805            .unwrap_or(false)
2806            || options.flood_hops.unwrap_or(0) > 0;
2807        let resend = ResendRecord::try_new(
2808            packet.as_bytes(),
2809            options.source_route.as_ref().map(|route| route.as_slice()),
2810        )
2811        .map_err(|_| SendError::QueueFull)?;
2812
2813        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2814        let receipt = slot.next_receipt();
2815        let pending = if is_forwarded {
2816            PendingAck::forwarded(ack_tag, peer, resend)
2817        } else {
2818            PendingAck::direct(ack_tag, peer, resend)
2819        };
2820        slot.try_insert_pending_ack(receipt, pending)
2821            .map_err(|_| SendError::PendingAckFull)?;
2822        Ok(receipt)
2823    }
2824
2825    async fn derive_pairwise_keys_for_peer(
2826        &self,
2827        local_id: LocalIdentityId,
2828        peer_key: &PublicKey,
2829    ) -> Result<PairwiseKeys, SendError> {
2830        let shared_secret = {
2831            let slot = self.identity(local_id).ok_or(SendError::IdentityMissing)?;
2832            match slot.identity() {
2833                LocalIdentity::LongTerm(identity) => identity
2834                    .agree(peer_key)
2835                    .await
2836                    .map_err(|_| SendError::IdentityAgreementFailed)?,
2837                #[cfg(feature = "software-crypto")]
2838                LocalIdentity::Ephemeral(identity) => identity
2839                    .agree(peer_key)
2840                    .await
2841                    .map_err(|_| SendError::IdentityAgreementFailed)?,
2842            }
2843        };
2844
2845        Ok(self.crypto.derive_pairwise_keys(&shared_secret))
2846    }
2847
2848    fn effective_source_route(
2849        &self,
2850        peer_id: PeerId,
2851        options: &SendOptions,
2852    ) -> Option<Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>> {
2853        if let Some(route) = options.source_route.as_ref() {
2854            return Some(route.clone());
2855        }
2856
2857        let Some(peer) = self.peer_registry.get(peer_id) else {
2858            return None;
2859        };
2860        match peer.route.as_ref() {
2861            Some(CachedRoute::Source(route)) => Some(route.clone()),
2862            _ => None,
2863        }
2864    }
2865
2866    fn cache_peer_crypto(
2867        &mut self,
2868        local_id: LocalIdentityId,
2869        peer_id: PeerId,
2870        pairwise_keys: PairwiseKeys,
2871    ) -> Result<(), SendError> {
2872        let slot = self
2873            .identity_mut(local_id)
2874            .ok_or(SendError::IdentityMissing)?;
2875        if slot.peer_crypto().get(&peer_id).is_some() {
2876            return Ok(());
2877        }
2878        slot.peer_crypto_mut()
2879            .insert(
2880                peer_id,
2881                crate::peers::PeerCryptoState {
2882                    pairwise_keys,
2883                    replay_window: ReplayWindow::new(),
2884                },
2885            )
2886            .map_err(|_| SendError::QueueFull)?;
2887        Ok(())
2888    }
2889
2890    async fn ensure_peer_crypto(
2891        &mut self,
2892        local_id: LocalIdentityId,
2893        peer_id: PeerId,
2894    ) -> Result<PairwiseKeys, SendError> {
2895        if let Some(keys) = self
2896            .identity(local_id)
2897            .and_then(|slot| slot.peer_crypto().get(&peer_id))
2898            .map(|state| state.pairwise_keys.clone())
2899        {
2900            return Ok(keys);
2901        }
2902
2903        let peer_key = self
2904            .peer_registry
2905            .get(peer_id)
2906            .ok_or(SendError::PeerMissing)?
2907            .public_key;
2908        let pairwise_keys = self
2909            .derive_pairwise_keys_for_peer(local_id, &peer_key)
2910            .await?;
2911        self.cache_peer_crypto(local_id, peer_id, pairwise_keys.clone())?;
2912        Ok(pairwise_keys)
2913    }
2914
2915    fn insert_identity(
2916        &mut self,
2917        identity: LocalIdentity<P::Identity>,
2918        pfs_parent: Option<LocalIdentityId>,
2919    ) -> Result<LocalIdentityId, CapacityError> {
2920        let initial_frame_counter = self.rng.next_u32();
2921
2922        if let Some((index, slot)) = self
2923            .identities
2924            .iter_mut()
2925            .enumerate()
2926            .find(|(_, slot)| slot.is_none())
2927        {
2928            *slot = Some(IdentitySlot::new(
2929                identity,
2930                initial_frame_counter,
2931                pfs_parent,
2932            ));
2933            return Ok(LocalIdentityId(index as u8));
2934        }
2935
2936        let next_id = self.identities.len();
2937        self.identities
2938            .push(Some(IdentitySlot::new(
2939                identity,
2940                initial_frame_counter,
2941                pfs_parent,
2942            )))
2943            .map_err(|_| CapacityError)?;
2944        Ok(LocalIdentityId(next_id as u8))
2945    }
2946
2947    fn compute_received_ack_tag(
2948        &self,
2949        buf: &[u8],
2950        header: &PacketHeader,
2951        body_range: core::ops::Range<usize>,
2952        keys: &PairwiseKeys,
2953    ) -> [u8; 8] {
2954        let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2955        feed_aad(header, buf, |chunk| cmac.update(chunk));
2956        cmac.update(&buf[body_range]);
2957        let full_mac = cmac.finalize();
2958        self.crypto.compute_ack_tag(&full_mac, &keys.k_enc)
2959    }
2960
2961    fn requeue_tx(&mut self, queued: &crate::QueuedTx<FRAME>) -> Result<u32, CapacityError> {
2962        self.tx_queue.enqueue_with_state(
2963            queued.priority,
2964            queued.frame.as_slice(),
2965            queued.receipt,
2966            queued.identity_id,
2967            queued.not_before_ms,
2968            queued.cad_attempts,
2969            queued.forward_deferrals,
2970        )
2971    }
2972
2973    fn accept_unicast_replay(
2974        &mut self,
2975        local_id: LocalIdentityId,
2976        peer_id: PeerId,
2977        header: &PacketHeader,
2978        frame: &[u8],
2979    ) -> bool {
2980        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
2981            return false;
2982        };
2983        let now_ms = self.clock.now_ms();
2984        let Some(window) = self
2985            .identity_mut(local_id)
2986            .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
2987            .map(|state| &mut state.replay_window)
2988        else {
2989            return false;
2990        };
2991
2992        if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
2993            return false;
2994        }
2995
2996        window.accept(counter, mic, now_ms);
2997        true
2998    }
2999
3000    fn unicast_replay_verdict(
3001        &mut self,
3002        local_id: LocalIdentityId,
3003        peer_id: PeerId,
3004        header: &PacketHeader,
3005        frame: &[u8],
3006    ) -> Option<ReplayVerdict> {
3007        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3008            return None;
3009        };
3010        let now_ms = self.clock.now_ms();
3011        self.identity_mut(local_id)
3012            .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3013            .map(|state| state.replay_window.check(counter, mic, now_ms))
3014    }
3015
3016    fn try_accept_counter_resync_response(
3017        &mut self,
3018        local_id: LocalIdentityId,
3019        peer_id: PeerId,
3020        header: &PacketHeader,
3021        frame: &[u8],
3022        payload: &[u8],
3023    ) -> bool {
3024        let Some(nonce) = Self::echo_response_nonce(payload) else {
3025            return false;
3026        };
3027        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3028            return false;
3029        };
3030        let now_ms = self.clock.now_ms();
3031        let Some(slot) = self.identity_mut(local_id) else {
3032            return false;
3033        };
3034        let Some(pending) = slot.pending_counter_resync().get(&peer_id).copied() else {
3035            return false;
3036        };
3037        if pending.nonce != nonce {
3038            return false;
3039        }
3040        let Some(state) = slot.peer_crypto_mut().get_mut(&peer_id) else {
3041            return false;
3042        };
3043        state.replay_window.reset(counter, now_ms);
3044        state.replay_window.accept(counter, mic, now_ms);
3045        let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3046        true
3047    }
3048
3049    async fn maybe_request_counter_resync(
3050        &mut self,
3051        local_id: LocalIdentityId,
3052        peer_id: PeerId,
3053        peer_key: PublicKey,
3054    ) {
3055        let now_ms = self.clock.now_ms();
3056        let should_send = {
3057            let Some(slot) = self.identity(local_id) else {
3058                return;
3059            };
3060            match slot.pending_counter_resync().get(&peer_id).copied() {
3061                Some(pending) => {
3062                    now_ms.saturating_sub(pending.requested_ms) >= COUNTER_RESYNC_REQUEST_RETRY_MS
3063                }
3064                None => true,
3065            }
3066        };
3067        if !should_send {
3068            return;
3069        }
3070
3071        let nonce = self.rng.next_u32();
3072        let payload =
3073            Self::build_echo_command_payload(MAC_COMMAND_ECHO_REQUEST_ID, &nonce.to_be_bytes());
3074        let options = SendOptions::default();
3075        if self
3076            .send_unicast(local_id, &peer_key, payload.as_slice(), &options)
3077            .await
3078            .is_ok()
3079        {
3080            if let Some(slot) = self.identity_mut(local_id) {
3081                let _ = slot.pending_counter_resync_mut().insert(
3082                    peer_id,
3083                    PendingCounterResync {
3084                        nonce,
3085                        requested_ms: now_ms,
3086                    },
3087                );
3088            }
3089        }
3090    }
3091
3092    fn store_deferred_counter_resync_frame(
3093        &mut self,
3094        local_id: LocalIdentityId,
3095        peer_id: PeerId,
3096        frame: &[u8],
3097        rx: &RxInfo,
3098        received_at_ms: u64,
3099    ) {
3100        let mut stored = Vec::new();
3101        stored
3102            .extend_from_slice(frame)
3103            .expect("received frame length must fit configured frame capacity");
3104        self.deferred_counter_resync_frame = Some(DeferredCounterResyncFrame {
3105            local_id,
3106            peer_id,
3107            frame: stored,
3108            rssi: rx.rssi,
3109            snr: rx.snr,
3110            lqi: rx.lqi,
3111            received_at_ms,
3112        });
3113    }
3114
3115    fn take_deferred_counter_resync_frame(
3116        &mut self,
3117        local_id: LocalIdentityId,
3118        peer_id: PeerId,
3119    ) -> Option<DeferredCounterResyncFrame<FRAME>> {
3120        match self.deferred_counter_resync_frame.as_ref() {
3121            Some(deferred) if deferred.local_id == local_id && deferred.peer_id == peer_id => {
3122                self.deferred_counter_resync_frame.take()
3123            }
3124            _ => None,
3125        }
3126    }
3127
3128    fn accept_multicast_replay(
3129        &mut self,
3130        channel_id: ChannelId,
3131        peer_id: PeerId,
3132        header: &PacketHeader,
3133        frame: &[u8],
3134    ) -> bool {
3135        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3136            return false;
3137        };
3138        let now_ms = self.clock.now_ms();
3139        let Some(channel) = self.channels.get_mut_by_id(&channel_id) else {
3140            return false;
3141        };
3142
3143        if let Some(window) = channel.replay.get_mut(&peer_id) {
3144            if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3145                return false;
3146            }
3147            window.accept(counter, mic, now_ms);
3148            return true;
3149        }
3150
3151        let mut window = ReplayWindow::new();
3152        window.accept(counter, mic, now_ms);
3153        channel.replay.insert(peer_id, window).is_ok()
3154    }
3155
3156    fn clear_peer_slot_state(&mut self, peer_id: PeerId) {
3157        for slot in self.identities.iter_mut().filter_map(|slot| slot.as_mut()) {
3158            let _ = slot.peer_crypto_mut().remove(&peer_id);
3159            let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3160        }
3161        for channel in self.channels.iter_mut() {
3162            let _ = channel.replay.remove(&peer_id);
3163        }
3164    }
3165
3166    fn try_auto_register_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
3167        let now_ms = self.clock.now_ms();
3168        let outcome = self.peer_registry.try_insert_or_update_auto(key, now_ms)?;
3169        if outcome.evicted_key.is_some() {
3170            self.clear_peer_slot_state(outcome.peer_id);
3171        }
3172        Ok(outcome.peer_id)
3173    }
3174
3175    fn replay_metadata<'a>(header: &PacketHeader, frame: &'a [u8]) -> Option<(u32, &'a [u8])> {
3176        let counter = header.sec_info?.frame_counter;
3177        let mic = frame.get(header.mic_range.clone())?;
3178        Some((counter, mic))
3179    }
3180
3181    fn build_echo_command_payload(command_id: u8, data: &[u8]) -> Vec<u8, FRAME> {
3182        let mut payload = Vec::new();
3183        let _ = payload.push(PayloadType::MacCommand as u8);
3184        let _ = payload.push(command_id);
3185        let _ = payload.extend_from_slice(data);
3186        payload
3187    }
3188
3189    fn echo_request_data(payload: &[u8]) -> Option<&[u8]> {
3190        let (&payload_type, rest) = payload.split_first()?;
3191        let (&command_id, data) = rest.split_first()?;
3192        if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3193            || command_id != MAC_COMMAND_ECHO_REQUEST_ID
3194        {
3195            return None;
3196        }
3197        Some(data)
3198    }
3199
3200    fn echo_response_nonce(payload: &[u8]) -> Option<u32> {
3201        let (&payload_type, rest) = payload.split_first()?;
3202        let (&command_id, data) = rest.split_first()?;
3203        if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3204            || command_id != MAC_COMMAND_ECHO_RESPONSE_ID
3205            || data.len() != COUNTER_RESYNC_NONCE_LEN
3206        {
3207            return None;
3208        }
3209        Some(u32::from_be_bytes(data.try_into().ok()?))
3210    }
3211
3212    fn full_key_at(frame: &[u8], offset: usize) -> Option<PublicKey> {
3213        let mut key = [0u8; 32];
3214        key.copy_from_slice(frame.get(offset..offset + 32)?);
3215        Some(PublicKey(key))
3216    }
3217
3218    fn find_local_identity_for_dst(
3219        &self,
3220        dst: Option<umsh_core::NodeHint>,
3221    ) -> Option<LocalIdentityId> {
3222        let dst = dst?;
3223        self.identities
3224            .iter()
3225            .enumerate()
3226            .find(|(_, slot)| {
3227                slot.as_ref()
3228                    .map(|slot| slot.identity().public_key().hint() == dst)
3229                    .unwrap_or(false)
3230            })
3231            .map(|(index, _)| LocalIdentityId(index as u8))
3232    }
3233
3234    fn resolve_source_peer_candidates(
3235        &mut self,
3236        frame: &[u8],
3237        header: &PacketHeader,
3238    ) -> Vec<(PeerId, PublicKey), PEERS> {
3239        match header.source {
3240            SourceAddrRef::FullKeyAt { offset } => {
3241                let Some(peer_key) = Self::full_key_at(frame, offset) else {
3242                    return Vec::new();
3243                };
3244
3245                if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3246                    let mut out = Vec::new();
3247                    let _ = out.push((peer_id, peer_key));
3248                    return out;
3249                }
3250
3251                if self.auto_register_full_key_peers {
3252                    if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3253                        let mut out = Vec::new();
3254                        let _ = out.push((peer_id, peer_key));
3255                        return out;
3256                    }
3257                }
3258
3259                Vec::new()
3260            }
3261            SourceAddrRef::Hint(hint) => self
3262                .peer_registry
3263                .lookup_by_hint(&hint)
3264                .map(|(peer_id, info)| (peer_id, info.public_key))
3265                .collect(),
3266            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3267        }
3268    }
3269
3270    fn resolve_multicast_source(
3271        &mut self,
3272        frame: &[u8],
3273        header: &PacketHeader,
3274    ) -> Option<ResolvedMulticastSource> {
3275        match header.source {
3276            SourceAddrRef::FullKeyAt { offset } => {
3277                let mut key = [0u8; 32];
3278                key.copy_from_slice(frame.get(offset..offset + 32)?);
3279                let public_key = PublicKey(key);
3280                let peer_id = self
3281                    .peer_registry
3282                    .lookup_by_key(&public_key)
3283                    .map(|(peer_id, _)| peer_id);
3284                Some(ResolvedMulticastSource {
3285                    peer_id,
3286                    public_key: Some(public_key),
3287                    hint: Some(public_key.hint()),
3288                })
3289            }
3290            SourceAddrRef::Hint(hint) => {
3291                let resolved = self.resolve_unique_hint(hint);
3292                Some(ResolvedMulticastSource {
3293                    peer_id: resolved.map(|(peer_id, _)| peer_id),
3294                    public_key: resolved.map(|(_, key)| key),
3295                    hint: Some(hint),
3296                })
3297            }
3298            SourceAddrRef::Encrypted { offset, len } => match len {
3299                32 => {
3300                    let mut key = [0u8; 32];
3301                    key.copy_from_slice(frame.get(offset..offset + 32)?);
3302                    let public_key = PublicKey(key);
3303                    let peer_id = self
3304                        .peer_registry
3305                        .lookup_by_key(&public_key)
3306                        .map(|(peer_id, _)| peer_id);
3307                    Some(ResolvedMulticastSource {
3308                        peer_id,
3309                        public_key: Some(public_key),
3310                        hint: Some(public_key.hint()),
3311                    })
3312                }
3313                3 => {
3314                    let hint = umsh_core::NodeHint([
3315                        *frame.get(offset)?,
3316                        *frame.get(offset + 1)?,
3317                        *frame.get(offset + 2)?,
3318                    ]);
3319                    let resolved = self.resolve_unique_hint(hint);
3320                    Some(ResolvedMulticastSource {
3321                        peer_id: resolved.map(|(peer_id, _)| peer_id),
3322                        public_key: resolved.map(|(_, key)| key),
3323                        hint: Some(hint),
3324                    })
3325                }
3326                _ => None,
3327            },
3328            SourceAddrRef::None => None,
3329        }
3330    }
3331
3332    fn accept_unknown_multicast_replay(&mut self, header: &PacketHeader, frame: &[u8]) -> bool {
3333        let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3334            return false;
3335        };
3336        if self.multicast_unknown_dup_cache.contains(&cache_key) {
3337            return false;
3338        }
3339        self.multicast_unknown_dup_cache
3340            .insert(cache_key, self.clock.now_ms());
3341        true
3342    }
3343
3344    fn resolve_blind_source_peer_candidates(
3345        &mut self,
3346        frame: &[u8],
3347        source: SourceAddrRef,
3348    ) -> Vec<(PeerId, PublicKey), PEERS> {
3349        match source {
3350            SourceAddrRef::FullKeyAt { offset } => {
3351                let Some(peer_key) = Self::full_key_at(frame, offset) else {
3352                    return Vec::new();
3353                };
3354
3355                if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3356                    let mut out = Vec::new();
3357                    let _ = out.push((peer_id, peer_key));
3358                    return out;
3359                }
3360
3361                if self.auto_register_full_key_peers {
3362                    if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3363                        let mut out = Vec::new();
3364                        let _ = out.push((peer_id, peer_key));
3365                        return out;
3366                    }
3367                }
3368
3369                Vec::new()
3370            }
3371            SourceAddrRef::Hint(hint) => self
3372                .peer_registry
3373                .lookup_by_hint(&hint)
3374                .map(|(peer_id, info)| (peer_id, info.public_key))
3375                .collect(),
3376            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3377        }
3378    }
3379
3380    fn resolve_unique_hint(&self, hint: umsh_core::NodeHint) -> Option<(PeerId, PublicKey)> {
3381        let mut matches = self.peer_registry.lookup_by_hint(&hint);
3382        let (peer_id, info) = matches.next()?;
3383        if matches.next().is_some() {
3384            return None;
3385        }
3386        Some((peer_id, info.public_key))
3387    }
3388
3389    fn learn_route_for_peer(&mut self, peer_id: PeerId, frame: &[u8], header: &PacketHeader) {
3390        let now_ms = self.clock.now_ms();
3391        self.peer_registry.touch(peer_id, now_ms);
3392
3393        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3394            return;
3395        };
3396
3397        if let Some(trace_range) = options.trace_route {
3398            if let Some(route) = self.source_route_from_trace(frame.get(trace_range).unwrap_or(&[]))
3399            {
3400                self.peer_registry
3401                    .update_route(peer_id, crate::CachedRoute::Source(route));
3402                return;
3403            }
3404        }
3405
3406        if let Some(flood_hops) = header.flood_hops {
3407            self.peer_registry.update_route(
3408                peer_id,
3409                crate::CachedRoute::Flood {
3410                    hops: flood_hops.accumulated(),
3411                },
3412            );
3413        }
3414    }
3415
3416    fn resolve_broadcast_source(
3417        frame: &[u8],
3418        header: &PacketHeader,
3419    ) -> Option<(umsh_core::NodeHint, Option<PublicKey>)> {
3420        match header.source {
3421            SourceAddrRef::Hint(hint) => Some((hint, None)),
3422            SourceAddrRef::FullKeyAt { offset } => {
3423                let mut key = [0u8; 32];
3424                key.copy_from_slice(frame.get(offset..offset + 32)?);
3425                let public_key = PublicKey(key);
3426                Some((public_key.hint(), Some(public_key)))
3427            }
3428            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => None,
3429        }
3430    }
3431
3432    fn payload_is_allowed(packet_type: PacketType, payload: &[u8]) -> bool {
3433        if payload.is_empty() {
3434            return true;
3435        }
3436
3437        PayloadType::from_byte(payload[0])
3438            .unwrap_or(PayloadType::Empty)
3439            .allowed_for(packet_type)
3440    }
3441
3442    fn source_route_from_trace(
3443        &self,
3444        trace_bytes: &[u8],
3445    ) -> Option<heapless::Vec<RouterHint, { crate::MAX_SOURCE_ROUTE_HOPS }>> {
3446        if trace_bytes.len() % 2 != 0 {
3447            return None;
3448        }
3449
3450        let mut route = heapless::Vec::new();
3451        for chunk in trace_bytes.chunks_exact(2) {
3452            route.push(RouterHint([chunk[0], chunk[1]])).ok()?;
3453        }
3454        Some(route)
3455    }
3456
3457    fn should_emit_destination_ack(&self, frame: &[u8], header: &PacketHeader) -> bool {
3458        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3459            return false;
3460        };
3461
3462        // Destination ACKs are emitted only once a source route is fully
3463        // consumed. An empty SourceRoute still matters for provenance, but it
3464        // no longer constrains forwarding and therefore counts as "at the
3465        // destination" for ACK purposes.
3466        options
3467            .source_route
3468            .map(|range| range.is_empty())
3469            .unwrap_or(true)
3470    }
3471
3472    fn maybe_forward_received(
3473        &mut self,
3474        frame: &[u8],
3475        header: &PacketHeader,
3476        rx: &RxInfo,
3477        locally_handled_unicast: bool,
3478    ) -> bool {
3479        if !self.repeater.enabled {
3480            return false;
3481        }
3482        if !header.packet_type().is_routable() {
3483            return false;
3484        }
3485        // Once a point-to-point packet is handled by its actual destination, it
3486        // should not also be repeated by that same node. Other packet classes,
3487        // including broadcast and MAC ACK, remain routable.
3488        if locally_handled_unicast
3489            && matches!(
3490                header.packet_type(),
3491                PacketType::Unicast
3492                    | PacketType::UnicastAckReq
3493                    | PacketType::BlindUnicast
3494                    | PacketType::BlindUnicastAckReq
3495            )
3496        {
3497            return false;
3498        }
3499
3500        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3501            return false;
3502        };
3503        let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3504            return false;
3505        };
3506        if self.dup_cache.contains(&cache_key) {
3507            self.defer_pending_forward(&cache_key, rx, &options);
3508            return false;
3509        }
3510
3511        let Some(plan) = self.plan_forwarding(frame, header, &options, rx) else {
3512            return false;
3513        };
3514
3515        let mut rewritten = [0u8; FRAME];
3516        let Ok(total_len) =
3517            self.rewrite_forwarded_frame(frame, header, &options, plan, &mut rewritten)
3518        else {
3519            return false;
3520        };
3521        if total_len > self.radio.max_frame_size() {
3522            return false;
3523        }
3524
3525        let now_ms = self.clock.now_ms();
3526        if self
3527            .tx_queue
3528            .enqueue_with_state(
3529                TxPriority::Forward,
3530                &rewritten[..total_len],
3531                None,
3532                None,
3533                now_ms.saturating_add(plan.delay_ms),
3534                0,
3535                0,
3536            )
3537            .is_err()
3538        {
3539            return false;
3540        }
3541        self.dup_cache.insert(cache_key, now_ms);
3542        true
3543    }
3544
3545    fn plan_forwarding(
3546        &mut self,
3547        frame: &[u8],
3548        header: &PacketHeader,
3549        options: &ParsedOptions,
3550        rx: &RxInfo,
3551    ) -> Option<ForwardPlan> {
3552        if options.has_unknown_critical {
3553            return None;
3554        }
3555
3556        let router_hint = self.repeater_router_hint()?;
3557        let station_action = self.classify_forward_station_action(frame, header)?;
3558
3559        let source_route_bytes = options
3560            .source_route
3561            .as_ref()
3562            .and_then(|range| frame.get(range.clone()))
3563            .unwrap_or(&[]);
3564        if source_route_bytes.len() % 2 != 0 {
3565            return None;
3566        }
3567
3568        let mut consume_source_route = false;
3569        let mut decrement_flood_hops = false;
3570        let mut insert_region_code = None;
3571        let mut delay_ms = 0u64;
3572
3573        if !source_route_bytes.is_empty() {
3574            if source_route_bytes[..2] != router_hint.0 {
3575                return None;
3576            }
3577            consume_source_route = true;
3578            if source_route_bytes.len() == 2 {
3579                decrement_flood_hops = header.flood_hops.is_some();
3580            }
3581        } else {
3582            decrement_flood_hops = true;
3583        }
3584
3585        if decrement_flood_hops {
3586            let flood_hops = header.flood_hops?;
3587            if flood_hops.remaining() == 0 {
3588                return None;
3589            }
3590            // Signal-quality filtering applies only to flood forwarding,
3591            // not to source-routed hops.
3592            if let Some(min_rssi) = Self::effective_min_rssi(options, &self.repeater) {
3593                if rx.rssi < min_rssi {
3594                    return None;
3595                }
3596            }
3597            if let Some(min_snr) = Self::effective_min_snr(options, &self.repeater) {
3598                if rx.snr < Snr::from_decibels(min_snr) {
3599                    return None;
3600                }
3601            }
3602            let mut saw_region_code = false;
3603            let mut matched_region_code = false;
3604            if !header.options_range.is_empty() {
3605                for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
3606                    let (number, value) = entry.ok()?;
3607                    if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
3608                        continue;
3609                    }
3610                    saw_region_code = true;
3611                    let region_code = [value[0], value[1]];
3612                    if self
3613                        .repeater
3614                        .regions
3615                        .iter()
3616                        .any(|configured| *configured == region_code)
3617                    {
3618                        matched_region_code = true;
3619                    }
3620                }
3621            }
3622            if saw_region_code {
3623                if !matched_region_code {
3624                    return None;
3625                }
3626            } else {
3627                insert_region_code = self.repeater.regions.first().copied();
3628            }
3629            delay_ms = self.sample_flood_contention_delay_ms(rx, options);
3630        }
3631
3632        Some(ForwardPlan {
3633            router_hint,
3634            consume_source_route,
3635            decrement_flood_hops,
3636            insert_region_code,
3637            delay_ms,
3638            station_action,
3639        })
3640    }
3641
3642    fn classify_forward_station_action(
3643        &self,
3644        frame: &[u8],
3645        header: &PacketHeader,
3646    ) -> Option<ForwardStationAction> {
3647        let has_operator_callsign = if header.options_range.is_empty() {
3648            false
3649        } else {
3650            umsh_core::iter_options(frame, header.options_range.clone())
3651                .filter_map(Result::ok)
3652                .any(|(number, _)| OptionNumber::from(number) == OptionNumber::OperatorCallsign)
3653        };
3654        let encrypted = header
3655            .sec_info
3656            .map(|sec| sec.scf.encrypted())
3657            .unwrap_or(false);
3658
3659        match self.repeater.amateur_radio_mode {
3660            AmateurRadioMode::Unlicensed => Some(ForwardStationAction::Remove),
3661            AmateurRadioMode::LicensedOnly => {
3662                if encrypted || !has_operator_callsign || self.repeater.station_callsign.is_none() {
3663                    None
3664                } else {
3665                    Some(ForwardStationAction::Replace)
3666                }
3667            }
3668            AmateurRadioMode::Hybrid => {
3669                if !encrypted && has_operator_callsign {
3670                    self.repeater
3671                        .station_callsign
3672                        .as_ref()
3673                        .map(|_| ForwardStationAction::Replace)
3674                } else {
3675                    Some(ForwardStationAction::Remove)
3676                }
3677            }
3678        }
3679    }
3680
3681    fn rewrite_forwarded_frame(
3682        &self,
3683        src: &[u8],
3684        header: &PacketHeader,
3685        options: &ParsedOptions,
3686        plan: ForwardPlan,
3687        dst: &mut [u8],
3688    ) -> Result<usize, CapacityError> {
3689        if dst.is_empty() {
3690            return Err(CapacityError);
3691        }
3692
3693        let options_len =
3694            self.encode_forwarded_options(src, header, options, plan, &mut dst[1..])?;
3695        let mut cursor = 1 + options_len;
3696        let has_options = options_len > 0;
3697        dst[0] = umsh_core::Fcf::new(
3698            header.packet_type(),
3699            header.fcf.full_source(),
3700            has_options,
3701            header.flood_hops.is_some(),
3702        )
3703        .0;
3704
3705        if let Some(flood_hops) = header.flood_hops {
3706            let next = if plan.decrement_flood_hops {
3707                flood_hops.decremented().0
3708            } else {
3709                flood_hops.0
3710            };
3711            *dst.get_mut(cursor).ok_or(CapacityError)? = next;
3712            cursor += 1;
3713        }
3714
3715        let fixed_start = header.options_range.end + usize::from(header.flood_hops.is_some());
3716        let tail = src.get(fixed_start..).ok_or(CapacityError)?;
3717        let end = cursor + tail.len();
3718        dst.get_mut(cursor..end)
3719            .ok_or(CapacityError)?
3720            .copy_from_slice(tail);
3721        Ok(end)
3722    }
3723
3724    fn encode_forwarded_options(
3725        &self,
3726        src: &[u8],
3727        header: &PacketHeader,
3728        _options: &ParsedOptions,
3729        plan: ForwardPlan,
3730        dst: &mut [u8],
3731    ) -> Result<usize, CapacityError> {
3732        let mut encoder = OptionEncoder::new(dst);
3733        let mut inserted_region = false;
3734        let mut inserted_station = false;
3735        let mut saw_station = false;
3736        let mut wrote_any = false;
3737
3738        if !header.options_range.is_empty() {
3739            for entry in umsh_core::iter_options(src, header.options_range.clone()) {
3740                let (number, value) = entry.map_err(|_| CapacityError)?;
3741                if !inserted_region {
3742                    if let Some(region_code) = plan.insert_region_code {
3743                        if number > OptionNumber::RegionCode.as_u16() {
3744                            encoder
3745                                .put(OptionNumber::RegionCode.as_u16(), &region_code)
3746                                .map_err(|_| CapacityError)?;
3747                            inserted_region = true;
3748                            wrote_any = true;
3749                        }
3750                    }
3751                }
3752                if !inserted_station
3753                    && matches!(plan.station_action, ForwardStationAction::Replace)
3754                    && number > OptionNumber::StationCallsign.as_u16()
3755                {
3756                    encoder
3757                        .put(
3758                            OptionNumber::StationCallsign.as_u16(),
3759                            self.repeater
3760                                .station_callsign
3761                                .as_ref()
3762                                .ok_or(CapacityError)?
3763                                .as_trimmed_slice(),
3764                        )
3765                        .map_err(|_| CapacityError)?;
3766                    inserted_station = true;
3767                    wrote_any = true;
3768                }
3769
3770                match OptionNumber::from(number) {
3771                    OptionNumber::RegionCode => {
3772                        inserted_region = true;
3773                        encoder.put(number, value).map_err(|_| CapacityError)?;
3774                        wrote_any = true;
3775                    }
3776                    OptionNumber::TraceRoute => {
3777                        let mut trace = [0u8; crate::MAX_SOURCE_ROUTE_HOPS * 2 + 2];
3778                        trace[..2].copy_from_slice(&plan.router_hint.0);
3779                        trace[2..2 + value.len()].copy_from_slice(value);
3780                        encoder
3781                            .put(number, &trace[..2 + value.len()])
3782                            .map_err(|_| CapacityError)?;
3783                        wrote_any = true;
3784                    }
3785                    OptionNumber::SourceRoute if plan.consume_source_route => {
3786                        if value.len() < 2 || value.len() % 2 != 0 {
3787                            return Err(CapacityError);
3788                        }
3789                        let remaining = if value.len() > 2 { &value[2..] } else { &[] };
3790                        encoder.put(number, remaining).map_err(|_| CapacityError)?;
3791                        wrote_any = true;
3792                    }
3793                    OptionNumber::StationCallsign => {
3794                        saw_station = true;
3795                        match plan.station_action {
3796                            ForwardStationAction::Remove => {}
3797                            ForwardStationAction::Replace => {
3798                                encoder
3799                                    .put(
3800                                        number,
3801                                        self.repeater
3802                                            .station_callsign
3803                                            .as_ref()
3804                                            .ok_or(CapacityError)?
3805                                            .as_trimmed_slice(),
3806                                    )
3807                                    .map_err(|_| CapacityError)?;
3808                                inserted_station = true;
3809                                wrote_any = true;
3810                            }
3811                        }
3812                    }
3813                    _ => {
3814                        encoder.put(number, value).map_err(|_| CapacityError)?;
3815                        wrote_any = true;
3816                    }
3817                }
3818            }
3819        }
3820
3821        if matches!(plan.station_action, ForwardStationAction::Replace)
3822            && !inserted_station
3823            && !saw_station
3824        {
3825            encoder
3826                .put(
3827                    OptionNumber::StationCallsign.as_u16(),
3828                    self.repeater
3829                        .station_callsign
3830                        .as_ref()
3831                        .ok_or(CapacityError)?
3832                        .as_trimmed_slice(),
3833                )
3834                .map_err(|_| CapacityError)?;
3835            wrote_any = true;
3836        }
3837        if let Some(region_code) = plan.insert_region_code {
3838            if !inserted_region {
3839                encoder
3840                    .put(OptionNumber::RegionCode.as_u16(), &region_code)
3841                    .map_err(|_| CapacityError)?;
3842                wrote_any = true;
3843            }
3844        }
3845        if wrote_any {
3846            encoder.end_marker().map_err(|_| CapacityError)?;
3847            Ok(encoder.finish())
3848        } else {
3849            Ok(0)
3850        }
3851    }
3852
3853    fn synthesize_route_retry_resend(
3854        &self,
3855        peer: &PublicKey,
3856        resend: &ResendRecord<FRAME>,
3857    ) -> Option<ResendRecord<FRAME>> {
3858        let header = PacketHeader::parse(resend.frame.as_slice()).ok()?;
3859        let options =
3860            ParsedOptions::extract(resend.frame.as_slice(), header.options_range.clone()).ok()?;
3861        if options.route_retry {
3862            return None;
3863        }
3864        let source_route = resend.source_route.as_ref()?;
3865        if source_route.is_empty() {
3866            return None;
3867        }
3868
3869        let flood_hops = self.route_retry_flood_hops(peer, &header, source_route)?;
3870        let mut rewritten = [0u8; FRAME];
3871        let options_len = self
3872            .encode_route_retry_options(
3873                resend.frame.as_slice(),
3874                header.options_range.clone(),
3875                &options,
3876                &mut rewritten[1..],
3877            )
3878            .ok()?;
3879        let mut cursor = 1 + options_len;
3880        let has_options = options_len > 0;
3881        let has_flood_hops = flood_hops > 0;
3882        rewritten[0] = umsh_core::Fcf::new(
3883            header.packet_type(),
3884            header.fcf.full_source(),
3885            has_options,
3886            has_flood_hops,
3887        )
3888        .0;
3889
3890        if has_flood_hops {
3891            *rewritten.get_mut(cursor)? = FloodHops::new(flood_hops, 0)?.0;
3892            cursor += 1;
3893        }
3894
3895        let fixed_start = header.options_range.end + usize::from(header.flood_hops.is_some());
3896        let tail = resend.frame.get(fixed_start..)?;
3897        let end = cursor + tail.len();
3898        rewritten.get_mut(cursor..end)?.copy_from_slice(tail);
3899
3900        ResendRecord::try_new(&rewritten[..end], None).ok()
3901    }
3902
3903    fn encode_route_retry_options(
3904        &self,
3905        src: &[u8],
3906        options_range: core::ops::Range<usize>,
3907        _options: &ParsedOptions,
3908        dst: &mut [u8],
3909    ) -> Result<usize, CapacityError> {
3910        let mut encoder = OptionEncoder::new(dst);
3911        let mut wrote_any = false;
3912        let mut inserted_trace_route = false;
3913        let mut inserted_route_retry = false;
3914
3915        if !options_range.is_empty() {
3916            for entry in umsh_core::iter_options(src, options_range) {
3917                let (number, value) = entry.map_err(|_| CapacityError)?;
3918                if !inserted_trace_route && number > OptionNumber::TraceRoute.as_u16() {
3919                    encoder
3920                        .put(OptionNumber::TraceRoute.as_u16(), &[])
3921                        .map_err(|_| CapacityError)?;
3922                    wrote_any = true;
3923                    inserted_trace_route = true;
3924                }
3925                if !inserted_route_retry && number > OptionNumber::RouteRetry.as_u16() {
3926                    encoder
3927                        .put(OptionNumber::RouteRetry.as_u16(), &[])
3928                        .map_err(|_| CapacityError)?;
3929                    wrote_any = true;
3930                    inserted_route_retry = true;
3931                }
3932                match OptionNumber::from(number) {
3933                    OptionNumber::SourceRoute => {}
3934                    OptionNumber::TraceRoute => {
3935                        encoder.put(number, value).map_err(|_| CapacityError)?;
3936                        wrote_any = true;
3937                        inserted_trace_route = true;
3938                    }
3939                    OptionNumber::RouteRetry => {}
3940                    _ => {
3941                        encoder.put(number, value).map_err(|_| CapacityError)?;
3942                        wrote_any = true;
3943                    }
3944                }
3945            }
3946        }
3947
3948        if !inserted_trace_route {
3949            encoder
3950                .put(OptionNumber::TraceRoute.as_u16(), &[])
3951                .map_err(|_| CapacityError)?;
3952            wrote_any = true;
3953        }
3954        if !inserted_route_retry {
3955            encoder
3956                .put(OptionNumber::RouteRetry.as_u16(), &[])
3957                .map_err(|_| CapacityError)?;
3958            wrote_any = true;
3959        }
3960
3961        if wrote_any {
3962            encoder.end_marker().map_err(|_| CapacityError)?;
3963            Ok(encoder.finish())
3964        } else {
3965            Ok(0)
3966        }
3967    }
3968
3969    fn route_retry_flood_hops(
3970        &self,
3971        peer: &PublicKey,
3972        header: &PacketHeader,
3973        source_route: &heapless::Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>,
3974    ) -> Option<u8> {
3975        let existing = header
3976            .flood_hops
3977            .map(|hops| hops.remaining())
3978            .filter(|hops| *hops > 0);
3979        let cached = self
3980            .peer_registry
3981            .lookup_by_key(peer)
3982            .and_then(|(_, info)| match info.route.as_ref() {
3983                Some(crate::CachedRoute::Flood { hops }) => Some((*hops).clamp(1, 15)),
3984                _ => None,
3985            });
3986        let route_len = u8::try_from(source_route.len())
3987            .ok()
3988            .map(|hops| hops.clamp(1, 15));
3989
3990        existing.or(cached).or(route_len).or(Some(5))
3991    }
3992
3993    fn repeater_router_hint(&self) -> Option<RouterHint> {
3994        self.identities
3995            .iter()
3996            .filter_map(|slot| slot.as_ref())
3997            .next()
3998            .map(|slot| slot.identity().public_key().router_hint())
3999    }
4000
4001    fn effective_min_rssi(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i16> {
4002        match (options.min_rssi, repeater.min_rssi) {
4003            (Some(packet), Some(local)) => Some(packet.max(local)),
4004            (Some(packet), None) => Some(packet),
4005            (None, Some(local)) => Some(local),
4006            (None, None) => None,
4007        }
4008    }
4009
4010    fn effective_min_snr(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i8> {
4011        match (options.min_snr, repeater.min_snr) {
4012            (Some(packet), Some(local)) => Some(packet.max(local)),
4013            (Some(packet), None) => Some(packet),
4014            (None, Some(local)) => Some(local),
4015            (None, None) => None,
4016        }
4017    }
4018
4019    fn sample_flood_contention_delay_ms(&mut self, rx: &RxInfo, options: &ParsedOptions) -> u64 {
4020        let effective_threshold_db =
4021            Self::effective_min_snr(options, &self.repeater).unwrap_or(i8::MIN);
4022        let low_db = self
4023            .repeater
4024            .flood_contention_snr_low_db
4025            .max(effective_threshold_db);
4026        let high_db = self
4027            .repeater
4028            .flood_contention_snr_high_db
4029            .max(low_db.saturating_add(1));
4030        let low = i32::from(Snr::from_decibels(low_db).as_centibels());
4031        let high = i32::from(Snr::from_decibels(high_db).as_centibels());
4032        let received = i32::from(rx.snr.as_centibels());
4033        let clamped = (received - low).clamp(0, high - low) as u32;
4034        let range = (high - low) as u32;
4035        let t_frame_ms = u64::from(self.radio.t_frame_ms());
4036        let min_window_ms = t_frame_ms
4037            .saturating_mul(u64::from(self.repeater.flood_contention_min_window_percent))
4038            / 100;
4039        let max_window_ms = t_frame_ms
4040            .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4041            .max(min_window_ms);
4042        let window_span_ms = max_window_ms.saturating_sub(min_window_ms);
4043        let window_ms = if range == 0 {
4044            max_window_ms
4045        } else {
4046            max_window_ms.saturating_sub(
4047                window_span_ms.saturating_mul(u64::from(clamped)) / u64::from(range),
4048            )
4049        };
4050        if window_ms == 0 {
4051            0
4052        } else {
4053            self.rng.random_range(..window_ms.saturating_add(1))
4054        }
4055    }
4056
4057    /// Routing identity used for duplicate suppression at repeaters.
4058    ///
4059    /// This is intentionally not the same thing as the destination's logical
4060    /// delivery identity:
4061    /// - delivery identity is governed by replay windows / frame counters at
4062    ///   the destination
4063    /// - routing identity must remain stable across repeater rewrites of
4064    ///   dynamic routing metadata
4065    /// - forwarding-confirmation identity intentionally matches routing
4066    ///   identity so a node can recognize "the same packet, forwarded onward"
4067    pub(crate) fn forward_dup_key(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4068        Self::routable_packet_identity(header, frame)
4069    }
4070
4071    fn routable_packet_identity(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4072        if !header.packet_type().is_secure() {
4073            return Some(DupCacheKey::Hash32(Self::normalized_routable_hash32(
4074                header, frame,
4075            )));
4076        }
4077        let options = ParsedOptions::extract(frame, header.options_range.clone()).ok()?;
4078        let mic = frame.get(header.mic_range.clone())?;
4079        if mic.is_empty() || mic.len() > 16 {
4080            return None;
4081        }
4082        let mut bytes = [0u8; 16];
4083        bytes[..mic.len()].copy_from_slice(mic);
4084        Some(DupCacheKey::Mic {
4085            bytes,
4086            len: mic.len() as u8,
4087            route_retry: options.route_retry,
4088        })
4089    }
4090
4091    fn defer_pending_forward(&mut self, key: &DupCacheKey, rx: &RxInfo, options: &ParsedOptions) {
4092        let Some(queued) = self.tx_queue.remove_first_matching(|entry| {
4093            entry.priority == TxPriority::Forward
4094                && Self::confirmation_key(entry.frame.as_slice())
4095                    .map(|entry_key| &entry_key == key)
4096                    .unwrap_or(false)
4097        }) else {
4098            return;
4099        };
4100
4101        if queued.forward_deferrals >= self.repeater.flood_contention_max_deferrals {
4102            return;
4103        }
4104
4105        let now_ms = self.clock.now_ms();
4106        let delay_ms = self.sample_flood_contention_delay_ms(rx, options);
4107        let _ = self.tx_queue.enqueue_with_state(
4108            queued.priority,
4109            queued.frame.as_slice(),
4110            queued.receipt,
4111            queued.identity_id,
4112            now_ms.saturating_add(delay_ms),
4113            queued.cad_attempts,
4114            queued.forward_deferrals.saturating_add(1),
4115        );
4116    }
4117
4118    async fn service_post_tx_listen(
4119        &mut self,
4120        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
4121    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
4122        loop {
4123            self.expire_post_tx_listen_if_needed();
4124            if self.post_tx_listen.is_none() {
4125                return Ok(());
4126            }
4127
4128            let handled = self.receive_one(&mut on_event).await?;
4129            if !handled {
4130                return Ok(());
4131            }
4132        }
4133    }
4134
4135    fn note_transmitted_ack_requested(&mut self, receipt: SendReceipt, frame: &[u8]) {
4136        let sent_ms = self.clock.now_ms();
4137        let direct_ack_deadline_ms = sent_ms.saturating_add(self.direct_ack_timeout_ms());
4138        let forwarded_ack_deadline_ms = sent_ms.saturating_add(self.forwarded_ack_timeout_ms());
4139        let confirm_timeout_ms = self.forward_confirm_timeout_ms();
4140        let confirm_key = Self::confirmation_key(frame);
4141
4142        let post_tx_listen = {
4143            let Some((identity_id, pending)) = self.pending_ack_mut(receipt) else {
4144                return;
4145            };
4146
4147            let needs_forward_confirmation = match pending.state {
4148                crate::AckState::Queued {
4149                    needs_forward_confirmation,
4150                } => needs_forward_confirmation,
4151                crate::AckState::RetryQueued => true,
4152                _ => return,
4153            };
4154
4155            pending.sent_ms = sent_ms;
4156            if pending.ack_deadline_ms == 0 {
4157                pending.ack_deadline_ms = if needs_forward_confirmation {
4158                    forwarded_ack_deadline_ms
4159                } else {
4160                    direct_ack_deadline_ms
4161                };
4162            }
4163
4164            if needs_forward_confirmation {
4165                let deadline_ms = sent_ms.saturating_add(confirm_timeout_ms);
4166                pending.state = crate::AckState::AwaitingForward {
4167                    confirm_deadline_ms: deadline_ms,
4168                };
4169                confirm_key.map(|confirm_key| PostTxListen {
4170                    identity_id,
4171                    receipt,
4172                    confirm_key,
4173                    deadline_ms,
4174                })
4175            } else {
4176                pending.state = crate::AckState::AwaitingAck;
4177                None
4178            }
4179        };
4180
4181        self.post_tx_listen = post_tx_listen;
4182    }
4183
4184    fn expire_post_tx_listen_if_needed(&mut self) {
4185        let should_clear = self
4186            .post_tx_listen
4187            .as_ref()
4188            .map(|listen| self.clock.now_ms() >= listen.deadline_ms)
4189            .unwrap_or(false);
4190        if should_clear {
4191            self.post_tx_listen = None;
4192        }
4193    }
4194
4195    fn forward_confirm_timeout_ms(&self) -> u64 {
4196        let t_frame_ms = u64::from(self.radio.t_frame_ms());
4197        t_frame_ms
4198            .saturating_add(self.max_forward_contention_delay_ms())
4199            .saturating_add(t_frame_ms)
4200    }
4201
4202    fn max_forward_contention_delay_ms(&self) -> u64 {
4203        u64::from(self.radio.t_frame_ms())
4204            .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4205    }
4206
4207    fn forward_retry_backoff_cap_ms(&self, retry_number: u8) -> u32 {
4208        Self::forward_retry_backoff_cap_ms_for_t_frame(self.radio.t_frame_ms(), retry_number)
4209    }
4210
/// Backoff cap for a forward retry as a multiple of the frame time.
///
/// Retries 0 and 1 are capped at one frame time, retry 2 at two frame times,
/// and every later retry at four frame times. The multiplication saturates
/// at `u32::MAX`.
fn forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms: u32, retry_number: u8) -> u32 {
    let multiplier: u32 = match retry_number {
        0 | 1 => 1,
        2 => 2,
        _ => 4,
    };
    t_frame_ms.saturating_mul(multiplier)
}
4217
4218    fn can_attempt_route_retry(pending: &PendingAck<FRAME>) -> bool {
4219        let Ok(header) = PacketHeader::parse(pending.resend.frame.as_slice()) else {
4220            return false;
4221        };
4222        let Ok(options) = ParsedOptions::extract(
4223            pending.resend.frame.as_slice(),
4224            header.options_range.clone(),
4225        ) else {
4226            return false;
4227        };
4228        !options.route_retry
4229            && pending
4230                .resend
4231                .source_route
4232                .as_ref()
4233                .map(|route| !route.is_empty())
4234                .unwrap_or(false)
4235    }
4236
4237    fn direct_ack_timeout_ms(&self) -> u64 {
4238        u64::from(self.radio.t_frame_ms()).saturating_mul(10)
4239    }
4240
4241    fn forwarded_ack_timeout_ms(&self) -> u64 {
4242        let mut total = self.forward_confirm_timeout_ms();
4243        for retry_number in 1..=MAX_FORWARD_RETRIES {
4244            total = total
4245                .saturating_add(u64::from(self.forward_retry_backoff_cap_ms(retry_number)))
4246                .saturating_add(self.forward_confirm_timeout_ms());
4247        }
4248        total.saturating_add(u64::from(self.radio.t_frame_ms()))
4249    }
4250
4251    fn pending_ack_mut(
4252        &mut self,
4253        receipt: SendReceipt,
4254    ) -> Option<(LocalIdentityId, &mut PendingAck<FRAME>)> {
4255        for (index, slot) in self.identities.iter_mut().enumerate() {
4256            let Some(slot) = slot.as_mut() else {
4257                continue;
4258            };
4259            if let Some(pending) = slot.pending_ack_mut(&receipt) {
4260                return Some((LocalIdentityId(index as u8), pending));
4261            }
4262        }
4263        None
4264    }
4265
4266    pub(crate) fn confirmation_key(frame: &[u8]) -> Option<DupCacheKey> {
4267        let header = PacketHeader::parse(frame).ok()?;
4268        Self::routable_packet_identity(&header, frame)
4269    }
4270
4271    fn normalized_routable_hash32(header: &PacketHeader, frame: &[u8]) -> u32 {
4272        let mut hash = 0x811C_9DC5u32;
4273
4274        Self::hash_u8(&mut hash, header.packet_type() as u8);
4275        Self::hash_u8(&mut hash, header.fcf.full_source() as u8);
4276
4277        if !header.options_range.is_empty() {
4278            for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
4279                let Ok((number, value)) = entry else {
4280                    continue;
4281                };
4282                let option = OptionNumber::from(number);
4283                if option.is_dynamic() {
4284                    continue;
4285                }
4286                Self::hash_u16(&mut hash, number);
4287                Self::hash_u16(&mut hash, value.len() as u16);
4288                Self::hash_bytes(&mut hash, value);
4289            }
4290        }
4291
4292        match header.packet_type() {
4293            PacketType::Broadcast => {
4294                match header.source {
4295                    umsh_core::SourceAddrRef::Hint(hint) => Self::hash_bytes(&mut hash, &hint.0),
4296                    umsh_core::SourceAddrRef::FullKeyAt { offset } => {
4297                        if let Some(key) = frame.get(offset..offset + 32) {
4298                            Self::hash_bytes(&mut hash, key);
4299                        }
4300                    }
4301                    umsh_core::SourceAddrRef::Encrypted { offset, len } => {
4302                        if let Some(src) = frame.get(offset..offset + len) {
4303                            Self::hash_bytes(&mut hash, src);
4304                        }
4305                    }
4306                    umsh_core::SourceAddrRef::None => {}
4307                }
4308                if let Some(payload) = frame.get(header.body_range.clone()) {
4309                    Self::hash_bytes(&mut hash, payload);
4310                }
4311            }
4312            PacketType::MacAck => {
4313                if let Some(dst) = header.ack_dst {
4314                    Self::hash_bytes(&mut hash, &dst.0);
4315                }
4316                if let Some(tag) = frame.get(header.mic_range.clone()) {
4317                    Self::hash_bytes(&mut hash, tag);
4318                }
4319            }
4320            _ => {
4321                if let Some(bytes) = frame.get(header.body_range.clone()) {
4322                    Self::hash_bytes(&mut hash, bytes);
4323                }
4324            }
4325        }
4326        hash
4327    }
4328
/// Mixes one byte into `hash`: XOR the byte in, then multiply (wrapping) by
/// `0x0100_0193`, the 32-bit FNV prime.
fn hash_u8(hash: &mut u32, value: u8) {
    const PRIME: u32 = 0x0100_0193;
    *hash = (*hash ^ u32::from(value)).wrapping_mul(PRIME);
}
4333
4334    fn hash_u16(hash: &mut u32, value: u16) {
4335        Self::hash_bytes(hash, &value.to_be_bytes());
4336    }
4337
4338    fn hash_bytes(hash: &mut u32, bytes: &[u8]) {
4339        for byte in bytes {
4340            Self::hash_u8(hash, *byte);
4341        }
4342    }
4343
4344    /// Check if a received frame confirms forwarding of a pending send.
4345    ///
4346    /// Returns `Some((identity_id, receipt))` on successful confirmation
4347    /// (AwaitingForward → AwaitingAck transition), `None` otherwise.
4348    fn observe_forwarding_confirmation(
4349        &mut self,
4350        frame: &[u8],
4351    ) -> Option<(LocalIdentityId, SendReceipt)> {
4352        self.expire_post_tx_listen_if_needed();
4353        let listen = self.post_tx_listen.clone()?;
4354
4355        let received_key = Self::confirmation_key(frame)?;
4356        if received_key != listen.confirm_key {
4357            return None;
4358        }
4359
4360        let Some(slot) = self.identity_mut(listen.identity_id) else {
4361            self.post_tx_listen = None;
4362            return None;
4363        };
4364        let Some(pending) = slot.pending_ack_mut(&listen.receipt) else {
4365            self.post_tx_listen = None;
4366            return None;
4367        };
4368        if !matches!(pending.state, crate::AckState::AwaitingForward { .. }) {
4369            self.post_tx_listen = None;
4370            return None;
4371        }
4372
4373        pending.state = crate::AckState::AwaitingAck;
4374        self.post_tx_listen = None;
4375        Some((listen.identity_id, listen.receipt))
4376    }
4377
4378    fn match_pending_peer_for_ack(
4379        &self,
4380        slot: &IdentitySlot<P::Identity, PEERS, ACKS, FRAME>,
4381        ack_tag_bytes: &[u8],
4382    ) -> Option<PublicKey> {
4383        if ack_tag_bytes.len() != 8 {
4384            return None;
4385        }
4386
4387        slot.pending_acks
4388            .iter()
4389            .find_map(|(_, pending)| (pending.ack_tag == ack_tag_bytes).then_some(pending.peer))
4390    }
4391}
4392
4393fn align_counter_boundary(value: u32) -> u32 {
4394    value & !COUNTER_PERSIST_BLOCK_MASK
4395}
4396
4397fn next_counter_persist_target(next_counter: u32) -> u32 {
4398    next_counter.wrapping_add(COUNTER_PERSIST_BLOCK_SIZE) & !COUNTER_PERSIST_BLOCK_MASK
4399}