umsh_mac/
coordinator.rs

1use core::num::NonZeroU8;
2use core::{future::poll_fn, task::Poll};
3
4use hamaddr::HamAddr;
5use heapless::{LinearMap, Vec};
6use rand::{Rng, RngExt as _};
7use umsh_core::{
8    BuildError, ChannelId, ChannelKey, FloodHops, NodeHint, OptionNumber, PacketBuilder,
9    PacketHeader, PacketType, ParseError, ParsedOptions, PayloadType, PublicKey, RouterHint,
10    SourceAddrRef, UnsealedPacket, feed_aad, options::OptionEncoder,
11};
12use umsh_crypto::{
13    CmacState, CryptoEngine, CryptoError, DerivedChannelKeys, NodeIdentity, PairwiseKeys,
14};
15use umsh_hal::{Clock, CounterStore, Radio, RxInfo, Snr, TxError, TxOptions};
16
17use crate::{
18    CapacityError, DEFAULT_ACKS, DEFAULT_CHANNELS, DEFAULT_DUP, DEFAULT_IDENTITIES, DEFAULT_PEERS,
19    DEFAULT_TX, MAX_CAD_ATTEMPTS, MAX_FORWARD_RETRIES, MAX_RESEND_FRAME_LEN, MAX_SOURCE_ROUTE_HOPS,
20    Platform, ReplayVerdict, ReplayWindow,
21    cache::{DupCacheKey, DuplicateCache},
22    peers::CachedRoute,
23    peers::{ChannelTable, PeerCryptoMap, PeerId, PeerRegistry},
24    send::{
25        PendingAck, PendingAckError, ResendRecord, SendOptions, SendReceipt, TxPriority, TxQueue,
26    },
27};
28
29/// Why [`Mac::poll_wait_for_wake`] returned ready.
30///
31/// Returned by the sync phase-2 poll so that callers sharing a coordinator
32/// across tasks can decide when to re-acquire the exclusive borrow for
33/// [`Mac::process_wake_reason`].
34pub enum WakeReason {
35    /// A frame was received; its metadata is attached and the caller-provided
36    /// buffer has been populated with `rx.len` bytes.
37    Received(RxInfo),
38    /// At least one coordinator timer has elapsed (ACK deadline, post-TX
39    /// listen window, or a deferred transmit becoming ready).
40    TimerExpired,
41}
42
43const COUNTER_PERSIST_BLOCK_SIZE: u32 = 128;
44const COUNTER_PERSIST_BLOCK_MASK: u32 = COUNTER_PERSIST_BLOCK_SIZE - 1;
45const COUNTER_PERSIST_SCHEDULE_OFFSET: u32 = 100;
46const MAC_COMMAND_ECHO_REQUEST_ID: u8 = 4;
47const MAC_COMMAND_ECHO_RESPONSE_ID: u8 = 5;
48const COUNTER_RESYNC_NONCE_LEN: usize = 4;
49const COUNTER_RESYNC_REQUEST_RETRY_MS: u64 = 5_000;
50
51#[derive(Clone, Copy, Debug, PartialEq, Eq)]
52struct PendingCounterResync {
53    nonce: u32,
54    requested_ms: u64,
55}
56
57#[derive(Clone, Debug, PartialEq, Eq)]
58struct DeferredCounterResyncFrame<const FRAME: usize> {
59    local_id: LocalIdentityId,
60    peer_id: PeerId,
61    frame: Vec<u8, FRAME>,
62    rssi: i16,
63    snr: Snr,
64    lqi: Option<NonZeroU8>,
65    received_at_ms: u64,
66}
67
68#[derive(Clone, Copy, Debug, PartialEq, Eq)]
69struct ResolvedMulticastSource {
70    peer_id: Option<PeerId>,
71    public_key: Option<PublicKey>,
72    hint: Option<NodeHint>,
73}
74
75#[derive(Clone, Debug, PartialEq, Eq)]
76struct PostTxListen {
77    identity_id: LocalIdentityId,
78    receipt: SendReceipt,
79    confirm_key: DupCacheKey,
80    deadline_ms: u64,
81}
82
83/// Opaque handle that identifies a locally registered identity within the [`Mac`] coordinator.
84///
85/// Every UMSH node presents one or more Ed25519 public keys to the network. When a key is
86/// registered via [`Mac::add_identity`] (or [`Mac::register_ephemeral`] for PFS sessions),
87/// the coordinator allocates a slot and returns a `LocalIdentityId` that permanently names it.
88///
89/// The inner `u8` is a stable zero-based slot index — slot `0` is the first identity
90/// registered, slot `1` the second, and so on. All per-identity coordinator operations
91/// (`queue_unicast`, `queue_multicast`, ACK tracking, key installation, frame-counter
92/// persistence) accept a `LocalIdentityId` to select which local keypair to use, allowing a
93/// single coordinator instance to operate multiple identities simultaneously — for example, a
94/// persistent long-term identity alongside an ephemeral PFS session identity.
95#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
96pub struct LocalIdentityId(pub u8);
97
98/// A local node identity that the [`Mac`] coordinator owns and acts on behalf of.
99///
100/// UMSH nodes are identified by Ed25519 public keys. An identity provides the public key and
101/// the ability to derive pairwise keys via ECDH with the corresponding private key.
102/// Two variants are supported:
103///
104/// - **`LongTerm(I)`** — wraps the platform-supplied `I: NodeIdentity`, which is typically
105///   backed by secure-element storage, an HSM, or a platform keystore. Long-term identities
106///   persist across reboots; their frame counters are saved to the [`umsh_hal::CounterStore`]
107///   so that replay protection remains valid after a power cycle.
108///
109/// - **`Ephemeral`** — wraps an in-memory [`SoftwareIdentity`](umsh_crypto::software::SoftwareIdentity)
110///   generated fresh at runtime for Perfect Forward Secrecy sessions. Because the key material
111///   itself vanishes on power loss, ephemeral identities do not persist their frame counters;
112///   replay protection is meaningful only within a single session. Requires the
113///   `software-crypto` crate feature.
114///
115/// Use [`LocalIdentity::public_key`] or [`LocalIdentity::hint`] to inspect the address
116/// presented to the network without matching on the variant.
117pub enum LocalIdentity<I: NodeIdentity> {
118    /// Long-term platform identity.
119    LongTerm(I),
120    #[cfg(feature = "software-crypto")]
121    /// Software ephemeral identity used for PFS sessions.
122    Ephemeral(umsh_crypto::software::SoftwareIdentity),
123}
124
125impl<I: NodeIdentity> LocalIdentity<I> {
126    /// Return the public key for this identity.
127    pub fn public_key(&self) -> &PublicKey {
128        match self {
129            Self::LongTerm(identity) => identity.public_key(),
130            #[cfg(feature = "software-crypto")]
131            Self::Ephemeral(identity) => identity.public_key(),
132        }
133    }
134
135    /// Return the derived node hint for this identity.
136    pub fn hint(&self) -> umsh_core::NodeHint {
137        self.public_key().hint()
138    }
139
140    /// Return whether this identity is ephemeral.
141    pub fn is_ephemeral(&self) -> bool {
142        match self {
143            Self::LongTerm(_) => false,
144            #[cfg(feature = "software-crypto")]
145            Self::Ephemeral(_) => true,
146        }
147    }
148}
149
150impl<I: NodeIdentity> From<I> for LocalIdentity<I> {
151    fn from(value: I) -> Self {
152        Self::LongTerm(value)
153    }
154}
155
156/// Per-identity runtime state owned by the [`Mac`] coordinator.
157///
158/// There is exactly one `IdentitySlot` per registered local identity ([`LocalIdentityId`]).
159/// The slot bundles everything the coordinator needs to send, receive, and authenticate
160/// on behalf of a single local keypair:
161///
162/// - The [`LocalIdentity`] (public key + ECDH capability).
163/// - A [`PeerCryptoMap`](crate::peers::PeerCryptoMap) mapping each known remote peer to its
164///   established [`umsh_crypto::PairwiseKeys`] and replay window. Entries are populated on
165///   first secure contact, or through the advanced manual-install escape hatch.
166/// - A monotonically increasing **frame counter** stamped into SECINFO of every sealed
167///   packet, plus the bookkeeping needed to persist it safely (see below).
168/// - A [`LinearMap`] of in-flight [`PendingAck`](crate::send::PendingAck) records keyed by
169///   [`SendReceipt`](crate::send::SendReceipt), one entry per ACK-requested send awaiting
170///   either forwarding confirmation or a final transport ACK.
171/// - An internal `next_receipt` counter used to issue unique
172///   [`SendReceipt`](crate::send::SendReceipt) values without allocation.
173///
174/// ## Frame-counter persistence
175///
176/// UMSH uses a monotonic frame counter instead of a timestamp for replay protection.
177/// If the counter resets to a previously-seen value after a reboot, replayed old frames
178/// might be accepted. To prevent this, the coordinator "reserves" counter ranges by writing
179/// boundary values to the [`umsh_hal::CounterStore`] *before* using them. The slot tracks
180/// three values:
181///
182/// - `frame_counter` — the live in-use value, advanced on every secured send.
183/// - `persisted_counter` — the last boundary safely committed to the store.
184/// - `pending_persist_target` — a scheduled future boundary written on the next call to
185///   [`Mac::service_counter_persistence`].
186///
187/// If the live counter reaches `persisted_counter + COUNTER_PERSIST_BLOCK_SIZE` without a
188/// successful flush, secure sends on that identity are blocked
189/// ([`SendError::CounterPersistenceLag`]) until the store catches up. Ephemeral identities
190/// opt out of this mechanism entirely.
191pub struct IdentitySlot<
192    I: NodeIdentity,
193    const PEERS: usize,
194    const ACKS: usize,
195    const FRAME: usize = MAX_RESEND_FRAME_LEN,
196> {
197    identity: LocalIdentity<I>,
198    peer_crypto: PeerCryptoMap<PEERS>,
199    frame_counter: u32,
200    persisted_counter: u32,
201    pending_persist_target: Option<u32>,
202    save_scheduled_since_boot: bool,
203    counter_persistence_enabled: bool,
204    pending_acks: LinearMap<SendReceipt, PendingAck<FRAME>, ACKS>,
205    next_receipt: u32,
206    pfs_parent: Option<LocalIdentityId>,
207    pending_counter_resync: LinearMap<PeerId, PendingCounterResync, PEERS>,
208}
209
210impl<I: NodeIdentity, const PEERS: usize, const ACKS: usize, const FRAME: usize>
211    IdentitySlot<I, PEERS, ACKS, FRAME>
212{
213    /// Create a new identity slot.
214    pub fn new(
215        identity: LocalIdentity<I>,
216        frame_counter: u32,
217        pfs_parent: Option<LocalIdentityId>,
218    ) -> Self {
219        let counter_persistence_enabled = !identity.is_ephemeral();
220        Self {
221            identity,
222            peer_crypto: PeerCryptoMap::new(),
223            frame_counter,
224            persisted_counter: frame_counter,
225            pending_persist_target: None,
226            save_scheduled_since_boot: false,
227            counter_persistence_enabled,
228            pending_acks: LinearMap::new(),
229            next_receipt: 0,
230            pfs_parent,
231            pending_counter_resync: LinearMap::new(),
232        }
233    }
234
235    /// Borrow the underlying identity.
236    pub fn identity(&self) -> &LocalIdentity<I> {
237        &self.identity
238    }
239    /// Borrow the per-peer secure-state map.
240    pub fn peer_crypto(&self) -> &PeerCryptoMap<PEERS> {
241        &self.peer_crypto
242    }
243    /// Mutably borrow the per-peer secure-state map.
244    pub fn peer_crypto_mut(&mut self) -> &mut PeerCryptoMap<PEERS> {
245        &mut self.peer_crypto
246    }
247    /// Return the current frame counter.
248    pub fn frame_counter(&self) -> u32 {
249        self.frame_counter
250    }
251    /// Return the persisted frame-counter reservation boundary.
252    pub fn persisted_counter(&self) -> u32 {
253        self.persisted_counter
254    }
255    /// Overwrite the current frame counter.
256    ///
257    /// # Safety (logical)
258    /// Misuse can break replay protection.
259    #[cfg(test)]
260    pub(crate) fn set_frame_counter(&mut self, value: u32) {
261        self.frame_counter = value;
262    }
263    /// Return the next scheduled persist target, if any.
264    pub fn pending_persist_target(&self) -> Option<u32> {
265        self.pending_persist_target
266    }
267
268    /// Return whether counter persistence is enabled for this identity.
269    pub fn counter_persistence_enabled(&self) -> bool {
270        self.counter_persistence_enabled
271    }
272
273    /// Return the current frame counter and advance it with wrapping semantics.
274    pub(crate) fn advance_frame_counter(&mut self) -> u32 {
275        let current = self.frame_counter;
276        self.frame_counter = self.frame_counter.wrapping_add(1);
277        current
278    }
279
280    /// Load a persisted counter boundary for this identity.
281    pub fn load_persisted_counter(&mut self, value: u32) {
282        let aligned = align_counter_boundary(value);
283        self.frame_counter = aligned;
284        self.persisted_counter = aligned;
285        self.pending_persist_target = None;
286        self.save_scheduled_since_boot = false;
287    }
288
289    fn schedule_counter_persist_if_needed(&mut self) {
290        if !self.counter_persistence_enabled {
291            return;
292        }
293
294        let should_schedule = !self.save_scheduled_since_boot
295            || (self.frame_counter & COUNTER_PERSIST_BLOCK_MASK) == COUNTER_PERSIST_SCHEDULE_OFFSET;
296        if !should_schedule {
297            return;
298        }
299
300        let target = next_counter_persist_target(self.frame_counter);
301        self.pending_persist_target = Some(
302            self.pending_persist_target
303                .map(|existing| existing.max(target))
304                .unwrap_or(target),
305        );
306        self.save_scheduled_since_boot = true;
307    }
308
309    fn mark_counter_persisted(&mut self, value: u32) {
310        let aligned = align_counter_boundary(value);
311        self.persisted_counter = aligned;
312        if self.pending_persist_target == Some(aligned) {
313            self.pending_persist_target = None;
314        }
315    }
316
317    fn counter_window_exhausted(&self) -> bool {
318        if !self.counter_persistence_enabled {
319            return false;
320        }
321
322        let ahead = self.persisted_counter.wrapping_sub(self.frame_counter);
323        if ahead > 0 && ahead <= COUNTER_PERSIST_BLOCK_SIZE {
324            return false;
325        }
326
327        if ahead == 0 {
328            return self.save_scheduled_since_boot;
329        }
330
331        self.frame_counter.wrapping_sub(self.persisted_counter) >= COUNTER_PERSIST_BLOCK_SIZE
332    }
333
334    /// Allocate the next send receipt.
335    pub fn next_receipt(&mut self) -> SendReceipt {
336        let receipt = SendReceipt(self.next_receipt);
337        self.next_receipt = self.next_receipt.wrapping_add(1);
338        receipt
339    }
340
341    /// Overrides the next send receipt value in tests that exercise wraparound behavior.
342    #[cfg(test)]
343    pub(crate) fn set_next_receipt_for_test(&mut self, value: u32) {
344        self.next_receipt = value;
345    }
346
347    /// Insert or replace pending-ACK state for a send receipt.
348    pub fn try_insert_pending_ack(
349        &mut self,
350        receipt: SendReceipt,
351        pending: PendingAck<FRAME>,
352    ) -> Result<Option<PendingAck<FRAME>>, PendingAckError> {
353        self.pending_acks
354            .insert(receipt, pending)
355            .map_err(|_| PendingAckError::TableFull)
356    }
357
358    /// Borrow pending-ACK state by receipt.
359    pub fn pending_ack(&self, receipt: &SendReceipt) -> Option<&PendingAck<FRAME>> {
360        self.pending_acks.get(receipt)
361    }
362
363    /// Mutably borrow pending-ACK state by receipt.
364    pub fn pending_ack_mut(&mut self, receipt: &SendReceipt) -> Option<&mut PendingAck<FRAME>> {
365        self.pending_acks.get_mut(receipt)
366    }
367
368    /// Remove pending-ACK state by receipt.
369    pub fn remove_pending_ack(&mut self, receipt: &SendReceipt) -> Option<PendingAck<FRAME>> {
370        self.pending_acks.remove(receipt)
371    }
372
373    /// Return the parent long-term identity if this slot is ephemeral.
374    pub fn pfs_parent(&self) -> Option<LocalIdentityId> {
375        self.pfs_parent
376    }
377
378    /// Borrow the pending counter-resynchronization table.
379    fn pending_counter_resync(&self) -> &LinearMap<PeerId, PendingCounterResync, PEERS> {
380        &self.pending_counter_resync
381    }
382
383    /// Mutably borrow the pending counter-resynchronization table.
384    fn pending_counter_resync_mut(
385        &mut self,
386    ) -> &mut LinearMap<PeerId, PendingCounterResync, PEERS> {
387        &mut self.pending_counter_resync
388    }
389}
390
391/// Per-channel operating-policy overrides enforced on outgoing traffic.
392///
393/// [`OperatingPolicy`] holds a small list of `ChannelPolicy` entries, one per channel that
394/// requires non-default behavior. When the coordinator builds a multicast or blind-unicast
395/// frame, it checks whether the target `channel_id` appears in this list and applies any
396/// overrides before sealing the packet.
397///
398/// Typical use cases:
399/// - **Unlicensed spectrum compliance** — force `require_unencrypted = true` for channels
400///   that must operate under Part 15 / ISM-band rules where encryption is permissible but
401///   the channel operator has chosen to run openly.
402/// - **Metadata reduction** — force `require_full_source = true` when receiving nodes need
403///   to resolve the sender without a prior key-exchange round-trip (e.g., a public beacon
404///   channel where all senders are first-contact).
405/// - **Propagation budget** — set `max_flood_hops` for high-density channels where
406///   uncontrolled flooding would waste airtime.
407///
408/// Channels absent from the policy list use the permissive defaults inherited from
409/// [`SendOptions`](crate::send::SendOptions).
410#[derive(Clone, Debug, PartialEq, Eq)]
411pub struct ChannelPolicy {
412    /// Channel to which this policy applies.
413    pub channel_id: ChannelId,
414    /// Whether the channel must be sent unencrypted.
415    pub require_unencrypted: bool,
416    /// Whether the channel requires the full source public key.
417    pub require_full_source: bool,
418    /// Optional maximum flood-hop budget.
419    pub max_flood_hops: Option<u8>,
420}
421
422/// Controls how the coordinator and optional repeater handle amateur-radio legal requirements.
423///
424/// Amateur (ham) radio law in most jurisdictions prohibits encrypted transmissions and requires
425/// station identification on all transmitted frames. UMSH supports three operating modes to
426/// accommodate networks that mix licensed and unlicensed nodes, or that operate exclusively
427/// under one regulatory regime.
428///
429/// | Mode | Encryption | Operator callsign | Repeater station callsign |
430/// |------|------------|------------------|--------------------------|
431/// | `Unlicensed` | Allowed | Optional | Not added |
432/// | `LicensedOnly` | Prohibited | Required | Required |
433/// | `Hybrid` | Allowed (local) | Optional | Added to forwarded frames |
434///
435/// The mode appears on both [`OperatingPolicy`] (for locally-originated traffic) and
436/// [`RepeaterConfig`] (for forwarding decisions) and they may differ independently — a node
437/// might transmit its own encrypted application traffic (`Unlicensed`) while acting as a
438/// licensed-identified repeater (`LicensedOnly`) for third-party frames it forwards.
439#[derive(Clone, Copy, Debug, PartialEq, Eq)]
440pub enum AmateurRadioMode {
441    /// Treat traffic as unlicensed operation only.
442    ///
443    /// Local transmit policy does not require operator callsigns or amateur-only
444    /// restrictions. Repeaters operating in this mode must not add a station
445    /// callsign when forwarding and should only retransmit packets that can be
446    /// handled under unlicensed rules.
447    Unlicensed,
448    /// Treat forwarded and locally originated traffic as amateur-only.
449    ///
450    /// Encryption and blind unicast are disallowed, operator callsigns are
451    /// required on originated packets, and repeaters must identify themselves
452    /// with a station callsign on forwarded traffic.
453    LicensedOnly,
454    /// Permit both unlicensed and amateur-qualified forwarding behavior.
455    ///
456    /// Local transmit policy remains permissive, but repeaters identify
457    /// forwarded packets with their station callsign and may still forward
458    /// packets lacking an operator callsign when they can do so under
459    /// unlicensed rules.
460    Hybrid,
461}
462
463#[derive(Clone, Copy, Debug, PartialEq, Eq)]
464enum TransmitAuthority {
465    Unlicensed,
466    Amateur,
467}
468
469#[derive(Clone, Copy, Debug, PartialEq, Eq)]
470enum ForwardStationAction {
471    Remove,
472    Replace,
473}
474
475#[derive(Clone, Copy, Debug, PartialEq, Eq)]
476struct ForwardPlan {
477    router_hint: RouterHint,
478    consume_source_route: bool,
479    decrement_flood_hops: bool,
480    insert_region_code: Option<[u8; 2]>,
481    delay_ms: u64,
482    station_action: ForwardStationAction,
483}
484
485/// Local transmission policy enforced by the [`Mac`] coordinator on all outgoing frames.
486///
487/// `OperatingPolicy` governs what the coordinator is *allowed to send*, independent of what
488/// the application requests. It is consulted at the start of every `queue_*` call via an
489/// internal policy check, which returns [`SendError::PolicyViolation`] if the requested send
490/// would violate it. This policy applies only to locally-originated frames; forwarding
491/// decisions are governed separately by [`RepeaterConfig`].
492///
493/// - **`amateur_radio_mode`** — determines whether encryption and blind-unicast are permitted
494///   and whether an operator callsign must be appended to originated frames.
495///   See [`AmateurRadioMode`].
496/// - **`operator_callsign`** — the ARNCE/HAM-64 callsign automatically appended to every
497///   locally-originated frame when set. Required in `LicensedOnly` mode; optional otherwise.
498/// - **`channel_policies`** — a small list of per-channel overrides for multicast and
499///   blind-unicast traffic. Channels absent from the list use permissive defaults.
500///
501/// The default configuration (via [`Default`]) sets `Unlicensed` mode with no callsign and
502/// no per-channel overrides.
503#[derive(Clone, Debug, PartialEq, Eq)]
504pub struct OperatingPolicy {
505    /// Amateur-radio operating mode.
506    pub amateur_radio_mode: AmateurRadioMode,
507    /// Optional local operator callsign.
508    pub operator_callsign: Option<HamAddr>,
509    /// Per-channel overrides.
510    pub channel_policies: Vec<ChannelPolicy, 4>,
511}
512
513impl Default for OperatingPolicy {
514    fn default() -> Self {
515        Self {
516            amateur_radio_mode: AmateurRadioMode::Unlicensed,
517            operator_callsign: None,
518            channel_policies: Vec::new(),
519        }
520    }
521}
522
523/// Configuration governing whether and how the node forwards received frames.
524///
525/// The UMSH MAC layer includes an optional built-in repeater that forwards packets it
526/// successfully receives, extending the effective range of the network without requiring
527/// dedicated infrastructure. `RepeaterConfig` controls every facet of that behavior:
528///
529/// - **`enabled`** — master on/off switch. When `false`, all inbound forwarding logic is
530///   skipped even if the other fields are populated.
531/// - **`regions`** — a local list of 2-byte ARNCE region codes used both for flood-forwarding
532///   eligibility checks and, when a flood-forwarded packet is untagged, as the local policy
533///   source for inserting a region code. When non-empty, packets carrying a non-matching region
534///   code are not flood-forwarded; when empty, forwarding does not impose a region check and the
535///   repeater has no local region to insert.
536/// - **`min_rssi` / `min_snr`** — signal-quality thresholds for flood forwarding. Packets
537///   received below these values are not flood-forwarded; this prevents marginal receptions
538///   from being re-injected into the network at full power, which would degrade SNR for
539///   nearby nodes rather than help. These thresholds do not apply to source-routed hops.
540/// - **Flood contention tuning** — controls the SNR-to-delay mapping used when several
541///   eligible repeaters contend to flood-forward the same frame. These values should usually
542///   remain aligned across the mesh.
543/// - **`amateur_radio_mode`** — determines whether the repeater may forward encrypted or
544///   blind-unicast frames, and whether it must inject a station callsign. See
545///   [`AmateurRadioMode`].
546/// - **`station_callsign`** — the ARNCE/HAM-64 callsign injected into the options block of
547///   every forwarded frame when operating in `LicensedOnly` or `Hybrid` mode, satisfying the
548///   third-party identification requirements of FCC §97.119 and equivalent regulations.
549///
550/// The default configuration has `enabled: false`; repeating must be explicitly opted in.
551#[derive(Clone, Debug, PartialEq, Eq)]
552pub struct RepeaterConfig {
553    /// Whether repeater forwarding is enabled.
554    pub enabled: bool,
555    /// Allowed repeater region codes.
556    pub regions: Vec<[u8; 2], 8>,
557    /// Minimum RSSI threshold for flood forwarding.
558    pub min_rssi: Option<i16>,
559    /// Minimum SNR threshold for flood forwarding.
560    pub min_snr: Option<i8>,
561    /// Lower clamp bound for SNR-based flood forwarding contention.
562    pub flood_contention_snr_low_db: i8,
563    /// Upper clamp bound for SNR-based flood forwarding contention.
564    pub flood_contention_snr_high_db: i8,
565    /// Minimum forwarding contention window as a percentage of `T_frame`.
566    pub flood_contention_min_window_percent: u8,
567    /// Maximum forwarding contention window as a multiple of `T_frame`.
568    pub flood_contention_max_window_frames: u8,
569    /// Maximum number of overheard-repeat deferrals before abandoning a pending forward.
570    pub flood_contention_max_deferrals: u8,
571    /// Amateur-radio operating mode for forwarding.
572    pub amateur_radio_mode: AmateurRadioMode,
573    /// Optional station callsign injected on forwarded traffic.
574    pub station_callsign: Option<HamAddr>,
575}
576
577impl Default for RepeaterConfig {
578    fn default() -> Self {
579        Self {
580            enabled: false,
581            regions: Vec::new(),
582            min_rssi: None,
583            min_snr: None,
584            flood_contention_snr_low_db: -6,
585            flood_contention_snr_high_db: 15,
586            flood_contention_min_window_percent: 20,
587            flood_contention_max_window_frames: 2,
588            flood_contention_max_deferrals: 3,
589            amateur_radio_mode: AmateurRadioMode::Unlicensed,
590            station_callsign: None,
591        }
592    }
593}
594
595/// Errors returned by the [`Mac`] coordinator when queueing an outbound send.
596///
597/// Returned synchronously by `queue_broadcast`, `queue_unicast`, `queue_multicast`, and
598/// related methods. An error here means the send could not be *enqueued* — it says nothing
599/// about the fate of frames already in the transmit queue.
600#[derive(Clone, Debug, PartialEq, Eq)]
601pub enum SendError {
602    /// The [`LocalIdentityId`] passed to the queue call does not correspond to an occupied
603    /// identity slot. This indicates the identity was never registered or was removed.
604    IdentityMissing,
605    /// The destination [`umsh_core::PublicKey`] is not present in the peer registry.
606    /// Register the peer first via [`Mac::add_peer`].
607    PeerMissing,
608    /// No cached pairwise session keys exist for the target peer on this identity.
609    /// This is only returned by the low-level `queue_*` APIs; the public async send APIs
610    /// derive and cache peer state automatically.
611    PairwiseKeysMissing,
612    /// The local identity failed to derive a shared secret for this peer.
613    IdentityAgreementFailed,
614    /// The target [`umsh_core::ChannelId`] is not present in the channel table.
615    /// Register the channel first via [`Mac::add_channel`] or [`Mac::add_named_channel`].
616    ChannelMissing,
617    /// The [`OperatingPolicy`] rejected this send — for example, attempting to send an
618    /// encrypted frame while operating in [`AmateurRadioMode::LicensedOnly`] mode.
619    PolicyViolation,
620    /// The low-level packet builder failed, typically because the frame buffer is too small
621    /// for the requested options and payload.
622    Build(BuildError),
623    /// Packet parsing failed while reprocessing a freshly-built frame, indicating an
624    /// internal inconsistency in the packet construction logic.
625    Parse(ParseError),
626    /// The cryptographic seal operation failed. This typically indicates a mismatched key
627    /// length or an internal crypto engine error.
628    Crypto(CryptoError),
629    /// The transmit queue is at the configured `TX` capacity. Back off and retry after
630    /// the event loop has drained some entries.
631    QueueFull,
632    /// The in-flight ACK table for this identity is at the configured `ACKS` capacity.
633    /// Wait for an existing ACK-requested send to complete or time out before sending another.
634    PendingAckFull,
635    /// Secure sends are blocked because the live frame counter has reached the persisted
636    /// reservation boundary. Call [`Mac::service_counter_persistence`] to flush a new
637    /// boundary to the counter store before retrying.
638    CounterPersistenceLag,
639}
640
641impl From<BuildError> for SendError {
642    fn from(value: BuildError) -> Self {
643        Self::Build(value)
644    }
645}
646
647impl From<ParseError> for SendError {
648    fn from(value: ParseError) -> Self {
649        Self::Parse(value)
650    }
651}
652
653impl From<CryptoError> for SendError {
654    fn from(value: CryptoError) -> Self {
655        Self::Crypto(value)
656    }
657}
658
659/// Runtime errors produced by the [`Mac`] coordinator's async event loop.
660///
661/// Unlike [`SendError`], which is returned synchronously when *enqueueing* a send,
662/// `MacError` surfaces from the async methods (`next_event`,
663/// `service_counter_persistence`) that actually drive the coordinator forward.
664#[derive(Clone, Debug, PartialEq, Eq)]
665pub enum MacError<RadioError> {
666    /// The underlying [`umsh_hal::Radio`] driver returned an error during a receive or
667    /// channel-sense operation. The inner type is platform-specific (e.g., SPI fault on
668    /// embedded hardware, socket error on a UDP transport).
669    Radio(RadioError),
670    /// A transmit-phase error from the radio, such as channel-activity-detection (CAD)
671    /// exhaustion after [`MAX_CAD_ATTEMPTS`] retries. The frame was not sent.
672    Transmit(TxError<RadioError>),
673    /// An internal capacity invariant was violated: the coordinator needed to enqueue a
674    /// control frame (MAC ACK, forwarded packet) but the transmit queue was full.
675    /// Increase the `TX` const generic on [`Mac`] to give the queue more headroom.
676    QueueFull,
677}
678
679/// Errors returned while loading persisted frame-counter boundaries via [`Mac::load_persisted_counter`].
680///
681/// On startup, applications should call [`Mac::load_persisted_counter`] for each registered
682/// long-term identity to restore the safe starting point for the frame counter from
683/// non-volatile storage.
684#[derive(Clone, Debug, PartialEq, Eq)]
685pub enum CounterPersistenceError<StoreError> {
686    /// The [`LocalIdentityId`] supplied does not correspond to an occupied identity slot.
687    IdentityMissing,
688    /// The underlying [`umsh_hal::CounterStore`] read operation failed. The application
689    /// should decide whether to halt, retry, or start the counter at a conservatively high
690    /// value to avoid replay-window collisions with pre-reset traffic.
691    Store(StoreError),
692}
693
694impl<RadioError> From<RadioError> for MacError<RadioError> {
695    fn from(value: RadioError) -> Self {
696        Self::Radio(value)
697    }
698}
699
700impl<RadioError> From<TxError<RadioError>> for MacError<RadioError> {
701    fn from(value: TxError<RadioError>) -> Self {
702        Self::Transmit(value)
703    }
704}
705
706/// Central MAC coordinator that owns and drives the full UMSH radio-facing state machine.
707///
708/// `Mac` is the top-level entry point for UMSH protocol operation. It combines a radio driver,
709/// cryptographic engine, clock, RNG, counter store, and all protocol state into a single
710/// fully-typed, allocation-free structure. All const-generic capacity parameters are enforced
711/// at compile time via `heapless` collections — there are no heap allocations inside `Mac`.
712///
713/// ## Generic parameters
714///
715/// - **`P: Platform`** — a trait bundle supplying the concrete driver types for `Radio`,
716///   `Aes`/`Sha` (crypto), `Clock`, `Rng`, and `CounterStore`. Implement [`Platform`] once
717///   per deployment target to swap in real hardware drivers, software stubs, or test doubles.
718/// - **`IDENTITIES`** — maximum simultaneously active local identities (default
719///   [`DEFAULT_IDENTITIES`]).
720/// - **`PEERS`** — maximum known remote peers and their per-identity pairwise key entries
721///   (default [`DEFAULT_PEERS`]).
722/// - **`CHANNELS`** — maximum registered multicast channel keys (default
723///   [`DEFAULT_CHANNELS`]).
724/// - **`ACKS`** — maximum simultaneously in-flight ACK-requested sends per identity
725///   (default [`DEFAULT_ACKS`]).
726/// - **`TX`** — depth of the transmit queue (default [`DEFAULT_TX`]). Must be large enough
727///   to absorb a burst of control frames (MAC ACKs + forwarded frames) alongside any
728///   backlogged application sends.
729/// - **`FRAME`** — maximum byte length of a stored frame buffer for retransmission
730///   (default [`MAX_RESEND_FRAME_LEN`]).
731/// - **`DUP`** — capacity of the duplicate-detection cache (default [`DEFAULT_DUP`]).
732///
733/// ## Lifecycle
734///
735/// 1. **Construct** with [`Mac::new`], supplying concrete driver instances and policy.
736/// 2. **Register identities** via [`Mac::add_identity`]; call
737///    [`Mac::load_persisted_counter`] on each long-term identity to restore the safe
738///    frame-counter start point from non-volatile storage.
739/// 3. **Register peers** via [`Mac::add_peer`]. Secure unicast and blind-unicast state is
740///    derived lazily from the local private key and peer public key on first use.
741/// 4. **Register channels** via [`Mac::add_channel`] or [`Mac::add_named_channel`].
742/// 5. **Drive the event loop** via [`Mac::run`] / [`Mac::run_quiet`] for long-lived tasks,
743///    or by awaiting [`Mac::next_event`] when you need to multiplex MAC progress with other
744///    async work. The coordinator handles incoming frames, outgoing transmits, forwarding,
745///    ACK matching, retransmission scheduling, and timer deadlines — no external polling
746///    required.
747/// 6. **Send traffic** by calling `queue_broadcast`, `queue_unicast`, `queue_multicast`,
748///    etc. from application code between (or concurrent with) event-loop iterations.
749/// 7. **Persist counters** by calling [`Mac::service_counter_persistence`] whenever
750///    `next_event` signals that pending persistence work is ready to flush.
751///
752/// ## Example (pseudo-code)
753///
754/// ```rust,ignore
755/// let mut mac = Mac::<MyPlatform>::new(
756///     radio, crypto, clock, rng, counter_store,
757///     RepeaterConfig::default(), OperatingPolicy::default(),
758/// );
759/// let id = mac.add_identity(my_identity)?;
760/// mac.load_persisted_counter(id).await?;
761///
762/// mac.run(|id, event| {
763///     let _ = (id, event);
764///     // handle deliveries / ACKs here and schedule persistence work as needed
765/// }).await?;
766/// ```
767pub struct Mac<
768    P: Platform,
769    const IDENTITIES: usize = DEFAULT_IDENTITIES,
770    const PEERS: usize = DEFAULT_PEERS,
771    const CHANNELS: usize = DEFAULT_CHANNELS,
772    const ACKS: usize = DEFAULT_ACKS,
773    const TX: usize = DEFAULT_TX,
774    const FRAME: usize = MAX_RESEND_FRAME_LEN,
775    const DUP: usize = DEFAULT_DUP,
776> {
777    radio: P::Radio,
778    crypto: CryptoEngine<P::Aes, P::Sha>,
779    clock: P::Clock,
780    rng: P::Rng,
781    counter_store: P::CounterStore,
782    identities: Vec<Option<IdentitySlot<P::Identity, PEERS, ACKS, FRAME>>, IDENTITIES>,
783    peer_registry: PeerRegistry<PEERS>,
784    channels: ChannelTable<CHANNELS>,
785    dup_cache: DuplicateCache<DUP>,
786    multicast_unknown_dup_cache: DuplicateCache<DUP>,
787    tx_queue: TxQueue<TX, FRAME>,
788    post_tx_listen: Option<PostTxListen>,
789    repeater: RepeaterConfig,
790    operating_policy: OperatingPolicy,
791    auto_register_full_key_peers: bool,
792    deferred_counter_resync_frame: Option<DeferredCounterResyncFrame<FRAME>>,
793}
794
795impl<
796    P: Platform,
797    const IDENTITIES: usize,
798    const PEERS: usize,
799    const CHANNELS: usize,
800    const ACKS: usize,
801    const TX: usize,
802    const FRAME: usize,
803    const DUP: usize,
804> Mac<P, IDENTITIES, PEERS, CHANNELS, ACKS, TX, FRAME, DUP>
805{
806    /// Creates a MAC coordinator with the supplied radio, crypto, timing, and policy state.
807    pub fn new(
808        radio: P::Radio,
809        crypto: CryptoEngine<P::Aes, P::Sha>,
810        clock: P::Clock,
811        rng: P::Rng,
812        counter_store: P::CounterStore,
813        repeater: RepeaterConfig,
814        operating_policy: OperatingPolicy,
815    ) -> Self {
816        Self {
817            radio,
818            crypto,
819            clock,
820            rng,
821            counter_store,
822            identities: Vec::new(),
823            peer_registry: PeerRegistry::new(),
824            channels: ChannelTable::new(),
825            dup_cache: DuplicateCache::new(),
826            multicast_unknown_dup_cache: DuplicateCache::new(),
827            tx_queue: TxQueue::new(),
828            post_tx_listen: None,
829            repeater,
830            operating_policy,
831            auto_register_full_key_peers: false,
832            deferred_counter_resync_frame: None,
833        }
834    }
835
836    /// Borrow the underlying radio.
837    pub fn radio(&self) -> &P::Radio {
838        &self.radio
839    }
840
841    /// Mutably borrow the underlying radio.
842    pub fn radio_mut(&mut self) -> &mut P::Radio {
843        &mut self.radio
844    }
845
846    /// Borrow the crypto engine.
847    pub fn crypto(&self) -> &CryptoEngine<P::Aes, P::Sha> {
848        &self.crypto
849    }
850
851    /// Borrow the monotonic clock.
852    pub fn clock(&self) -> &P::Clock {
853        &self.clock
854    }
855
856    /// Borrow the RNG.
857    pub fn rng(&self) -> &P::Rng {
858        &self.rng
859    }
860
861    /// Mutably borrow the RNG.
862    pub fn rng_mut(&mut self) -> &mut P::Rng {
863        &mut self.rng
864    }
865
866    /// Borrow the counter store.
867    pub fn counter_store(&self) -> &P::CounterStore {
868        &self.counter_store
869    }
870    /// Borrow the transmit queue.
871    pub fn tx_queue(&self) -> &TxQueue<TX, FRAME> {
872        &self.tx_queue
873    }
874    /// Mutably borrow the transmit queue.
875    pub fn tx_queue_mut(&mut self) -> &mut TxQueue<TX, FRAME> {
876        &mut self.tx_queue
877    }
878    /// Borrow the duplicate cache.
879    pub fn dup_cache(&self) -> &DuplicateCache<DUP> {
880        &self.dup_cache
881    }
882    /// Borrow the peer registry.
883    pub fn peer_registry(&self) -> &PeerRegistry<PEERS> {
884        &self.peer_registry
885    }
886    /// Mutably borrow the peer registry.
887    pub fn peer_registry_mut(&mut self) -> &mut PeerRegistry<PEERS> {
888        &mut self.peer_registry
889    }
890    /// Borrow the channel table.
891    pub fn channels(&self) -> &ChannelTable<CHANNELS> {
892        &self.channels
893    }
894    /// Mutably borrow the channel table.
895    pub fn channels_mut(&mut self) -> &mut ChannelTable<CHANNELS> {
896        &mut self.channels
897    }
898    /// Borrow repeater configuration.
899    pub fn repeater_config(&self) -> &RepeaterConfig {
900        &self.repeater
901    }
902    /// Mutably borrow repeater configuration.
903    pub fn repeater_config_mut(&mut self) -> &mut RepeaterConfig {
904        &mut self.repeater
905    }
906    /// Borrow the local operating policy.
907    pub fn operating_policy(&self) -> &OperatingPolicy {
908        &self.operating_policy
909    }
910    /// Mutably borrow the local operating policy.
911    pub fn operating_policy_mut(&mut self) -> &mut OperatingPolicy {
912        &mut self.operating_policy
913    }
914
915    /// Return whether inbound secure packets carrying a full source key may auto-register peers.
916    pub fn auto_register_full_key_peers(&self) -> bool {
917        self.auto_register_full_key_peers
918    }
919
920    /// Enable or disable inbound full-key peer auto-registration.
921    pub fn set_auto_register_full_key_peers(&mut self, enabled: bool) {
922        self.auto_register_full_key_peers = enabled;
923    }
924
925    /// Register one long-term local identity.
926    pub fn add_identity(
927        &mut self,
928        identity: P::Identity,
929    ) -> Result<LocalIdentityId, CapacityError> {
930        self.insert_identity(LocalIdentity::LongTerm(identity), None)
931    }
932
933    /// Load the persisted frame-counter boundary for `id` from the counter store.
934    pub async fn load_persisted_counter(
935        &mut self,
936        id: LocalIdentityId,
937    ) -> Result<u32, CounterPersistenceError<<P::CounterStore as CounterStore>::Error>> {
938        let context = {
939            let slot = self
940                .identity(id)
941                .ok_or(CounterPersistenceError::IdentityMissing)?;
942            if !slot.counter_persistence_enabled() {
943                return Ok(slot.frame_counter());
944            }
945            *slot.identity().public_key()
946        };
947        let loaded = self
948            .counter_store
949            .load(&context.0)
950            .await
951            .map_err(CounterPersistenceError::Store)?;
952        let aligned = align_counter_boundary(loaded);
953        let slot = self
954            .identity_mut(id)
955            .ok_or(CounterPersistenceError::IdentityMissing)?;
956        slot.load_persisted_counter(aligned);
957        Ok(aligned)
958    }
959
960    /// Persist all currently scheduled frame-counter reservations.
961    pub async fn service_counter_persistence(
962        &mut self,
963    ) -> Result<usize, <P::CounterStore as CounterStore>::Error> {
964        let mut pending = Vec::<(LocalIdentityId, [u8; 32], u32), IDENTITIES>::new();
965        for (index, slot) in self.identities.iter().enumerate() {
966            let Some(slot) = slot.as_ref() else {
967                continue;
968            };
969            let Some(target) = slot.pending_persist_target() else {
970                continue;
971            };
972            if !slot.counter_persistence_enabled() {
973                continue;
974            }
975            pending
976                .push((
977                    LocalIdentityId(index as u8),
978                    slot.identity().public_key().0,
979                    target,
980                ))
981                .expect("identity enumeration must fit configured identity capacity");
982        }
983
984        let mut wrote = 0usize;
985        for (_, context, target) in pending.iter() {
986            self.counter_store
987                .store(context, align_counter_boundary(*target))
988                .await?;
989            wrote += 1;
990        }
991        if wrote > 0 {
992            self.counter_store.flush().await?;
993            for (id, _, target) in pending {
994                if let Some(slot) = self.identity_mut(id) {
995                    slot.mark_counter_persisted(target);
996                }
997            }
998        }
999        Ok(wrote)
1000    }
1001
1002    #[cfg(feature = "software-crypto")]
1003    /// Register an ephemeral software identity linked to `parent`.
1004    pub fn register_ephemeral(
1005        &mut self,
1006        parent: LocalIdentityId,
1007        identity: umsh_crypto::software::SoftwareIdentity,
1008    ) -> Result<LocalIdentityId, CapacityError> {
1009        self.insert_identity(LocalIdentity::Ephemeral(identity), Some(parent))
1010    }
1011
1012    #[cfg(feature = "software-crypto")]
1013    /// Remove an ephemeral identity slot if one exists at `id`.
1014    pub fn remove_ephemeral(&mut self, id: LocalIdentityId) -> bool {
1015        if let Some(slot) = self.identities.get_mut(id.0 as usize) {
1016            let should_remove = slot
1017                .as_ref()
1018                .map(|identity_slot| identity_slot.identity().is_ephemeral())
1019                .unwrap_or(false);
1020            if should_remove {
1021                *slot = None;
1022                return true;
1023            }
1024        }
1025        false
1026    }
1027
1028    /// Borrow an identity slot by identifier.
1029    pub fn identity(
1030        &self,
1031        id: LocalIdentityId,
1032    ) -> Option<&IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1033        self.identities.get(id.0 as usize)?.as_ref()
1034    }
1035
1036    /// Mutably borrow an identity slot by identifier.
1037    pub fn identity_mut(
1038        &mut self,
1039        id: LocalIdentityId,
1040    ) -> Option<&mut IdentitySlot<P::Identity, PEERS, ACKS, FRAME>> {
1041        self.identities.get_mut(id.0 as usize)?.as_mut()
1042    }
1043
1044    /// Registers or refreshes a known remote peer in the shared registry.
1045    pub fn add_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
1046        self.peer_registry.try_insert_or_update(key)
1047    }
1048
1049    /// Adds or updates a shared channel and derives its multicast keys.
1050    pub fn add_channel(&mut self, key: ChannelKey) -> Result<(), CapacityError> {
1051        let derived = self.crypto.derive_channel_keys(&key);
1052        self.channels.try_add(key, derived)
1053    }
1054
1055    /// Adds or updates a named channel using the coordinator's channel-key derivation.
1056    pub fn add_named_channel(&mut self, name: &str) -> Result<(), CapacityError> {
1057        let key = self.crypto.derive_named_channel_key(name);
1058        self.add_channel(key)
1059    }
1060
1061    /// Return the number of occupied identity slots.
1062    pub fn identity_count(&self) -> usize {
1063        self.identities.iter().filter(|slot| slot.is_some()).count()
1064    }
1065
1066    /// Installs pairwise transport keys for one local identity and remote peer.
1067    ///
1068    /// # Safety (logical)
1069    /// Installing wrong keys will silently corrupt the session. This method
1070    /// is crate-internal; external callers should use the `unsafe-advanced`
1071    /// feature or go through the node-layer PFS session manager.
1072    #[cfg(any(feature = "unsafe-advanced", test))]
1073    pub(crate) fn install_pairwise_keys(
1074        &mut self,
1075        identity_id: LocalIdentityId,
1076        peer_id: PeerId,
1077        pairwise_keys: PairwiseKeys,
1078    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1079        let slot = self
1080            .identity_mut(identity_id)
1081            .ok_or(SendError::IdentityMissing)?;
1082        slot.peer_crypto_mut()
1083            .insert(
1084                peer_id,
1085                crate::peers::PeerCryptoState {
1086                    pairwise_keys,
1087                    replay_window: ReplayWindow::new(),
1088                },
1089            )
1090            .map_err(|_| SendError::QueueFull)
1091    }
1092
1093    /// Installs pairwise transport keys for one local identity and remote peer.
1094    ///
1095    /// # Safety (logical)
1096    /// Installing wrong keys will silently corrupt the session. This method
1097    /// is deliberately gated behind the `unsafe-advanced` feature. Prefer
1098    /// going through the node-layer PFS session manager instead.
1099    #[cfg(feature = "unsafe-advanced")]
1100    pub fn install_pairwise_keys_advanced(
1101        &mut self,
1102        identity_id: LocalIdentityId,
1103        peer_id: PeerId,
1104        pairwise_keys: PairwiseKeys,
1105    ) -> Result<Option<crate::peers::PeerCryptoState>, SendError> {
1106        self.install_pairwise_keys(identity_id, peer_id, pairwise_keys)
1107    }
1108
1109    /// Enqueues a broadcast frame for transmission.
1110    pub fn queue_broadcast(
1111        &mut self,
1112        from: LocalIdentityId,
1113        payload: &[u8],
1114        options: &SendOptions,
1115    ) -> Result<SendReceipt, SendError> {
1116        // Broadcasts are always unencrypted, never ack-requested, and never
1117        // salted by the MAC. Sanitize before policy classification so a reused
1118        // `SendOptions` (whose default `encrypted = true`) doesn't get rejected
1119        // as a `PolicyViolation` in `LicensedOnly` mode for a send that will
1120        // in fact go out unencrypted.
1121        let mut options = options.clone();
1122        options.encrypted = false;
1123        options.ack_requested = false;
1124        options.salt = false;
1125        let options = &options;
1126        self.enforce_send_policy(None, options, false)?;
1127
1128        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
1129        let source_key = *slot.identity().public_key();
1130        let receipt = slot.next_receipt();
1131        let mut buf = [0u8; FRAME];
1132        let builder = PacketBuilder::new(&mut buf).broadcast();
1133        let mut builder = if options.full_source {
1134            builder.source_full(&source_key)
1135        } else {
1136            builder.source_hint(source_key.hint())
1137        };
1138        if let Some(hops) = options.flood_hops {
1139            builder = builder.flood_hops(hops);
1140        }
1141        if options.trace_route {
1142            builder = builder.trace_route();
1143        }
1144        if let Some(route) = options.source_route.as_ref() {
1145            builder = builder.source_route(route.as_slice());
1146        }
1147        if let Some(region_code) = options.region_code {
1148            builder = builder.region_code(region_code);
1149        }
1150        if let Some(callsign) = self.operating_policy.operator_callsign {
1151            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1152        }
1153        let frame = builder.payload(payload).build()?;
1154        if frame.len() > self.radio.max_frame_size() {
1155            return Err(SendError::Build(BuildError::BufferTooSmall));
1156        }
1157        self.tx_queue
1158            .enqueue(TxPriority::Application, frame, Some(receipt), Some(from))
1159            .map_err(|_| SendError::QueueFull)?;
1160        Ok(receipt)
1161    }
1162
1163    /// Enqueue a broadcast frame for transmission.
1164    pub async fn send_broadcast(
1165        &mut self,
1166        from: LocalIdentityId,
1167        payload: &[u8],
1168        options: &SendOptions,
1169    ) -> Result<SendReceipt, SendError> {
1170        self.queue_broadcast(from, payload, options)
1171    }
1172
1173    /// Enqueues a multicast frame using the configured channel keys.
1174    pub fn queue_multicast(
1175        &mut self,
1176        from: LocalIdentityId,
1177        channel_id: &ChannelId,
1178        payload: &[u8],
1179        options: &SendOptions,
1180    ) -> Result<SendReceipt, SendError> {
1181        self.enforce_send_policy(Some(*channel_id), options, false)?;
1182        // Multicast has no unicast receiver to ack, so an `ack_requested`
1183        // flag carried over from shared `SendOptions` is silently ignored.
1184
1185        let derived = self
1186            .channels
1187            .lookup_by_id(channel_id)
1188            .next()
1189            .ok_or(SendError::ChannelMissing)?
1190            .derived
1191            .clone();
1192        let keys = PairwiseKeys {
1193            k_enc: derived.k_enc,
1194            k_mic: derived.k_mic,
1195        };
1196        let receipt = self
1197            .identity_mut(from)
1198            .ok_or(SendError::IdentityMissing)?
1199            .next_receipt();
1200        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1201        let salt = self.take_salt(options);
1202        let mut buf = [0u8; FRAME];
1203        let builder = PacketBuilder::new(&mut buf).multicast(*channel_id);
1204        let builder = if options.full_source {
1205            builder.source_full(&source_key)
1206        } else {
1207            builder.source_hint(source_key.hint())
1208        };
1209        let mut builder = builder.frame_counter(frame_counter);
1210        if options.encrypted {
1211            builder = builder.encrypted();
1212        }
1213        builder = builder.mic_size(options.mic_size);
1214        if let Some(salt) = salt {
1215            builder = builder.salt(salt);
1216        }
1217        if let Some(hops) = options.flood_hops {
1218            builder = builder.flood_hops(hops);
1219        }
1220        if options.trace_route {
1221            builder = builder.trace_route();
1222        }
1223        if let Some(route) = options.source_route.as_ref() {
1224            builder = builder.source_route(route.as_slice());
1225        }
1226        if let Some(region_code) = options.region_code {
1227            builder = builder.region_code(region_code);
1228        }
1229        if let Some(callsign) = self.operating_policy.operator_callsign {
1230            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1231        }
1232        let mut packet = builder.payload(payload).build()?;
1233        self.crypto.seal_packet(&mut packet, &keys)?;
1234        self.enqueue_packet(packet, Some(receipt), Some(from))?;
1235        Ok(receipt)
1236    }
1237
1238    /// Enqueue a multicast frame for transmission.
1239    pub async fn send_multicast(
1240        &mut self,
1241        from: LocalIdentityId,
1242        channel_id: &ChannelId,
1243        payload: &[u8],
1244        options: &SendOptions,
1245    ) -> Result<SendReceipt, SendError> {
1246        self.queue_multicast(from, channel_id, payload, options)
1247    }
1248
1249    /// Enqueues a MAC ACK frame, using any cached route to `peer_id` when available.
1250    pub fn queue_mac_ack_for_peer(
1251        &mut self,
1252        peer_id: PeerId,
1253        dst: NodeHint,
1254        ack_tag: [u8; 8],
1255    ) -> Result<(), SendError> {
1256        let mut buf = [0u8; FRAME];
1257        let mut builder = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag);
1258        if let Some(peer) = self.peer_registry.get(peer_id) {
1259            match peer.route.as_ref() {
1260                Some(CachedRoute::Direct) | None => {}
1261                Some(CachedRoute::Source(route)) => {
1262                    builder = builder.source_route(route.as_slice());
1263                }
1264                Some(CachedRoute::Flood { hops, regions }) => {
1265                    builder = builder.flood_hops((*hops).clamp(1, 15));
1266                    for region in regions {
1267                        builder = builder.region_code(*region);
1268                    }
1269                }
1270            }
1271        }
1272        let frame = builder.build()?;
1273        if frame.len() > self.radio.max_frame_size() {
1274            return Err(SendError::Build(BuildError::BufferTooSmall));
1275        }
1276        self.tx_queue
1277            .enqueue(TxPriority::ImmediateAck, frame, None, None)
1278            .map_err(|_| SendError::QueueFull)?;
1279        Ok(())
1280    }
1281
1282    /// Enqueues an immediate direct MAC ACK frame.
1283    pub fn queue_mac_ack(&mut self, dst: NodeHint, ack_tag: [u8; 8]) -> Result<(), SendError> {
1284        let mut buf = [0u8; FRAME];
1285        let frame = PacketBuilder::new(&mut buf).mac_ack(dst, ack_tag).build()?;
1286        if frame.len() > self.radio.max_frame_size() {
1287            return Err(SendError::Build(BuildError::BufferTooSmall));
1288        }
1289        self.tx_queue
1290            .enqueue(TxPriority::ImmediateAck, frame, None, None)
1291            .map_err(|_| SendError::QueueFull)?;
1292        Ok(())
1293    }
1294
1295    /// Enqueues a unicast frame and optional pending-ACK state.
1296    pub fn queue_unicast(
1297        &mut self,
1298        from: LocalIdentityId,
1299        peer: &PublicKey,
1300        payload: &[u8],
1301        options: &SendOptions,
1302    ) -> Result<Option<SendReceipt>, SendError> {
1303        self.enforce_send_policy(None, options, false)?;
1304        let (peer_id, _) = self
1305            .peer_registry
1306            .lookup_by_key(peer)
1307            .ok_or(SendError::PeerMissing)?;
1308        let pairwise_keys = self
1309            .identity(from)
1310            .ok_or(SendError::IdentityMissing)?
1311            .peer_crypto()
1312            .get(&peer_id)
1313            .ok_or(SendError::PairwiseKeysMissing)?
1314            .pairwise_keys
1315            .clone();
1316        let effective_source_route = self.effective_source_route(peer_id, options);
1317
1318        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1319        let salt = self.take_salt(options);
1320        let mut buf = [0u8; FRAME];
1321        let builder = PacketBuilder::new(&mut buf).unicast(peer.hint());
1322        let builder = if options.full_source {
1323            builder.source_full(&source_key)
1324        } else {
1325            builder.source_hint(source_key.hint())
1326        };
1327        let mut builder = builder.frame_counter(frame_counter);
1328        if options.ack_requested {
1329            builder = builder.ack_requested();
1330        }
1331        if options.encrypted {
1332            builder = builder.encrypted();
1333        }
1334        builder = builder.mic_size(options.mic_size);
1335        if let Some(salt) = salt {
1336            builder = builder.salt(salt);
1337        }
1338        if let Some(hops) = options.flood_hops {
1339            builder = builder.flood_hops(hops);
1340        }
1341        if options.trace_route {
1342            builder = builder.trace_route();
1343        }
1344        if let Some(route) = effective_source_route.as_ref() {
1345            builder = builder.source_route(route.as_slice());
1346        }
1347        if let Some(region_code) = options.region_code {
1348            builder = builder.region_code(region_code);
1349        }
1350        if let Some(callsign) = self.operating_policy.operator_callsign {
1351            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1352        }
1353        let mut packet = builder.payload(payload).build()?;
1354
1355        let receipt = if options.ack_requested {
1356            Some(self.prepare_pending_ack(from, *peer, &packet, &pairwise_keys, options)?)
1357        } else {
1358            None
1359        };
1360
1361        self.crypto.seal_packet(&mut packet, &pairwise_keys)?;
1362        if let Some(receipt) = receipt {
1363            self.refresh_pending_resend(
1364                from,
1365                receipt,
1366                packet.as_bytes(),
1367                effective_source_route
1368                    .as_ref()
1369                    .map(|route| route.as_slice()),
1370            )?;
1371        }
1372        if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1373            if let Some(receipt) = receipt {
1374                let _ = self
1375                    .identity_mut(from)
1376                    .and_then(|slot| slot.remove_pending_ack(&receipt));
1377            }
1378            return Err(err);
1379        }
1380        Ok(receipt)
1381    }
1382
1383    /// Enqueue a unicast frame for transmission, deriving secure peer state on first use.
1384    pub async fn send_unicast(
1385        &mut self,
1386        from: LocalIdentityId,
1387        peer: &PublicKey,
1388        payload: &[u8],
1389        options: &SendOptions,
1390    ) -> Result<Option<SendReceipt>, SendError> {
1391        let (peer_id, _) = self
1392            .peer_registry
1393            .lookup_by_key(peer)
1394            .ok_or(SendError::PeerMissing)?;
1395        let _ = self.ensure_peer_crypto(from, peer_id).await?;
1396        self.queue_unicast(from, peer, payload, options)
1397    }
1398
1399    /// Enqueues a blind-unicast frame and optional pending-ACK state.
1400    pub fn queue_blind_unicast(
1401        &mut self,
1402        from: LocalIdentityId,
1403        peer: &PublicKey,
1404        channel_id: &ChannelId,
1405        payload: &[u8],
1406        options: &SendOptions,
1407    ) -> Result<Option<SendReceipt>, SendError> {
1408        self.enforce_send_policy(Some(*channel_id), options, true)?;
1409        let (peer_id, _) = self
1410            .peer_registry
1411            .lookup_by_key(peer)
1412            .ok_or(SendError::PeerMissing)?;
1413        let pairwise_keys = self
1414            .identity(from)
1415            .ok_or(SendError::IdentityMissing)?
1416            .peer_crypto()
1417            .get(&peer_id)
1418            .ok_or(SendError::PairwiseKeysMissing)?
1419            .pairwise_keys
1420            .clone();
1421        let channel_keys = self
1422            .channels
1423            .lookup_by_id(channel_id)
1424            .next()
1425            .ok_or(SendError::ChannelMissing)?
1426            .derived
1427            .clone();
1428        let blind_keys = self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
1429        let effective_source_route = self.effective_source_route(peer_id, options);
1430
1431        let (source_key, frame_counter) = self.identity_and_advance(from)?;
1432        let salt = self.take_salt(options);
1433        let mut buf = [0u8; FRAME];
1434        let builder = PacketBuilder::new(&mut buf).blind_unicast(*channel_id, peer.hint());
1435        let builder = if options.full_source {
1436            builder.source_full(&source_key)
1437        } else {
1438            builder.source_hint(source_key.hint())
1439        };
1440        let mut builder = builder.frame_counter(frame_counter);
1441        if options.ack_requested {
1442            builder = builder.ack_requested();
1443        }
1444        if !options.encrypted {
1445            builder = builder.unencrypted();
1446        }
1447        builder = builder.mic_size(options.mic_size);
1448        if let Some(salt) = salt {
1449            builder = builder.salt(salt);
1450        }
1451        if let Some(hops) = options.flood_hops {
1452            builder = builder.flood_hops(hops);
1453        }
1454        if options.trace_route {
1455            builder = builder.trace_route();
1456        }
1457        if let Some(route) = effective_source_route.as_ref() {
1458            builder = builder.source_route(route.as_slice());
1459        }
1460        if let Some(region_code) = options.region_code {
1461            builder = builder.region_code(region_code);
1462        }
1463        if let Some(callsign) = self.operating_policy.operator_callsign {
1464            builder = builder.option(OptionNumber::OperatorCallsign, callsign.as_trimmed_slice());
1465        }
1466        let mut packet = builder.payload(payload).build()?;
1467
1468        let receipt = if options.ack_requested {
1469            Some(self.prepare_pending_ack(from, *peer, &packet, &blind_keys, options)?)
1470        } else {
1471            None
1472        };
1473
1474        self.crypto
1475            .seal_blind_packet(&mut packet, &blind_keys, &channel_keys)
1476            .map_err(SendError::Crypto)?;
1477        if let Some(receipt) = receipt {
1478            self.refresh_pending_resend(
1479                from,
1480                receipt,
1481                packet.as_bytes(),
1482                effective_source_route
1483                    .as_ref()
1484                    .map(|route| route.as_slice()),
1485            )?;
1486        }
1487        if let Err(err) = self.enqueue_packet(packet, receipt, Some(from)) {
1488            if let Some(receipt) = receipt {
1489                let _ = self
1490                    .identity_mut(from)
1491                    .and_then(|slot| slot.remove_pending_ack(&receipt));
1492            }
1493            return Err(err);
1494        }
1495        Ok(receipt)
1496    }
1497
1498    /// Enqueue a blind-unicast frame for transmission, deriving secure peer state on first use.
1499    pub async fn send_blind_unicast(
1500        &mut self,
1501        from: LocalIdentityId,
1502        peer: &PublicKey,
1503        channel_id: &ChannelId,
1504        payload: &[u8],
1505        options: &SendOptions,
1506    ) -> Result<Option<SendReceipt>, SendError> {
1507        let (peer_id, _) = self
1508            .peer_registry
1509            .lookup_by_key(peer)
1510            .ok_or(SendError::PeerMissing)?;
1511        let _ = self.ensure_peer_crypto(from, peer_id).await?;
1512        self.queue_blind_unicast(from, peer, channel_id, payload, options)
1513    }
1514
1515    /// Transmit the next eligible queued frame, if any.
1516    ///
1517    /// While a post-transmit forwarding listen window is active, only immediate MAC
1518    /// ACK traffic is permitted to bypass the listen state. Forwarded sends arm a new
1519    /// listen window after the radio transmit completes. Non-immediate traffic honors
1520    /// queued CAD backoff state and gives up after the configured maximum number of
1521    /// CAD attempts.
1522    pub async fn transmit_next(
1523        &mut self,
1524        on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1525    ) -> Result<Option<SendReceipt>, MacError<<P::Radio as Radio>::Error>> {
1526        self.expire_post_tx_listen_if_needed();
1527        let Some(queued) = self.tx_queue.pop_next() else {
1528            return Ok(None);
1529        };
1530        let now_ms = self.clock.now_ms();
1531
1532        if queued.not_before_ms > now_ms {
1533            self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1534            return Ok(None);
1535        }
1536
1537        if self.post_tx_listen.is_some() && queued.priority != TxPriority::ImmediateAck {
1538            self.requeue_tx(&queued).map_err(|_| MacError::QueueFull)?;
1539            return Ok(None);
1540        }
1541
1542        let receipt = queued.receipt;
1543        let identity_id = queued.identity_id;
1544        let tx_options = if queued.priority == TxPriority::ImmediateAck {
1545            TxOptions::default()
1546        } else {
1547            TxOptions {
1548                cad_timeout_ms: Some(0),
1549            }
1550        };
1551        match self
1552            .radio
1553            .transmit(queued.frame.as_slice(), tx_options)
1554            .await
1555        {
1556            Ok(()) => {}
1557            Err(TxError::CadTimeout) => {
1558                let next_attempt = queued.cad_attempts.saturating_add(1);
1559                if next_attempt >= MAX_CAD_ATTEMPTS {
1560                    return Ok(None);
1561                }
1562                let backoff_ms = u64::from(
1563                    self.rng
1564                        .random_range(..self.radio.t_frame_ms().saturating_add(1)),
1565                );
1566                self.tx_queue
1567                    .enqueue_with_state(
1568                        queued.priority,
1569                        queued.frame.as_slice(),
1570                        queued.receipt,
1571                        queued.identity_id,
1572                        now_ms.saturating_add(backoff_ms),
1573                        next_attempt,
1574                        queued.forward_deferrals,
1575                    )
1576                    .map_err(|_| MacError::QueueFull)?;
1577                return Ok(None);
1578            }
1579            Err(error) => return Err(MacError::Transmit(error)),
1580        }
1581        if let Some(identity_id) = identity_id {
1582            on_event(
1583                identity_id,
1584                crate::MacEventRef::Transmitted {
1585                    identity_id,
1586                    receipt,
1587                },
1588            );
1589        }
1590        if let Some(receipt) = receipt {
1591            self.note_transmitted_ack_requested(receipt, queued.frame.as_slice());
1592        }
1593        Ok(receipt)
1594    }
1595
1596    /// Keep transmitting until the queue is empty.
1597    ///
1598    /// Progress stops when CAD keeps reporting busy, when a post-transmit listen window blocks
1599    /// normal traffic, or when the queue is otherwise unable to shrink further in the current cycle.
1600    pub async fn drain_tx_queue(
1601        &mut self,
1602        on_event: &mut impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1603    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1604        while !self.tx_queue.is_empty() {
1605            let queue_len = self.tx_queue.len();
1606            let _ = self.transmit_next(on_event).await?;
1607            if self.tx_queue.len() >= queue_len {
1608                break;
1609            }
1610        }
1611        Ok(())
1612    }
1613
1614    /// Runs one coordinator cycle over the current MAC state.
1615    ///
1616    /// The cycle performs four ordered phases:
1617    ///
1618    /// 1. Drain any queued transmit work.
1619    /// 2. Receive and process at most one inbound frame.
1620    /// 3. Drain any immediate ACK generated during receive handling.
1621    /// 4. Service pending ACK timers and emit timeout events.
1622    ///
1623    /// The callback may be invoked zero or more times depending on what the
1624    /// receive and timeout phases accept or resolve.
1625    /// Service one MAC coordinator cycle.
1626    pub async fn poll_cycle(
1627        &mut self,
1628        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1629    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1630        self.drain_tx_queue(&mut on_event).await?;
1631        if self.post_tx_listen.is_some() {
1632            self.service_post_tx_listen(&mut on_event).await?;
1633        } else {
1634            let _ = self.receive_one(&mut on_event).await?;
1635        }
1636        self.drain_tx_queue(&mut on_event).await?;
1637        self.service_pending_ack_timeouts(&mut on_event)
1638            .map_err(|_| MacError::QueueFull)?;
1639        Ok(())
1640    }
1641
1642    /// Compute the earliest deadline across all coordinator timers.
1643    ///
1644    /// Returns `None` when there are no pending timers.  The returned value
1645    /// covers pending ACK deadlines (both `ack_deadline_ms` and forwarding
1646    /// `confirm_deadline_ms`), the post-transmit listen window, and deferred
1647    /// transmit-queue entries.
1648    pub fn earliest_deadline_ms(&self) -> Option<u64> {
1649        let mut earliest: Option<u64> = None;
1650
1651        if let Some(listen) = &self.post_tx_listen {
1652            earliest =
1653                Some(earliest.map_or(listen.deadline_ms, |e: u64| e.min(listen.deadline_ms)));
1654        }
1655
1656        for slot in self.identities.iter().filter_map(|s| s.as_ref()) {
1657            for (_, pending) in slot.pending_acks.iter() {
1658                if !matches!(pending.state, crate::AckState::Queued { .. }) {
1659                    earliest = Some(earliest.map_or(pending.ack_deadline_ms, |e: u64| {
1660                        e.min(pending.ack_deadline_ms)
1661                    }));
1662                }
1663                if let crate::AckState::AwaitingForward {
1664                    confirm_deadline_ms,
1665                } = pending.state
1666                {
1667                    earliest = Some(
1668                        earliest.map_or(confirm_deadline_ms, |e: u64| e.min(confirm_deadline_ms)),
1669                    );
1670                }
1671            }
1672        }
1673
1674        if let Some(nb) = self.tx_queue.earliest_not_before_ms() {
1675            earliest = Some(earliest.map_or(nb, |e: u64| e.min(nb)));
1676        }
1677
1678        earliest
1679    }
1680
1681    /// Run the coordinator's event loop until at least one event is delivered
1682    /// or a timer-driven action (retransmit, timeout) is processed.
1683    ///
1684    /// Unlike [`poll_cycle`](Self::poll_cycle), this method properly awaits the
1685    /// radio and timer deadlines instead of returning immediately when nothing
1686    /// is ready.  Callers can use `tokio::select!` (or equivalent) to multiplex
1687    /// user input alongside MAC events:
1688    ///
1689    /// ```ignore
1690    /// loop {
1691    ///     tokio::select! {
1692    ///         line = stdin.next_line() => { /* handle input */ }
1693    ///         result = mac.next_event(|id, event| { /* handle event */ }) => {
1694    ///             result?;
1695    ///         }
1696    ///     }
1697    /// }
1698    /// ```
1699    pub async fn next_event(
1700        &mut self,
1701        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1702    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1703        loop {
1704            // Phase 1: Drain ready transmit work.
1705            self.drain_tx_queue(&mut on_event).await?;
1706
1707            // Phase 2: Wait for a radio frame or the earliest timer deadline.
1708            let mut buf = [0u8; FRAME];
1709            let reason = poll_fn(|cx| self.poll_wait_for_wake(cx, &mut buf))
1710                .await
1711                .map_err(MacError::Radio)?;
1712
1713            // Phases 3-5: process the wake, drain any follow-up sends, and run timeouts.
1714            self.process_wake_reason(reason, &mut buf, &mut on_event)
1715                .await?;
1716
1717            // If the tx_queue has new work (e.g. retransmits just enqueued),
1718            // loop back to drain it before waiting again.
1719            if !self.tx_queue.is_empty() {
1720                continue;
1721            }
1722
1723            return Ok(());
1724        }
1725    }
1726
1727    /// Register radio/timer wakers and report what has become ready.
1728    ///
1729    /// This is Phase 2 of [`next_event`](Self::next_event) exposed as a sync
1730    /// poll method so that callers sharing the coordinator through an
1731    /// `AsyncRefCell` can release the exclusive borrow between polls. The
1732    /// caller-provided `buf` is populated with the received frame when the
1733    /// return value is [`WakeReason::Received`].
1734    pub fn poll_wait_for_wake(
1735        &mut self,
1736        cx: &mut core::task::Context<'_>,
1737        buf: &mut [u8; FRAME],
1738    ) -> Poll<Result<WakeReason, <P::Radio as Radio>::Error>> {
1739        match self.radio.poll_receive(cx, buf) {
1740            Poll::Ready(Ok(rx)) => return Poll::Ready(Ok(WakeReason::Received(rx))),
1741            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
1742            Poll::Pending => {}
1743        }
1744
1745        let now_ms = self.clock.now_ms();
1746        if let Some(deadline) = self.earliest_deadline_ms() {
1747            if now_ms >= deadline {
1748                return Poll::Ready(Ok(WakeReason::TimerExpired));
1749            }
1750            let _ = self.clock.poll_delay_until(cx, deadline);
1751        }
1752
1753        if self.tx_queue.has_ready(now_ms) {
1754            return Poll::Ready(Ok(WakeReason::TimerExpired));
1755        }
1756
1757        Poll::Pending
1758    }
1759
1760    /// Run Phases 3-5 after [`poll_wait_for_wake`](Self::poll_wait_for_wake)
1761    /// has reported a wake reason: process the received frame (if any),
1762    /// drain immediate ACKs, and service pending ACK timeouts.
1763    pub async fn process_wake_reason(
1764        &mut self,
1765        reason: WakeReason,
1766        buf: &mut [u8; FRAME],
1767        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1768    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1769        match reason {
1770            WakeReason::Received(rx) => {
1771                let frame_len = rx.len.min(buf.len());
1772                let _ = self
1773                    .process_received_frame(buf, frame_len, &rx, &mut on_event)
1774                    .await;
1775            }
1776            WakeReason::TimerExpired => {}
1777        }
1778
1779        self.drain_tx_queue(&mut on_event).await?;
1780
1781        self.service_pending_ack_timeouts(&mut on_event)
1782            .map_err(|_| MacError::QueueFull)?;
1783
1784        Ok(())
1785    }
1786
1787    /// Drive the coordinator forever, invoking `on_event` for each delivered event.
1788    ///
1789    /// This is the preferred long-lived run loop for standalone MAC-driven tasks such as
1790    /// repeaters or dedicated radio services. Unlike manually calling
1791    /// [`poll_cycle`](Self::poll_cycle) in a loop, `run` keeps the wake/sleep policy inside
1792    /// the coordinator by delegating to [`next_event`](Self::next_event), which already
1793    /// waits for radio activity and protocol deadlines.
1794    pub async fn run(
1795        &mut self,
1796        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1797    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1798        loop {
1799            self.next_event(&mut on_event).await?;
1800        }
1801    }
1802
1803    /// Drive the coordinator forever while ignoring emitted events.
1804    ///
1805    /// Useful for standalone repeaters or bridge tasks that do not need to observe inbound
1806    /// deliveries directly but still need the coordinator to service forwarding, ACKs, and
1807    /// retransmissions without an app-owned polling loop.
1808    pub async fn run_quiet(&mut self) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
1809        self.run(|_, _| {}).await
1810    }
1811
1812    /// Process a received frame, dispatching events through `on_event`.
1813    ///
1814    /// This is the shared implementation used by both [`receive_one`](Self::receive_one)
1815    /// and [`next_event`](Self::next_event).  Returns `true` when the frame
1816    /// produced at least one event or side-effect.
1817    pub async fn process_received_frame(
1818        &mut self,
1819        buf: &mut [u8; FRAME],
1820        frame_len: usize,
1821        rx: &RxInfo,
1822        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1823    ) -> bool {
1824        let received_at_ms = self.clock.now_ms();
1825        let mut current_len = frame_len;
1826        let mut current_rx = RxInfo {
1827            len: frame_len,
1828            rssi: rx.rssi,
1829            snr: rx.snr,
1830            lqi: rx.lqi,
1831        };
1832        let mut current_received_at_ms = received_at_ms;
1833        let mut handled_any = false;
1834
1835        loop {
1836            let Ok(header) = PacketHeader::parse(&buf[..current_len]) else {
1837                return handled_any;
1838            };
1839            let forwarding_confirmed = if let Some((identity_id, receipt)) =
1840                self.observe_forwarding_confirmation(&buf[..current_len])
1841            {
1842                let hint = match header.source {
1843                    SourceAddrRef::Hint(h) => Some(RouterHint([h.0[0], h.0[1]])),
1844                    SourceAddrRef::FullKeyAt { offset } => {
1845                        let mut key_bytes = [0u8; 32];
1846                        key_bytes.copy_from_slice(&buf[offset..offset + 32]);
1847                        let h = PublicKey(key_bytes).hint();
1848                        Some(RouterHint([h.0[0], h.0[1]]))
1849                    }
1850                    _ => None,
1851                };
1852                on_event(
1853                    identity_id,
1854                    crate::MacEventRef::Forwarded {
1855                        identity_id,
1856                        receipt,
1857                        hint,
1858                    },
1859                );
1860                true
1861            } else {
1862                false
1863            };
1864
1865            let (handled, replay_target) = match header.packet_type() {
1866                PacketType::Broadcast => (
1867                    self.process_broadcast(
1868                        buf,
1869                        current_len,
1870                        &header,
1871                        &current_rx,
1872                        current_received_at_ms,
1873                        &mut on_event,
1874                    ),
1875                    None,
1876                ),
1877                PacketType::MacAck => (
1878                    self.process_mac_ack(
1879                        buf,
1880                        current_len,
1881                        &header,
1882                        &current_rx,
1883                        forwarding_confirmed,
1884                        &mut on_event,
1885                    ),
1886                    None,
1887                ),
1888                PacketType::Unicast | PacketType::UnicastAckReq => {
1889                    self.process_unicast(
1890                        buf,
1891                        current_len,
1892                        &header,
1893                        &current_rx,
1894                        current_received_at_ms,
1895                        forwarding_confirmed,
1896                        &mut on_event,
1897                    )
1898                    .await
1899                }
1900                PacketType::Multicast => (
1901                    self.process_multicast(
1902                        buf,
1903                        current_len,
1904                        &header,
1905                        &current_rx,
1906                        current_received_at_ms,
1907                        forwarding_confirmed,
1908                        &mut on_event,
1909                    ),
1910                    None,
1911                ),
1912                PacketType::BlindUnicast | PacketType::BlindUnicastAckReq => {
1913                    self.process_blind_unicast(
1914                        buf,
1915                        current_len,
1916                        &header,
1917                        &current_rx,
1918                        current_received_at_ms,
1919                        forwarding_confirmed,
1920                        &mut on_event,
1921                    )
1922                    .await
1923                }
1924                PacketType::Reserved5 => (false, None),
1925            };
1926            handled_any |= handled;
1927
1928            let Some((local_id, peer_id)) = replay_target else {
1929                return handled_any;
1930            };
1931            let Some(deferred) = self.take_deferred_counter_resync_frame(local_id, peer_id) else {
1932                return handled_any;
1933            };
1934            current_len = deferred.frame.len();
1935            buf[..current_len].copy_from_slice(deferred.frame.as_slice());
1936            current_rx = RxInfo {
1937                len: current_len,
1938                rssi: deferred.rssi,
1939                snr: deferred.snr,
1940                lqi: deferred.lqi,
1941            };
1942            current_received_at_ms = deferred.received_at_ms;
1943        }
1944    }
1945
1946    fn process_broadcast(
1947        &mut self,
1948        buf: &[u8; FRAME],
1949        frame_len: usize,
1950        header: &PacketHeader,
1951        rx: &RxInfo,
1952        received_at_ms: u64,
1953        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
1954    ) -> bool {
1955        let Some((from_hint, from_key)) = Self::resolve_broadcast_source(&buf[..frame_len], header)
1956        else {
1957            return false;
1958        };
1959        if !Self::payload_is_allowed(header.packet_type(), &buf[header.body_range.clone()]) {
1960            return false;
1961        }
1962        let mut delivered = false;
1963        for (index, slot) in self.identities.iter().enumerate() {
1964            if slot.is_none() {
1965                continue;
1966            }
1967            delivered = true;
1968            on_event(
1969                LocalIdentityId(index as u8),
1970                crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
1971                    &buf[..frame_len],
1972                    &buf[header.body_range.clone()],
1973                    header.clone(),
1974                    ParsedOptions::extract(&buf[..frame_len], header.options_range.clone())
1975                        .unwrap_or_default(),
1976                    from_key,
1977                    Some(from_hint),
1978                    false,
1979                    None,
1980                    crate::send::RxMetadata::new(
1981                        Some(rx.rssi),
1982                        Some(rx.snr),
1983                        rx.lqi,
1984                        Some(received_at_ms),
1985                    ),
1986                )),
1987            );
1988        }
1989        // Broadcast delivery does not consume the packet. Broadcast remains a
1990        // routable mesh packet and may still be forwarded by a repeater after
1991        // local delivery.
1992        let forwarded = self.maybe_forward_received(&buf[..frame_len], header, rx, false);
1993        delivered || forwarded
1994    }
1995
1996    fn process_mac_ack(
1997        &mut self,
1998        buf: &[u8; FRAME],
1999        frame_len: usize,
2000        header: &PacketHeader,
2001        rx: &RxInfo,
2002        forwarding_confirmed: bool,
2003        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2004    ) -> bool {
2005        let Some(ack_dst) = header.ack_dst else {
2006            return false;
2007        };
2008        let target_peer = self
2009            .identities
2010            .iter()
2011            .filter_map(|slot| slot.as_ref())
2012            .find(|slot| slot.identity().public_key().hint() == ack_dst)
2013            .and_then(|slot| self.match_pending_peer_for_ack(slot, &buf[header.mic_range.clone()]));
2014        if let Some(target_peer) = target_peer {
2015            let mut ack_tag = [0u8; 8];
2016            ack_tag.copy_from_slice(&buf[header.mic_range.clone()]);
2017            if let Some((identity_id, receipt)) = self.complete_ack(&target_peer, &ack_tag) {
2018                on_event(
2019                    identity_id,
2020                    crate::MacEventRef::AckReceived {
2021                        peer: target_peer,
2022                        receipt,
2023                    },
2024                );
2025                return true;
2026            }
2027        }
2028        forwarding_confirmed || self.maybe_forward_received(&buf[..frame_len], header, rx, false)
2029    }
2030
2031    async fn process_unicast(
2032        &mut self,
2033        buf: &mut [u8; FRAME],
2034        frame_len: usize,
2035        header: &PacketHeader,
2036        rx: &RxInfo,
2037        received_at_ms: u64,
2038        forwarding_confirmed: bool,
2039        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2040    ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
2041        let mut original = [0u8; FRAME];
2042        original[..frame_len].copy_from_slice(&buf[..frame_len]);
2043        let mut replay_target = None;
2044        let handled = if let Some(local_id) = self.find_local_identity_for_dst(header.dst) {
2045            let mut handled = false;
2046            for (peer_id, peer_key) in
2047                self.resolve_source_peer_candidates(&buf[..frame_len], header)
2048            {
2049                let Ok(keys) = self.ensure_peer_crypto(local_id, peer_id).await else {
2050                    continue;
2051                };
2052                let Ok(body_range) = self
2053                    .crypto
2054                    .open_packet(&mut buf[..frame_len], header, &keys)
2055                else {
2056                    continue;
2057                };
2058                let payload = &buf[body_range.clone()];
2059                if !Self::payload_is_allowed(header.packet_type(), payload) {
2060                    continue;
2061                }
2062                match self.unicast_replay_verdict(local_id, peer_id, header, &buf[..frame_len]) {
2063                    Some(ReplayVerdict::Accept) => {
2064                        let _ = self.accept_unicast_replay(
2065                            local_id,
2066                            peer_id,
2067                            header,
2068                            &buf[..frame_len],
2069                        );
2070                    }
2071                    Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
2072                        if self.try_accept_counter_resync_response(
2073                            local_id,
2074                            peer_id,
2075                            header,
2076                            &buf[..frame_len],
2077                            payload,
2078                        ) {
2079                            replay_target = Some((local_id, peer_id));
2080                        } else {
2081                            self.store_deferred_counter_resync_frame(
2082                                local_id,
2083                                peer_id,
2084                                &original[..frame_len],
2085                                rx,
2086                                received_at_ms,
2087                            );
2088                            self.maybe_request_counter_resync(local_id, peer_id, peer_key)
2089                                .await;
2090                            continue;
2091                        }
2092                    }
2093                    Some(ReplayVerdict::Replay) | None => continue,
2094                }
2095                self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2096
2097                if header.ack_requested()
2098                    && self.should_emit_destination_ack(&buf[..frame_len], header)
2099                {
2100                    let ack_tag = self.compute_received_ack_tag(
2101                        &buf[..frame_len],
2102                        header,
2103                        body_range.clone(),
2104                        &keys,
2105                    );
2106                    self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
2107                        .ok();
2108                }
2109
2110                if let Some(data) = Self::echo_request_data(payload) {
2111                    let response =
2112                        Self::build_echo_command_payload(MAC_COMMAND_ECHO_RESPONSE_ID, data);
2113                    let _ = self
2114                        .send_unicast(
2115                            local_id,
2116                            &peer_key,
2117                            response.as_slice(),
2118                            &SendOptions::default(),
2119                        )
2120                        .await;
2121                }
2122
2123                on_event(
2124                    local_id,
2125                    crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2126                        &original[..frame_len],
2127                        &buf[body_range],
2128                        header.clone(),
2129                        ParsedOptions::extract(
2130                            &original[..frame_len],
2131                            header.options_range.clone(),
2132                        )
2133                        .unwrap_or_default(),
2134                        Some(peer_key),
2135                        Some(peer_key.hint()),
2136                        true,
2137                        None,
2138                        crate::send::RxMetadata::new(
2139                            Some(rx.rssi),
2140                            Some(rx.snr),
2141                            rx.lqi,
2142                            Some(received_at_ms),
2143                        ),
2144                    )),
2145                );
2146                handled = true;
2147                break;
2148            }
2149            handled
2150        } else {
2151            false
2152        };
2153        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
2154        (
2155            handled || forwarding_confirmed || forwarded,
2156            handled.then_some(()).and(replay_target),
2157        )
2158    }
2159
2160    fn process_multicast(
2161        &mut self,
2162        buf: &mut [u8; FRAME],
2163        frame_len: usize,
2164        header: &PacketHeader,
2165        rx: &RxInfo,
2166        received_at_ms: u64,
2167        forwarding_confirmed: bool,
2168        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2169    ) -> bool {
2170        let mut original = [0u8; FRAME];
2171        original[..frame_len].copy_from_slice(&buf[..frame_len]);
2172        let delivered = if let Some(channel_id) = header.channel {
2173            let channel_info = {
2174                self.channels
2175                    .lookup_by_id(&channel_id)
2176                    .next()
2177                    .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
2178            };
2179            if let Some((channel_key, derived)) = channel_info {
2180                let keys = PairwiseKeys {
2181                    k_enc: derived.k_enc,
2182                    k_mic: derived.k_mic,
2183                };
2184                if let Ok(body_range) =
2185                    self.crypto
2186                        .open_packet(&mut buf[..frame_len], header, &keys)
2187                {
2188                    if !Self::payload_is_allowed(header.packet_type(), &buf[body_range.clone()]) {
2189                        false
2190                    } else if let Some(source) =
2191                        self.resolve_multicast_source(&buf[..frame_len], header)
2192                    {
2193                        let accepted = if let Some(peer_id) = source.peer_id {
2194                            let accepted = self.accept_multicast_replay(
2195                                channel_id,
2196                                peer_id,
2197                                header,
2198                                &buf[..frame_len],
2199                            );
2200                            if accepted {
2201                                self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2202                            }
2203                            accepted
2204                        } else {
2205                            self.accept_unknown_multicast_replay(header, &buf[..frame_len])
2206                        };
2207                        if accepted {
2208                            let mut delivered = false;
2209                            for (index, slot) in self.identities.iter().enumerate() {
2210                                if slot.is_none() {
2211                                    continue;
2212                                }
2213                                delivered = true;
2214                                on_event(
2215                                    LocalIdentityId(index as u8),
2216                                    crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2217                                        &original[..frame_len],
2218                                        &buf[body_range.clone()],
2219                                        header.clone(),
2220                                        ParsedOptions::extract(
2221                                            &original[..frame_len],
2222                                            header.options_range.clone(),
2223                                        )
2224                                        .unwrap_or_default(),
2225                                        source.public_key,
2226                                        source
2227                                            .hint
2228                                            .or_else(|| source.public_key.map(|key| key.hint())),
2229                                        true,
2230                                        Some(crate::ChannelInfoRef {
2231                                            id: channel_id,
2232                                            key: &channel_key,
2233                                        }),
2234                                        crate::send::RxMetadata::new(
2235                                            Some(rx.rssi),
2236                                            Some(rx.snr),
2237                                            rx.lqi,
2238                                            Some(received_at_ms),
2239                                        ),
2240                                    )),
2241                                );
2242                            }
2243                            delivered
2244                        } else {
2245                            false
2246                        }
2247                    } else {
2248                        false
2249                    }
2250                } else {
2251                    false
2252                }
2253            } else {
2254                false
2255            }
2256        } else {
2257            false
2258        };
2259        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, false);
2260        delivered || forwarding_confirmed || forwarded
2261    }
2262
2263    async fn process_blind_unicast(
2264        &mut self,
2265        buf: &mut [u8; FRAME],
2266        frame_len: usize,
2267        header: &PacketHeader,
2268        rx: &RxInfo,
2269        received_at_ms: u64,
2270        forwarding_confirmed: bool,
2271        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2272    ) -> (bool, Option<(LocalIdentityId, PeerId)>) {
2273        let mut original = [0u8; FRAME];
2274        original[..frame_len].copy_from_slice(&buf[..frame_len]);
2275        let mut replay_target = None;
2276        let handled = if let Some(channel_id) = header.channel {
2277            let channel_candidates: Vec<(ChannelKey, DerivedChannelKeys), CHANNELS> = self
2278                .channels
2279                .lookup_by_id(&channel_id)
2280                .map(|channel| (channel.channel_key.clone(), channel.derived.clone()))
2281                .collect();
2282            if channel_candidates.is_empty() {
2283                false
2284            } else {
2285                let mut handled = false;
2286                for (resolved_channel_key, channel_keys) in channel_candidates {
2287                    buf[..frame_len].copy_from_slice(&original[..frame_len]);
2288                    let Ok((dst, source_addr)) = self.crypto.decrypt_blind_addr(
2289                        &mut buf[..frame_len],
2290                        header,
2291                        &channel_keys,
2292                    ) else {
2293                        continue;
2294                    };
2295                    let Some(local_id) = self.find_local_identity_for_dst(Some(dst)) else {
2296                        continue;
2297                    };
2298                    for (peer_id, peer_key) in
2299                        self.resolve_blind_source_peer_candidates(&buf[..frame_len], source_addr)
2300                    {
2301                        let Ok(pairwise_keys) = self.ensure_peer_crypto(local_id, peer_id).await
2302                        else {
2303                            continue;
2304                        };
2305                        let blind_keys =
2306                            self.crypto.derive_blind_keys(&pairwise_keys, &channel_keys);
2307                        let body_range = match self.crypto.open_packet(
2308                            &mut buf[..frame_len],
2309                            header,
2310                            &blind_keys,
2311                        ) {
2312                            Ok(range) => range,
2313                            Err(_) => continue,
2314                        };
2315                        let payload = &buf[body_range.clone()];
2316                        if !Self::payload_is_allowed(header.packet_type(), payload) {
2317                            continue;
2318                        }
2319                        match self.unicast_replay_verdict(
2320                            local_id,
2321                            peer_id,
2322                            header,
2323                            &buf[..frame_len],
2324                        ) {
2325                            Some(ReplayVerdict::Accept) => {
2326                                let _ = self.accept_unicast_replay(
2327                                    local_id,
2328                                    peer_id,
2329                                    header,
2330                                    &buf[..frame_len],
2331                                );
2332                            }
2333                            Some(ReplayVerdict::OutOfWindow | ReplayVerdict::Stale) => {
2334                                if self.try_accept_counter_resync_response(
2335                                    local_id,
2336                                    peer_id,
2337                                    header,
2338                                    &buf[..frame_len],
2339                                    payload,
2340                                ) {
2341                                    replay_target = Some((local_id, peer_id));
2342                                } else {
2343                                    self.store_deferred_counter_resync_frame(
2344                                        local_id,
2345                                        peer_id,
2346                                        &original[..frame_len],
2347                                        rx,
2348                                        received_at_ms,
2349                                    );
2350                                    self.maybe_request_counter_resync(local_id, peer_id, peer_key)
2351                                        .await;
2352                                    continue;
2353                                }
2354                            }
2355                            Some(ReplayVerdict::Replay) | None => continue,
2356                        }
2357                        self.learn_route_for_peer(peer_id, &buf[..frame_len], header);
2358
2359                        if header.ack_requested()
2360                            && self.should_emit_destination_ack(&buf[..frame_len], header)
2361                        {
2362                            let ack_tag = self.compute_received_ack_tag(
2363                                &buf[..frame_len],
2364                                header,
2365                                body_range.clone(),
2366                                &blind_keys,
2367                            );
2368                            self.queue_mac_ack_for_peer(peer_id, peer_key.hint(), ack_tag)
2369                                .ok();
2370                        }
2371
2372                        if let Some(data) = Self::echo_request_data(payload) {
2373                            let response = Self::build_echo_command_payload(
2374                                MAC_COMMAND_ECHO_RESPONSE_ID,
2375                                data,
2376                            );
2377                            let _ = self
2378                                .send_unicast(
2379                                    local_id,
2380                                    &peer_key,
2381                                    response.as_slice(),
2382                                    &SendOptions::default(),
2383                                )
2384                                .await;
2385                        }
2386
2387                        on_event(
2388                            local_id,
2389                            crate::MacEventRef::Received(crate::ReceivedPacketRef::new(
2390                                &original[..frame_len],
2391                                &buf[body_range],
2392                                header.clone(),
2393                                ParsedOptions::extract(
2394                                    &original[..frame_len],
2395                                    header.options_range.clone(),
2396                                )
2397                                .unwrap_or_default(),
2398                                Some(peer_key),
2399                                Some(peer_key.hint()),
2400                                true,
2401                                Some(crate::ChannelInfoRef {
2402                                    id: channel_id,
2403                                    key: &resolved_channel_key,
2404                                }),
2405                                crate::send::RxMetadata::new(
2406                                    Some(rx.rssi),
2407                                    Some(rx.snr),
2408                                    rx.lqi,
2409                                    Some(received_at_ms),
2410                                ),
2411                            )),
2412                        );
2413                        handled = true;
2414                        break;
2415                    }
2416                    if handled {
2417                        break;
2418                    }
2419                }
2420                handled
2421            }
2422        } else {
2423            false
2424        };
2425        let forwarded = self.maybe_forward_received(&original[..frame_len], header, rx, handled);
2426        (
2427            handled || forwarding_confirmed || forwarded,
2428            handled.then_some(()).and(replay_target),
2429        )
2430    }
2431
2432    /// Non-blocking receive: polls the radio once and processes a frame if available.
2433    ///
2434    /// This is the legacy non-blocking API used by [`poll_cycle`](Self::poll_cycle).
2435    /// For new code, prefer [`next_event`](Self::next_event) which properly awaits
2436    /// the radio and timer deadlines.
2437    pub async fn receive_one(
2438        &mut self,
2439        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2440    ) -> Result<bool, MacError<<P::Radio as Radio>::Error>> {
2441        let mut buf = [0u8; FRAME];
2442        let Some(rx) = poll_fn(|cx| match self.radio.poll_receive(cx, &mut buf) {
2443            Poll::Ready(Ok(rx)) => Poll::Ready(Ok(Some(rx))),
2444            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
2445            Poll::Pending => Poll::Ready(Ok(None)),
2446        })
2447        .await
2448        .map_err(MacError::Radio)?
2449        else {
2450            return Ok(false);
2451        };
2452
2453        let frame_len = rx.len.min(buf.len());
2454        Ok(self
2455            .process_received_frame(&mut buf, frame_len, &rx, &mut on_event)
2456            .await)
2457    }
2458
2459    /// Mark a pending receipt as acknowledged and emit an event through `on_event`.
2460    pub fn complete_ack(
2461        &mut self,
2462        peer: &PublicKey,
2463        ack_tag: &[u8; 8],
2464    ) -> Option<(LocalIdentityId, SendReceipt)> {
2465        for (index, slot) in self.identities.iter_mut().enumerate() {
2466            let Some(slot) = slot.as_mut() else {
2467                continue;
2468            };
2469
2470            let receipt = slot.pending_acks.iter().find_map(|(receipt, pending)| {
2471                (pending.peer == *peer && pending.ack_tag == *ack_tag).then_some(*receipt)
2472            });
2473
2474            if let Some(receipt) = receipt {
2475                slot.pending_acks.remove(&receipt);
2476                if self
2477                    .post_tx_listen
2478                    .as_ref()
2479                    .map(|listen| {
2480                        listen.identity_id == LocalIdentityId(index as u8)
2481                            && listen.receipt == receipt
2482                    })
2483                    .unwrap_or(false)
2484                {
2485                    self.post_tx_listen = None;
2486                }
2487                return Some((LocalIdentityId(index as u8), receipt));
2488            }
2489        }
2490
2491        None
2492    }
2493
2494    /// Expire or retry pending ACK state based on `now_ms`.
2495    pub fn service_pending_ack_timeouts(
2496        &mut self,
2497        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
2498    ) -> Result<(), CapacityError> {
2499        self.expire_post_tx_listen_if_needed();
2500
2501        #[derive(Clone)]
2502        enum Action<const FRAME: usize> {
2503            Retry {
2504                receipt: SendReceipt,
2505                resend: ResendRecord<FRAME>,
2506                not_before_ms: u64,
2507            },
2508            RouteRetry {
2509                receipt: SendReceipt,
2510                peer: PublicKey,
2511                resend: ResendRecord<FRAME>,
2512                not_before_ms: u64,
2513            },
2514            Timeout {
2515                receipt: SendReceipt,
2516                peer: PublicKey,
2517            },
2518        }
2519
2520        let now_ms = self.clock.now_ms();
2521        let t_frame_ms = self.radio.t_frame_ms();
2522
2523        for index in 0..self.identities.len() {
2524            let identity_id = LocalIdentityId(index as u8);
2525            let actions = {
2526                let Some(slot) = self.identities[index].as_mut() else {
2527                    continue;
2528                };
2529
2530                let mut actions: Vec<Action<FRAME>, ACKS> = Vec::new();
2531                for (receipt, pending) in slot.pending_acks.iter_mut() {
2532                    if !matches!(pending.state, crate::AckState::Queued { .. })
2533                        && now_ms >= pending.ack_deadline_ms
2534                    {
2535                        if Self::can_attempt_route_retry(pending) {
2536                            let backoff_cap_ms =
2537                                Self::forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms, 1);
2538                            let backoff_ms = if backoff_cap_ms == 0 {
2539                                0
2540                            } else {
2541                                u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
2542                            };
2543                            actions
2544                                .push(Action::RouteRetry {
2545                                    receipt: *receipt,
2546                                    peer: pending.peer,
2547                                    resend: pending.resend.clone(),
2548                                    not_before_ms: now_ms.saturating_add(backoff_ms),
2549                                })
2550                                .map_err(|_| CapacityError)?;
2551                        } else {
2552                            actions
2553                                .push(Action::Timeout {
2554                                    receipt: *receipt,
2555                                    peer: pending.peer,
2556                                })
2557                                .map_err(|_| CapacityError)?;
2558                        }
2559                        continue;
2560                    }
2561
2562                    if let crate::AckState::AwaitingForward {
2563                        confirm_deadline_ms,
2564                    } = pending.state
2565                    {
2566                        if now_ms >= confirm_deadline_ms && pending.retries < MAX_FORWARD_RETRIES {
2567                            pending.retries = pending.retries.saturating_add(1);
2568                            let backoff_cap_ms = Self::forward_retry_backoff_cap_ms_for_t_frame(
2569                                t_frame_ms,
2570                                pending.retries,
2571                            );
2572                            let backoff_ms = if backoff_cap_ms == 0 {
2573                                0
2574                            } else {
2575                                u64::from(self.rng.random_range(..backoff_cap_ms.saturating_add(1)))
2576                            };
2577                            let not_before_ms = now_ms.saturating_add(backoff_ms);
2578                            pending.state = crate::AckState::RetryQueued;
2579                            actions
2580                                .push(Action::Retry {
2581                                    receipt: *receipt,
2582                                    resend: pending.resend.clone(),
2583                                    not_before_ms,
2584                                })
2585                                .map_err(|_| CapacityError)?;
2586                        }
2587                    }
2588                }
2589                actions
2590            };
2591
2592            for action in actions {
2593                match action {
2594                    Action::Retry {
2595                        receipt,
2596                        resend,
2597                        not_before_ms,
2598                    } => {
2599                        self.tx_queue.enqueue_with_state(
2600                            TxPriority::Retry,
2601                            resend.frame.as_slice(),
2602                            Some(receipt),
2603                            Some(identity_id),
2604                            not_before_ms,
2605                            0,
2606                            0,
2607                        )?;
2608                    }
2609                    Action::RouteRetry {
2610                        receipt,
2611                        peer,
2612                        resend,
2613                        not_before_ms,
2614                    } => {
2615                        if let Some(rewritten) = self.synthesize_route_retry_resend(&peer, &resend)
2616                        {
2617                            if let Some(pending) = self
2618                                .identity_mut(identity_id)
2619                                .and_then(|slot| slot.pending_ack_mut(&receipt))
2620                            {
2621                                pending.resend = rewritten.clone();
2622                                pending.retries = 0;
2623                                pending.sent_ms = 0;
2624                                pending.ack_deadline_ms = 0;
2625                                pending.state = crate::AckState::RetryQueued;
2626                            }
2627                            self.tx_queue.enqueue_with_state(
2628                                TxPriority::Retry,
2629                                rewritten.frame.as_slice(),
2630                                Some(receipt),
2631                                Some(identity_id),
2632                                not_before_ms,
2633                                0,
2634                                0,
2635                            )?;
2636                        } else {
2637                            if let Some(slot) = self.identity_mut(identity_id) {
2638                                slot.pending_acks.remove(&receipt);
2639                            }
2640                            on_event(
2641                                identity_id,
2642                                crate::MacEventRef::AckTimeout { peer, receipt },
2643                            );
2644                        }
2645                    }
2646                    Action::Timeout { receipt, peer } => {
2647                        if let Some(slot) = self.identity_mut(identity_id) {
2648                            slot.pending_acks.remove(&receipt);
2649                        }
2650                        on_event(
2651                            identity_id,
2652                            crate::MacEventRef::AckTimeout { peer, receipt },
2653                        );
2654                    }
2655                }
2656            }
2657        }
2658
2659        Ok(())
2660    }
2661
2662    /// Cancel a pending ACK-requested send, stopping retransmissions.
2663    ///
2664    /// Removes the pending ACK entry for the given identity slot and receipt,
2665    /// and removes any matching entry from the transmit queue. Returns `true`
2666    /// if a pending ACK was found and removed.
2667    pub fn cancel_pending_ack(
2668        &mut self,
2669        identity_id: LocalIdentityId,
2670        receipt: SendReceipt,
2671    ) -> bool {
2672        let removed = self
2673            .identity_mut(identity_id)
2674            .and_then(|slot| slot.remove_pending_ack(&receipt))
2675            .is_some();
2676
2677        // Also remove any queued retransmission for this receipt.
2678        self.tx_queue.remove_first_matching(|entry| {
2679            entry.receipt == Some(receipt) && entry.identity_id == Some(identity_id)
2680        });
2681
2682        // Clear the post-tx listen if it was tracking this receipt.
2683        if let Some(listen) = &self.post_tx_listen {
2684            if listen.identity_id == identity_id && listen.receipt == receipt {
2685                self.post_tx_listen = None;
2686            }
2687        }
2688
2689        removed
2690    }
2691
2692    fn identity_and_advance(
2693        &mut self,
2694        from: LocalIdentityId,
2695    ) -> Result<(PublicKey, u32), SendError> {
2696        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2697        if slot.counter_window_exhausted() {
2698            return Err(SendError::CounterPersistenceLag);
2699        }
2700        let source_key = *slot.identity().public_key();
2701        let frame_counter = slot.advance_frame_counter();
2702        slot.schedule_counter_persist_if_needed();
2703        Ok((source_key, frame_counter))
2704    }
2705
2706    fn take_salt(&mut self, options: &SendOptions) -> Option<u16> {
2707        options.salt.then(|| self.rng.next_u32() as u16)
2708    }
2709
2710    fn enforce_send_policy(
2711        &self,
2712        channel_id: Option<ChannelId>,
2713        options: &SendOptions,
2714        blind_unicast: bool,
2715    ) -> Result<(), SendError> {
2716        let _authority = self.classify_send_authority(options, blind_unicast)?;
2717
2718        let Some(channel_id) = channel_id else {
2719            return Ok(());
2720        };
2721        let Some(policy) = self
2722            .operating_policy
2723            .channel_policies
2724            .iter()
2725            .find(|policy| policy.channel_id == channel_id)
2726        else {
2727            return Ok(());
2728        };
2729
2730        if policy.require_unencrypted && options.encrypted {
2731            return Err(SendError::PolicyViolation);
2732        }
2733        if policy.require_full_source && !options.full_source {
2734            return Err(SendError::PolicyViolation);
2735        }
2736        if let Some(max_flood_hops) = policy.max_flood_hops {
2737            if options
2738                .flood_hops
2739                .map(|hops| hops > max_flood_hops)
2740                .unwrap_or(false)
2741            {
2742                return Err(SendError::PolicyViolation);
2743            }
2744        }
2745
2746        Ok(())
2747    }
2748
2749    fn classify_send_authority(
2750        &self,
2751        options: &SendOptions,
2752        _blind_unicast: bool,
2753    ) -> Result<TransmitAuthority, SendError> {
2754        match self.operating_policy.amateur_radio_mode {
2755            AmateurRadioMode::Unlicensed => Ok(TransmitAuthority::Unlicensed),
2756            AmateurRadioMode::LicensedOnly => {
2757                if options.encrypted || self.operating_policy.operator_callsign.is_none() {
2758                    return Err(SendError::PolicyViolation);
2759                }
2760                Ok(TransmitAuthority::Amateur)
2761            }
2762            AmateurRadioMode::Hybrid => {
2763                if options.encrypted {
2764                    // Hybrid encrypted traffic must be treated as unlicensed
2765                    // traffic for downstream transmit-power and duty-cycle policy.
2766                    Ok(TransmitAuthority::Unlicensed)
2767                } else if self.operating_policy.operator_callsign.is_some() {
2768                    Ok(TransmitAuthority::Amateur)
2769                } else {
2770                    Ok(TransmitAuthority::Unlicensed)
2771                }
2772            }
2773        }
2774    }
2775
2776    fn enqueue_packet(
2777        &mut self,
2778        packet: UnsealedPacket<'_>,
2779        receipt: Option<SendReceipt>,
2780        identity_id: Option<LocalIdentityId>,
2781    ) -> Result<(), SendError> {
2782        if packet.total_len() > self.radio.max_frame_size() {
2783            return Err(SendError::Build(BuildError::BufferTooSmall));
2784        }
2785        self.tx_queue
2786            .enqueue(
2787                TxPriority::Application,
2788                packet.as_bytes(),
2789                receipt,
2790                identity_id,
2791            )
2792            .map_err(|_| SendError::QueueFull)?;
2793        Ok(())
2794    }
2795
2796    fn refresh_pending_resend(
2797        &mut self,
2798        from: LocalIdentityId,
2799        receipt: SendReceipt,
2800        frame: &[u8],
2801        source_route: Option<&[RouterHint]>,
2802    ) -> Result<(), SendError> {
2803        let resend =
2804            ResendRecord::try_new(frame, source_route).map_err(|_| SendError::QueueFull)?;
2805        let pending = self
2806            .identity_mut(from)
2807            .ok_or(SendError::IdentityMissing)?
2808            .pending_ack_mut(&receipt)
2809            .ok_or(SendError::IdentityMissing)?;
2810        pending.resend = resend;
2811        Ok(())
2812    }
2813
2814    fn prepare_pending_ack(
2815        &mut self,
2816        from: LocalIdentityId,
2817        peer: PublicKey,
2818        packet: &UnsealedPacket<'_>,
2819        keys: &PairwiseKeys,
2820        options: &SendOptions,
2821    ) -> Result<SendReceipt, SendError> {
2822        let header = PacketHeader::parse(packet.as_bytes())?;
2823        let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2824        feed_aad(&header, packet.as_bytes(), |chunk| cmac.update(chunk));
2825        cmac.update(packet.body());
2826        let full_mac = cmac.finalize();
2827        let ack_tag = self.crypto.compute_ack_tag(&full_mac, &keys.k_enc);
2828        let is_forwarded = options
2829            .source_route
2830            .as_ref()
2831            .map(|route| !route.is_empty())
2832            .unwrap_or(false)
2833            || options.flood_hops.unwrap_or(0) > 0;
2834        let resend = ResendRecord::try_new(
2835            packet.as_bytes(),
2836            options.source_route.as_ref().map(|route| route.as_slice()),
2837        )
2838        .map_err(|_| SendError::QueueFull)?;
2839
2840        let slot = self.identity_mut(from).ok_or(SendError::IdentityMissing)?;
2841        let receipt = slot.next_receipt();
2842        let pending = if is_forwarded {
2843            PendingAck::forwarded(ack_tag, peer, resend)
2844        } else {
2845            PendingAck::direct(ack_tag, peer, resend)
2846        };
2847        slot.try_insert_pending_ack(receipt, pending)
2848            .map_err(|_| SendError::PendingAckFull)?;
2849        Ok(receipt)
2850    }
2851
2852    async fn derive_pairwise_keys_for_peer(
2853        &self,
2854        local_id: LocalIdentityId,
2855        peer_key: &PublicKey,
2856    ) -> Result<PairwiseKeys, SendError> {
2857        let shared_secret = {
2858            let slot = self.identity(local_id).ok_or(SendError::IdentityMissing)?;
2859            match slot.identity() {
2860                LocalIdentity::LongTerm(identity) => identity
2861                    .agree(peer_key)
2862                    .await
2863                    .map_err(|_| SendError::IdentityAgreementFailed)?,
2864                #[cfg(feature = "software-crypto")]
2865                LocalIdentity::Ephemeral(identity) => identity
2866                    .agree(peer_key)
2867                    .await
2868                    .map_err(|_| SendError::IdentityAgreementFailed)?,
2869            }
2870        };
2871
2872        Ok(self.crypto.derive_pairwise_keys(&shared_secret))
2873    }
2874
2875    fn effective_source_route(
2876        &self,
2877        peer_id: PeerId,
2878        options: &SendOptions,
2879    ) -> Option<Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>> {
2880        if let Some(route) = options.source_route.as_ref() {
2881            return Some(route.clone());
2882        }
2883
2884        let Some(peer) = self.peer_registry.get(peer_id) else {
2885            return None;
2886        };
2887        match peer.route.as_ref() {
2888            Some(CachedRoute::Source(route)) => Some(route.clone()),
2889            _ => None,
2890        }
2891    }
2892
2893    fn cache_peer_crypto(
2894        &mut self,
2895        local_id: LocalIdentityId,
2896        peer_id: PeerId,
2897        pairwise_keys: PairwiseKeys,
2898    ) -> Result<(), SendError> {
2899        let slot = self
2900            .identity_mut(local_id)
2901            .ok_or(SendError::IdentityMissing)?;
2902        if slot.peer_crypto().get(&peer_id).is_some() {
2903            return Ok(());
2904        }
2905        slot.peer_crypto_mut()
2906            .insert(
2907                peer_id,
2908                crate::peers::PeerCryptoState {
2909                    pairwise_keys,
2910                    replay_window: ReplayWindow::new(),
2911                },
2912            )
2913            .map_err(|_| SendError::QueueFull)?;
2914        Ok(())
2915    }
2916
2917    async fn ensure_peer_crypto(
2918        &mut self,
2919        local_id: LocalIdentityId,
2920        peer_id: PeerId,
2921    ) -> Result<PairwiseKeys, SendError> {
2922        if let Some(keys) = self
2923            .identity(local_id)
2924            .and_then(|slot| slot.peer_crypto().get(&peer_id))
2925            .map(|state| state.pairwise_keys.clone())
2926        {
2927            return Ok(keys);
2928        }
2929
2930        let peer_key = self
2931            .peer_registry
2932            .get(peer_id)
2933            .ok_or(SendError::PeerMissing)?
2934            .public_key;
2935        let pairwise_keys = self
2936            .derive_pairwise_keys_for_peer(local_id, &peer_key)
2937            .await?;
2938        self.cache_peer_crypto(local_id, peer_id, pairwise_keys.clone())?;
2939        Ok(pairwise_keys)
2940    }
2941
2942    fn insert_identity(
2943        &mut self,
2944        identity: LocalIdentity<P::Identity>,
2945        pfs_parent: Option<LocalIdentityId>,
2946    ) -> Result<LocalIdentityId, CapacityError> {
2947        let initial_frame_counter = self.rng.next_u32();
2948
2949        if let Some((index, slot)) = self
2950            .identities
2951            .iter_mut()
2952            .enumerate()
2953            .find(|(_, slot)| slot.is_none())
2954        {
2955            *slot = Some(IdentitySlot::new(
2956                identity,
2957                initial_frame_counter,
2958                pfs_parent,
2959            ));
2960            return Ok(LocalIdentityId(index as u8));
2961        }
2962
2963        let next_id = self.identities.len();
2964        self.identities
2965            .push(Some(IdentitySlot::new(
2966                identity,
2967                initial_frame_counter,
2968                pfs_parent,
2969            )))
2970            .map_err(|_| CapacityError)?;
2971        Ok(LocalIdentityId(next_id as u8))
2972    }
2973
2974    fn compute_received_ack_tag(
2975        &self,
2976        buf: &[u8],
2977        header: &PacketHeader,
2978        body_range: core::ops::Range<usize>,
2979        keys: &PairwiseKeys,
2980    ) -> [u8; 8] {
2981        let mut cmac: CmacState<_> = self.crypto.cmac_state(&keys.k_mic);
2982        feed_aad(header, buf, |chunk| cmac.update(chunk));
2983        cmac.update(&buf[body_range]);
2984        let full_mac = cmac.finalize();
2985        self.crypto.compute_ack_tag(&full_mac, &keys.k_enc)
2986    }
2987
2988    fn requeue_tx(&mut self, queued: &crate::QueuedTx<FRAME>) -> Result<u32, CapacityError> {
2989        self.tx_queue.enqueue_with_state(
2990            queued.priority,
2991            queued.frame.as_slice(),
2992            queued.receipt,
2993            queued.identity_id,
2994            queued.not_before_ms,
2995            queued.cad_attempts,
2996            queued.forward_deferrals,
2997        )
2998    }
2999
3000    fn accept_unicast_replay(
3001        &mut self,
3002        local_id: LocalIdentityId,
3003        peer_id: PeerId,
3004        header: &PacketHeader,
3005        frame: &[u8],
3006    ) -> bool {
3007        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3008            return false;
3009        };
3010        let now_ms = self.clock.now_ms();
3011        let Some(window) = self
3012            .identity_mut(local_id)
3013            .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3014            .map(|state| &mut state.replay_window)
3015        else {
3016            return false;
3017        };
3018
3019        if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3020            return false;
3021        }
3022
3023        window.accept(counter, mic, now_ms);
3024        true
3025    }
3026
3027    fn unicast_replay_verdict(
3028        &mut self,
3029        local_id: LocalIdentityId,
3030        peer_id: PeerId,
3031        header: &PacketHeader,
3032        frame: &[u8],
3033    ) -> Option<ReplayVerdict> {
3034        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3035            return None;
3036        };
3037        let now_ms = self.clock.now_ms();
3038        self.identity_mut(local_id)
3039            .and_then(|slot| slot.peer_crypto_mut().get_mut(&peer_id))
3040            .map(|state| state.replay_window.check(counter, mic, now_ms))
3041    }
3042
3043    fn try_accept_counter_resync_response(
3044        &mut self,
3045        local_id: LocalIdentityId,
3046        peer_id: PeerId,
3047        header: &PacketHeader,
3048        frame: &[u8],
3049        payload: &[u8],
3050    ) -> bool {
3051        let Some(nonce) = Self::echo_response_nonce(payload) else {
3052            return false;
3053        };
3054        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3055            return false;
3056        };
3057        let now_ms = self.clock.now_ms();
3058        let Some(slot) = self.identity_mut(local_id) else {
3059            return false;
3060        };
3061        let Some(pending) = slot.pending_counter_resync().get(&peer_id).copied() else {
3062            return false;
3063        };
3064        if pending.nonce != nonce {
3065            return false;
3066        }
3067        let Some(state) = slot.peer_crypto_mut().get_mut(&peer_id) else {
3068            return false;
3069        };
3070        state.replay_window.reset(counter, now_ms);
3071        state.replay_window.accept(counter, mic, now_ms);
3072        let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3073        true
3074    }
3075
3076    async fn maybe_request_counter_resync(
3077        &mut self,
3078        local_id: LocalIdentityId,
3079        peer_id: PeerId,
3080        peer_key: PublicKey,
3081    ) {
3082        let now_ms = self.clock.now_ms();
3083        let should_send = {
3084            let Some(slot) = self.identity(local_id) else {
3085                return;
3086            };
3087            match slot.pending_counter_resync().get(&peer_id).copied() {
3088                Some(pending) => {
3089                    now_ms.saturating_sub(pending.requested_ms) >= COUNTER_RESYNC_REQUEST_RETRY_MS
3090                }
3091                None => true,
3092            }
3093        };
3094        if !should_send {
3095            return;
3096        }
3097
3098        let nonce = self.rng.next_u32();
3099        let payload =
3100            Self::build_echo_command_payload(MAC_COMMAND_ECHO_REQUEST_ID, &nonce.to_be_bytes());
3101        let options = SendOptions::default();
3102        if self
3103            .send_unicast(local_id, &peer_key, payload.as_slice(), &options)
3104            .await
3105            .is_ok()
3106        {
3107            if let Some(slot) = self.identity_mut(local_id) {
3108                let _ = slot.pending_counter_resync_mut().insert(
3109                    peer_id,
3110                    PendingCounterResync {
3111                        nonce,
3112                        requested_ms: now_ms,
3113                    },
3114                );
3115            }
3116        }
3117    }
3118
3119    fn store_deferred_counter_resync_frame(
3120        &mut self,
3121        local_id: LocalIdentityId,
3122        peer_id: PeerId,
3123        frame: &[u8],
3124        rx: &RxInfo,
3125        received_at_ms: u64,
3126    ) {
3127        let mut stored = Vec::new();
3128        stored
3129            .extend_from_slice(frame)
3130            .expect("received frame length must fit configured frame capacity");
3131        self.deferred_counter_resync_frame = Some(DeferredCounterResyncFrame {
3132            local_id,
3133            peer_id,
3134            frame: stored,
3135            rssi: rx.rssi,
3136            snr: rx.snr,
3137            lqi: rx.lqi,
3138            received_at_ms,
3139        });
3140    }
3141
3142    fn take_deferred_counter_resync_frame(
3143        &mut self,
3144        local_id: LocalIdentityId,
3145        peer_id: PeerId,
3146    ) -> Option<DeferredCounterResyncFrame<FRAME>> {
3147        match self.deferred_counter_resync_frame.as_ref() {
3148            Some(deferred) if deferred.local_id == local_id && deferred.peer_id == peer_id => {
3149                self.deferred_counter_resync_frame.take()
3150            }
3151            _ => None,
3152        }
3153    }
3154
3155    fn accept_multicast_replay(
3156        &mut self,
3157        channel_id: ChannelId,
3158        peer_id: PeerId,
3159        header: &PacketHeader,
3160        frame: &[u8],
3161    ) -> bool {
3162        let Some((counter, mic)) = Self::replay_metadata(header, frame) else {
3163            return false;
3164        };
3165        let now_ms = self.clock.now_ms();
3166        let Some(channel) = self.channels.get_mut_by_id(&channel_id) else {
3167            return false;
3168        };
3169
3170        if let Some(window) = channel.replay.get_mut(&peer_id) {
3171            if window.check(counter, mic, now_ms) != ReplayVerdict::Accept {
3172                return false;
3173            }
3174            window.accept(counter, mic, now_ms);
3175            return true;
3176        }
3177
3178        let mut window = ReplayWindow::new();
3179        window.accept(counter, mic, now_ms);
3180        channel.replay.insert(peer_id, window).is_ok()
3181    }
3182
3183    fn clear_peer_slot_state(&mut self, peer_id: PeerId) {
3184        for slot in self.identities.iter_mut().filter_map(|slot| slot.as_mut()) {
3185            let _ = slot.peer_crypto_mut().remove(&peer_id);
3186            let _ = slot.pending_counter_resync_mut().remove(&peer_id);
3187        }
3188        for channel in self.channels.iter_mut() {
3189            let _ = channel.replay.remove(&peer_id);
3190        }
3191    }
3192
3193    fn try_auto_register_peer(&mut self, key: PublicKey) -> Result<PeerId, CapacityError> {
3194        let now_ms = self.clock.now_ms();
3195        let outcome = self.peer_registry.try_insert_or_update_auto(key, now_ms)?;
3196        if outcome.evicted_key.is_some() {
3197            self.clear_peer_slot_state(outcome.peer_id);
3198        }
3199        Ok(outcome.peer_id)
3200    }
3201
3202    fn replay_metadata<'a>(header: &PacketHeader, frame: &'a [u8]) -> Option<(u32, &'a [u8])> {
3203        let counter = header.sec_info?.frame_counter;
3204        let mic = frame.get(header.mic_range.clone())?;
3205        Some((counter, mic))
3206    }
3207
3208    fn build_echo_command_payload(command_id: u8, data: &[u8]) -> Vec<u8, FRAME> {
3209        let mut payload = Vec::new();
3210        let _ = payload.push(PayloadType::MacCommand as u8);
3211        let _ = payload.push(command_id);
3212        let _ = payload.extend_from_slice(data);
3213        payload
3214    }
3215
3216    fn echo_request_data(payload: &[u8]) -> Option<&[u8]> {
3217        let (&payload_type, rest) = payload.split_first()?;
3218        let (&command_id, data) = rest.split_first()?;
3219        if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3220            || command_id != MAC_COMMAND_ECHO_REQUEST_ID
3221        {
3222            return None;
3223        }
3224        Some(data)
3225    }
3226
3227    fn echo_response_nonce(payload: &[u8]) -> Option<u32> {
3228        let (&payload_type, rest) = payload.split_first()?;
3229        let (&command_id, data) = rest.split_first()?;
3230        if PayloadType::from_byte(payload_type)? != PayloadType::MacCommand
3231            || command_id != MAC_COMMAND_ECHO_RESPONSE_ID
3232            || data.len() != COUNTER_RESYNC_NONCE_LEN
3233        {
3234            return None;
3235        }
3236        Some(u32::from_be_bytes(data.try_into().ok()?))
3237    }
3238
3239    fn full_key_at(frame: &[u8], offset: usize) -> Option<PublicKey> {
3240        let mut key = [0u8; 32];
3241        key.copy_from_slice(frame.get(offset..offset + 32)?);
3242        Some(PublicKey(key))
3243    }
3244
3245    fn find_local_identity_for_dst(
3246        &self,
3247        dst: Option<umsh_core::NodeHint>,
3248    ) -> Option<LocalIdentityId> {
3249        let dst = dst?;
3250        self.identities
3251            .iter()
3252            .enumerate()
3253            .find(|(_, slot)| {
3254                slot.as_ref()
3255                    .map(|slot| slot.identity().public_key().hint() == dst)
3256                    .unwrap_or(false)
3257            })
3258            .map(|(index, _)| LocalIdentityId(index as u8))
3259    }
3260
3261    fn resolve_source_peer_candidates(
3262        &mut self,
3263        frame: &[u8],
3264        header: &PacketHeader,
3265    ) -> Vec<(PeerId, PublicKey), PEERS> {
3266        match header.source {
3267            SourceAddrRef::FullKeyAt { offset } => {
3268                let Some(peer_key) = Self::full_key_at(frame, offset) else {
3269                    return Vec::new();
3270                };
3271
3272                if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3273                    let mut out = Vec::new();
3274                    let _ = out.push((peer_id, peer_key));
3275                    return out;
3276                }
3277
3278                if self.auto_register_full_key_peers {
3279                    if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3280                        let mut out = Vec::new();
3281                        let _ = out.push((peer_id, peer_key));
3282                        return out;
3283                    }
3284                }
3285
3286                Vec::new()
3287            }
3288            SourceAddrRef::Hint(hint) => self
3289                .peer_registry
3290                .lookup_by_hint(&hint)
3291                .map(|(peer_id, info)| (peer_id, info.public_key))
3292                .collect(),
3293            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3294        }
3295    }
3296
3297    fn resolve_multicast_source(
3298        &mut self,
3299        frame: &[u8],
3300        header: &PacketHeader,
3301    ) -> Option<ResolvedMulticastSource> {
3302        match header.source {
3303            SourceAddrRef::FullKeyAt { offset } => {
3304                let mut key = [0u8; 32];
3305                key.copy_from_slice(frame.get(offset..offset + 32)?);
3306                let public_key = PublicKey(key);
3307                let peer_id = self
3308                    .peer_registry
3309                    .lookup_by_key(&public_key)
3310                    .map(|(peer_id, _)| peer_id);
3311                Some(ResolvedMulticastSource {
3312                    peer_id,
3313                    public_key: Some(public_key),
3314                    hint: Some(public_key.hint()),
3315                })
3316            }
3317            SourceAddrRef::Hint(hint) => {
3318                let resolved = self.resolve_unique_hint(hint);
3319                Some(ResolvedMulticastSource {
3320                    peer_id: resolved.map(|(peer_id, _)| peer_id),
3321                    public_key: resolved.map(|(_, key)| key),
3322                    hint: Some(hint),
3323                })
3324            }
3325            SourceAddrRef::Encrypted { offset, len } => match len {
3326                32 => {
3327                    let mut key = [0u8; 32];
3328                    key.copy_from_slice(frame.get(offset..offset + 32)?);
3329                    let public_key = PublicKey(key);
3330                    let peer_id = self
3331                        .peer_registry
3332                        .lookup_by_key(&public_key)
3333                        .map(|(peer_id, _)| peer_id);
3334                    Some(ResolvedMulticastSource {
3335                        peer_id,
3336                        public_key: Some(public_key),
3337                        hint: Some(public_key.hint()),
3338                    })
3339                }
3340                3 => {
3341                    let hint = umsh_core::NodeHint([
3342                        *frame.get(offset)?,
3343                        *frame.get(offset + 1)?,
3344                        *frame.get(offset + 2)?,
3345                    ]);
3346                    let resolved = self.resolve_unique_hint(hint);
3347                    Some(ResolvedMulticastSource {
3348                        peer_id: resolved.map(|(peer_id, _)| peer_id),
3349                        public_key: resolved.map(|(_, key)| key),
3350                        hint: Some(hint),
3351                    })
3352                }
3353                _ => None,
3354            },
3355            SourceAddrRef::None => None,
3356        }
3357    }
3358
3359    fn accept_unknown_multicast_replay(&mut self, header: &PacketHeader, frame: &[u8]) -> bool {
3360        let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3361            return false;
3362        };
3363        if self.multicast_unknown_dup_cache.contains(&cache_key) {
3364            return false;
3365        }
3366        self.multicast_unknown_dup_cache
3367            .insert(cache_key, self.clock.now_ms());
3368        true
3369    }
3370
3371    fn resolve_blind_source_peer_candidates(
3372        &mut self,
3373        frame: &[u8],
3374        source: SourceAddrRef,
3375    ) -> Vec<(PeerId, PublicKey), PEERS> {
3376        match source {
3377            SourceAddrRef::FullKeyAt { offset } => {
3378                let Some(peer_key) = Self::full_key_at(frame, offset) else {
3379                    return Vec::new();
3380                };
3381
3382                if let Some((peer_id, _)) = self.peer_registry.lookup_by_key(&peer_key) {
3383                    let mut out = Vec::new();
3384                    let _ = out.push((peer_id, peer_key));
3385                    return out;
3386                }
3387
3388                if self.auto_register_full_key_peers {
3389                    if let Ok(peer_id) = self.try_auto_register_peer(peer_key) {
3390                        let mut out = Vec::new();
3391                        let _ = out.push((peer_id, peer_key));
3392                        return out;
3393                    }
3394                }
3395
3396                Vec::new()
3397            }
3398            SourceAddrRef::Hint(hint) => self
3399                .peer_registry
3400                .lookup_by_hint(&hint)
3401                .map(|(peer_id, info)| (peer_id, info.public_key))
3402                .collect(),
3403            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => Vec::new(),
3404        }
3405    }
3406
3407    fn resolve_unique_hint(&self, hint: umsh_core::NodeHint) -> Option<(PeerId, PublicKey)> {
3408        let mut matches = self.peer_registry.lookup_by_hint(&hint);
3409        let (peer_id, info) = matches.next()?;
3410        if matches.next().is_some() {
3411            return None;
3412        }
3413        Some((peer_id, info.public_key))
3414    }
3415
3416    fn learn_route_for_peer(&mut self, peer_id: PeerId, frame: &[u8], header: &PacketHeader) {
3417        let now_ms = self.clock.now_ms();
3418        self.peer_registry.touch(peer_id, now_ms);
3419
3420        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3421            return;
3422        };
3423
3424        if let Some(trace_range) = options.trace_route {
3425            if let Some(route) = self.source_route_from_trace(frame.get(trace_range).unwrap_or(&[]))
3426            {
3427                self.peer_registry
3428                    .update_route(peer_id, crate::CachedRoute::Source(route));
3429                return;
3430            }
3431        }
3432
3433        if let Some(flood_hops) = header.flood_hops {
3434            let regions = Self::region_codes_from_options(frame, header.options_range.clone());
3435            self.peer_registry.update_route(
3436                peer_id,
3437                crate::CachedRoute::Flood {
3438                    hops: flood_hops.accumulated(),
3439                    regions,
3440                },
3441            );
3442        }
3443    }
3444
3445    fn region_codes_from_options(
3446        frame: &[u8],
3447        options_range: core::ops::Range<usize>,
3448    ) -> Vec<[u8; 2], 8> {
3449        let mut regions = Vec::new();
3450        if options_range.is_empty() {
3451            return regions;
3452        }
3453        for entry in umsh_core::iter_options(frame, options_range) {
3454            let Ok((number, value)) = entry else {
3455                continue;
3456            };
3457            if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
3458                continue;
3459            }
3460            if regions.push([value[0], value[1]]).is_err() {
3461                break;
3462            }
3463        }
3464        regions
3465    }
3466
3467    fn resolve_broadcast_source(
3468        frame: &[u8],
3469        header: &PacketHeader,
3470    ) -> Option<(umsh_core::NodeHint, Option<PublicKey>)> {
3471        match header.source {
3472            SourceAddrRef::Hint(hint) => Some((hint, None)),
3473            SourceAddrRef::FullKeyAt { offset } => {
3474                let mut key = [0u8; 32];
3475                key.copy_from_slice(frame.get(offset..offset + 32)?);
3476                let public_key = PublicKey(key);
3477                Some((public_key.hint(), Some(public_key)))
3478            }
3479            SourceAddrRef::Encrypted { .. } | SourceAddrRef::None => None,
3480        }
3481    }
3482
3483    fn payload_is_allowed(packet_type: PacketType, payload: &[u8]) -> bool {
3484        if payload.is_empty() {
3485            return true;
3486        }
3487
3488        PayloadType::from_byte(payload[0])
3489            .unwrap_or(PayloadType::Empty)
3490            .allowed_for(packet_type)
3491    }
3492
3493    fn source_route_from_trace(
3494        &self,
3495        trace_bytes: &[u8],
3496    ) -> Option<heapless::Vec<RouterHint, { crate::MAX_SOURCE_ROUTE_HOPS }>> {
3497        if trace_bytes.len() % 2 != 0 {
3498            return None;
3499        }
3500
3501        let mut route = heapless::Vec::new();
3502        for chunk in trace_bytes.chunks_exact(2) {
3503            route.push(RouterHint([chunk[0], chunk[1]])).ok()?;
3504        }
3505        Some(route)
3506    }
3507
3508    fn should_emit_destination_ack(&self, frame: &[u8], header: &PacketHeader) -> bool {
3509        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3510            return false;
3511        };
3512
3513        // Destination ACKs are emitted only once a source route is fully
3514        // consumed. An empty SourceRoute still matters for provenance, but it
3515        // no longer constrains forwarding and therefore counts as "at the
3516        // destination" for ACK purposes.
3517        options
3518            .source_route
3519            .map(|range| range.is_empty())
3520            .unwrap_or(true)
3521    }
3522
3523    fn maybe_forward_received(
3524        &mut self,
3525        frame: &[u8],
3526        header: &PacketHeader,
3527        rx: &RxInfo,
3528        locally_handled_unicast: bool,
3529    ) -> bool {
3530        if !self.repeater.enabled {
3531            return false;
3532        }
3533        if !header.packet_type().is_routable() {
3534            return false;
3535        }
3536        // Once a point-to-point packet is handled by its actual destination, it
3537        // should not also be repeated by that same node. Other packet classes,
3538        // including broadcast and MAC ACK, remain routable.
3539        if locally_handled_unicast
3540            && matches!(
3541                header.packet_type(),
3542                PacketType::Unicast
3543                    | PacketType::UnicastAckReq
3544                    | PacketType::BlindUnicast
3545                    | PacketType::BlindUnicastAckReq
3546            )
3547        {
3548            return false;
3549        }
3550
3551        let Ok(options) = ParsedOptions::extract(frame, header.options_range.clone()) else {
3552            return false;
3553        };
3554        let Some(cache_key) = Self::forward_dup_key(header, frame) else {
3555            return false;
3556        };
3557        if self.dup_cache.contains(&cache_key) {
3558            self.defer_pending_forward(&cache_key, rx, &options);
3559            return false;
3560        }
3561
3562        let Some(plan) = self.plan_forwarding(frame, header, &options, rx) else {
3563            return false;
3564        };
3565
3566        let mut rewritten = [0u8; FRAME];
3567        let Ok(total_len) =
3568            self.rewrite_forwarded_frame(frame, header, &options, plan, &mut rewritten)
3569        else {
3570            return false;
3571        };
3572        if total_len > self.radio.max_frame_size() {
3573            return false;
3574        }
3575
3576        let now_ms = self.clock.now_ms();
3577        if self
3578            .tx_queue
3579            .enqueue_with_state(
3580                TxPriority::Forward,
3581                &rewritten[..total_len],
3582                None,
3583                None,
3584                now_ms.saturating_add(plan.delay_ms),
3585                0,
3586                0,
3587            )
3588            .is_err()
3589        {
3590            return false;
3591        }
3592        self.dup_cache.insert(cache_key, now_ms);
3593        true
3594    }
3595
3596    fn plan_forwarding(
3597        &mut self,
3598        frame: &[u8],
3599        header: &PacketHeader,
3600        options: &ParsedOptions,
3601        rx: &RxInfo,
3602    ) -> Option<ForwardPlan> {
3603        if options.has_unknown_critical {
3604            return None;
3605        }
3606
3607        let router_hint = self.repeater_router_hint()?;
3608        let station_action = self.classify_forward_station_action(frame, header)?;
3609
3610        let source_route_bytes = options
3611            .source_route
3612            .as_ref()
3613            .and_then(|range| frame.get(range.clone()))
3614            .unwrap_or(&[]);
3615        if source_route_bytes.len() % 2 != 0 {
3616            return None;
3617        }
3618
3619        let mut consume_source_route = false;
3620        let mut decrement_flood_hops = false;
3621        let mut insert_region_code = None;
3622        let mut delay_ms = 0u64;
3623
3624        if !source_route_bytes.is_empty() {
3625            if source_route_bytes[..2] != router_hint.0 {
3626                return None;
3627            }
3628            consume_source_route = true;
3629            if source_route_bytes.len() == 2 {
3630                decrement_flood_hops = header.flood_hops.is_some();
3631            }
3632        } else {
3633            decrement_flood_hops = true;
3634        }
3635
3636        if decrement_flood_hops {
3637            let flood_hops = header.flood_hops?;
3638            if flood_hops.remaining() == 0 {
3639                return None;
3640            }
3641            // Signal-quality filtering applies only to flood forwarding,
3642            // not to source-routed hops.
3643            if let Some(min_rssi) = Self::effective_min_rssi(options, &self.repeater) {
3644                if rx.rssi < min_rssi {
3645                    return None;
3646                }
3647            }
3648            if let Some(min_snr) = Self::effective_min_snr(options, &self.repeater) {
3649                if rx.snr < Snr::from_decibels(min_snr) {
3650                    return None;
3651                }
3652            }
3653            let mut saw_region_code = false;
3654            let mut matched_region_code = false;
3655            if !header.options_range.is_empty() {
3656                for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
3657                    let (number, value) = entry.ok()?;
3658                    if OptionNumber::from(number) != OptionNumber::RegionCode || value.len() != 2 {
3659                        continue;
3660                    }
3661                    saw_region_code = true;
3662                    let region_code = [value[0], value[1]];
3663                    if self
3664                        .repeater
3665                        .regions
3666                        .iter()
3667                        .any(|configured| *configured == region_code)
3668                    {
3669                        matched_region_code = true;
3670                    }
3671                }
3672            }
3673            if saw_region_code {
3674                if !matched_region_code {
3675                    return None;
3676                }
3677            } else {
3678                insert_region_code = self.repeater.regions.first().copied();
3679            }
3680            delay_ms = self.sample_flood_contention_delay_ms(rx, options);
3681        }
3682
3683        Some(ForwardPlan {
3684            router_hint,
3685            consume_source_route,
3686            decrement_flood_hops,
3687            insert_region_code,
3688            delay_ms,
3689            station_action,
3690        })
3691    }
3692
3693    fn classify_forward_station_action(
3694        &self,
3695        frame: &[u8],
3696        header: &PacketHeader,
3697    ) -> Option<ForwardStationAction> {
3698        let has_operator_callsign = if header.options_range.is_empty() {
3699            false
3700        } else {
3701            umsh_core::iter_options(frame, header.options_range.clone())
3702                .filter_map(Result::ok)
3703                .any(|(number, _)| OptionNumber::from(number) == OptionNumber::OperatorCallsign)
3704        };
3705        let encrypted = header
3706            .sec_info
3707            .map(|sec| sec.scf.encrypted())
3708            .unwrap_or(false);
3709
3710        match self.repeater.amateur_radio_mode {
3711            AmateurRadioMode::Unlicensed => Some(ForwardStationAction::Remove),
3712            AmateurRadioMode::LicensedOnly => {
3713                if encrypted || !has_operator_callsign || self.repeater.station_callsign.is_none() {
3714                    None
3715                } else {
3716                    Some(ForwardStationAction::Replace)
3717                }
3718            }
3719            AmateurRadioMode::Hybrid => {
3720                if !encrypted && has_operator_callsign {
3721                    self.repeater
3722                        .station_callsign
3723                        .as_ref()
3724                        .map(|_| ForwardStationAction::Replace)
3725                } else {
3726                    Some(ForwardStationAction::Remove)
3727                }
3728            }
3729        }
3730    }
3731
3732    fn rewrite_forwarded_frame(
3733        &self,
3734        src: &[u8],
3735        header: &PacketHeader,
3736        options: &ParsedOptions,
3737        plan: ForwardPlan,
3738        dst: &mut [u8],
3739    ) -> Result<usize, CapacityError> {
3740        if dst.is_empty() {
3741            return Err(CapacityError);
3742        }
3743
3744        // FCF (no options bit in new format)
3745        dst[0] = umsh_core::Fcf::new(
3746            header.packet_type(),
3747            header.fcf.full_source(),
3748            header.flood_hops.is_some(),
3749        )
3750        .0;
3751        let mut cursor = 1;
3752
3753        // FHOPS
3754        if let Some(flood_hops) = header.flood_hops {
3755            let next = if plan.decrement_flood_hops {
3756                flood_hops.decremented().0
3757            } else {
3758                flood_hops.0
3759            };
3760            *dst.get_mut(cursor).ok_or(CapacityError)? = next;
3761            cursor += 1;
3762        }
3763
3764        // Fixed core: DST/CHANNEL/SRC/SECINFO from original (between FHOPS and options)
3765        let fhops_len = usize::from(header.flood_hops.is_some());
3766        let fixed_core = src
3767            .get(1 + fhops_len..header.options_range.start)
3768            .ok_or(CapacityError)?;
3769        let core_end = cursor + fixed_core.len();
3770        dst.get_mut(cursor..core_end)
3771            .ok_or(CapacityError)?
3772            .copy_from_slice(fixed_core);
3773        cursor = core_end;
3774
3775        // Re-encoded options (without 0xFF — caller emits marker)
3776        let options_len =
3777            self.encode_forwarded_options(src, header, options, plan, &mut dst[cursor..])?;
3778        cursor += options_len;
3779
3780        // 0xFF marker when a body follows (not needed for MAC ack whose trailer is at fixed offset)
3781        let needs_marker = !matches!(header.packet_type(), PacketType::MacAck)
3782            && header.options_range.end < header.total_len;
3783        if needs_marker {
3784            *dst.get_mut(cursor).ok_or(CapacityError)? = 0xFF;
3785            cursor += 1;
3786        }
3787
3788        // Body + MIC from original (src[options_range.end] is already past the original 0xFF)
3789        let tail = src
3790            .get(header.options_range.end..header.total_len)
3791            .ok_or(CapacityError)?;
3792        let end = cursor + tail.len();
3793        dst.get_mut(cursor..end)
3794            .ok_or(CapacityError)?
3795            .copy_from_slice(tail);
3796        Ok(end)
3797    }
3798
3799    fn encode_forwarded_options(
3800        &self,
3801        src: &[u8],
3802        header: &PacketHeader,
3803        _options: &ParsedOptions,
3804        plan: ForwardPlan,
3805        dst: &mut [u8],
3806    ) -> Result<usize, CapacityError> {
3807        let mut encoder = OptionEncoder::new(dst);
3808        let mut inserted_region = false;
3809        let mut inserted_station = false;
3810        let mut saw_station = false;
3811
3812        if !header.options_range.is_empty() {
3813            for entry in umsh_core::iter_options(src, header.options_range.clone()) {
3814                let (number, value) = entry.map_err(|_| CapacityError)?;
3815                if !inserted_region {
3816                    if let Some(region_code) = plan.insert_region_code {
3817                        if number > OptionNumber::RegionCode.as_u16() {
3818                            encoder
3819                                .put(OptionNumber::RegionCode.as_u16(), &region_code)
3820                                .map_err(|_| CapacityError)?;
3821                            inserted_region = true;
3822                        }
3823                    }
3824                }
3825                if !inserted_station
3826                    && matches!(plan.station_action, ForwardStationAction::Replace)
3827                    && number > OptionNumber::StationCallsign.as_u16()
3828                {
3829                    encoder
3830                        .put(
3831                            OptionNumber::StationCallsign.as_u16(),
3832                            self.repeater
3833                                .station_callsign
3834                                .as_ref()
3835                                .ok_or(CapacityError)?
3836                                .as_trimmed_slice(),
3837                        )
3838                        .map_err(|_| CapacityError)?;
3839                    inserted_station = true;
3840                }
3841
3842                match OptionNumber::from(number) {
3843                    OptionNumber::RegionCode => {
3844                        inserted_region = true;
3845                        encoder.put(number, value).map_err(|_| CapacityError)?;
3846                    }
3847                    OptionNumber::TraceRoute => {
3848                        let mut trace = [0u8; crate::MAX_SOURCE_ROUTE_HOPS * 2 + 2];
3849                        trace[..2].copy_from_slice(&plan.router_hint.0);
3850                        trace[2..2 + value.len()].copy_from_slice(value);
3851                        encoder
3852                            .put(number, &trace[..2 + value.len()])
3853                            .map_err(|_| CapacityError)?;
3854                    }
3855                    OptionNumber::SourceRoute if plan.consume_source_route => {
3856                        if value.len() < 2 || value.len() % 2 != 0 {
3857                            return Err(CapacityError);
3858                        }
3859                        let remaining = if value.len() > 2 { &value[2..] } else { &[] };
3860                        encoder.put(number, remaining).map_err(|_| CapacityError)?;
3861                    }
3862                    OptionNumber::StationCallsign => {
3863                        saw_station = true;
3864                        match plan.station_action {
3865                            ForwardStationAction::Remove => {}
3866                            ForwardStationAction::Replace => {
3867                                encoder
3868                                    .put(
3869                                        number,
3870                                        self.repeater
3871                                            .station_callsign
3872                                            .as_ref()
3873                                            .ok_or(CapacityError)?
3874                                            .as_trimmed_slice(),
3875                                    )
3876                                    .map_err(|_| CapacityError)?;
3877                                inserted_station = true;
3878                            }
3879                        }
3880                    }
3881                    _ => {
3882                        encoder.put(number, value).map_err(|_| CapacityError)?;
3883                    }
3884                }
3885            }
3886        }
3887
3888        if matches!(plan.station_action, ForwardStationAction::Replace)
3889            && !inserted_station
3890            && !saw_station
3891        {
3892            encoder
3893                .put(
3894                    OptionNumber::StationCallsign.as_u16(),
3895                    self.repeater
3896                        .station_callsign
3897                        .as_ref()
3898                        .ok_or(CapacityError)?
3899                        .as_trimmed_slice(),
3900                )
3901                .map_err(|_| CapacityError)?;
3902        }
3903        if let Some(region_code) = plan.insert_region_code {
3904            if !inserted_region {
3905                encoder
3906                    .put(OptionNumber::RegionCode.as_u16(), &region_code)
3907                    .map_err(|_| CapacityError)?;
3908            }
3909        }
3910        Ok(encoder.finish())
3911    }
3912
3913    fn synthesize_route_retry_resend(
3914        &self,
3915        peer: &PublicKey,
3916        resend: &ResendRecord<FRAME>,
3917    ) -> Option<ResendRecord<FRAME>> {
3918        let header = PacketHeader::parse(resend.frame.as_slice()).ok()?;
3919        let options =
3920            ParsedOptions::extract(resend.frame.as_slice(), header.options_range.clone()).ok()?;
3921        if options.route_retry {
3922            return None;
3923        }
3924        let source_route = resend.source_route.as_ref()?;
3925        if source_route.is_empty() {
3926            return None;
3927        }
3928
3929        let flood_hops = self.route_retry_flood_hops(peer, &header, source_route)?;
3930        let has_flood_hops = flood_hops > 0;
3931        let mut rewritten = [0u8; FRAME];
3932
3933        // FCF
3934        rewritten[0] = umsh_core::Fcf::new(
3935            header.packet_type(),
3936            header.fcf.full_source(),
3937            has_flood_hops,
3938        )
3939        .0;
3940        let mut cursor = 1;
3941
3942        // FHOPS
3943        if has_flood_hops {
3944            *rewritten.get_mut(cursor)? = FloodHops::new(flood_hops, 0)?.0;
3945            cursor += 1;
3946        }
3947
3948        // Fixed core: DST/CHANNEL/SRC/SECINFO from original
3949        let fhops_len = usize::from(header.flood_hops.is_some());
3950        let fixed_core = resend
3951            .frame
3952            .get(1 + fhops_len..header.options_range.start)?;
3953        let core_end = cursor + fixed_core.len();
3954        rewritten.get_mut(cursor..core_end)?.copy_from_slice(fixed_core);
3955        cursor = core_end;
3956
3957        // Re-encoded options
3958        let options_len = self
3959            .encode_route_retry_options(
3960                resend.frame.as_slice(),
3961                header.options_range.clone(),
3962                &options,
3963                &mut rewritten[cursor..],
3964            )
3965            .ok()?;
3966        cursor += options_len;
3967
3968        // 0xFF marker when body follows
3969        let needs_marker = !matches!(header.packet_type(), PacketType::MacAck)
3970            && header.options_range.end < header.total_len;
3971        if needs_marker {
3972            *rewritten.get_mut(cursor)? = 0xFF;
3973            cursor += 1;
3974        }
3975
3976        // Body + MIC from original
3977        let tail = resend
3978            .frame
3979            .get(header.options_range.end..header.total_len)?;
3980        let end = cursor + tail.len();
3981        rewritten.get_mut(cursor..end)?.copy_from_slice(tail);
3982
3983        ResendRecord::try_new(&rewritten[..end], None).ok()
3984    }
3985
3986    fn encode_route_retry_options(
3987        &self,
3988        src: &[u8],
3989        options_range: core::ops::Range<usize>,
3990        _options: &ParsedOptions,
3991        dst: &mut [u8],
3992    ) -> Result<usize, CapacityError> {
3993        let mut encoder = OptionEncoder::new(dst);
3994        let mut inserted_trace_route = false;
3995        let mut inserted_route_retry = false;
3996
3997        if !options_range.is_empty() {
3998            for entry in umsh_core::iter_options(src, options_range) {
3999                let (number, value) = entry.map_err(|_| CapacityError)?;
4000                if !inserted_trace_route && number > OptionNumber::TraceRoute.as_u16() {
4001                    encoder
4002                        .put(OptionNumber::TraceRoute.as_u16(), &[])
4003                        .map_err(|_| CapacityError)?;
4004                    inserted_trace_route = true;
4005                }
4006                if !inserted_route_retry && number > OptionNumber::RouteRetry.as_u16() {
4007                    encoder
4008                        .put(OptionNumber::RouteRetry.as_u16(), &[])
4009                        .map_err(|_| CapacityError)?;
4010                    inserted_route_retry = true;
4011                }
4012                match OptionNumber::from(number) {
4013                    OptionNumber::SourceRoute => {}
4014                    OptionNumber::TraceRoute => {
4015                        encoder.put(number, value).map_err(|_| CapacityError)?;
4016                        inserted_trace_route = true;
4017                    }
4018                    OptionNumber::RouteRetry => {}
4019                    _ => {
4020                        encoder.put(number, value).map_err(|_| CapacityError)?;
4021                    }
4022                }
4023            }
4024        }
4025
4026        if !inserted_trace_route {
4027            encoder
4028                .put(OptionNumber::TraceRoute.as_u16(), &[])
4029                .map_err(|_| CapacityError)?;
4030        }
4031        if !inserted_route_retry {
4032            encoder
4033                .put(OptionNumber::RouteRetry.as_u16(), &[])
4034                .map_err(|_| CapacityError)?;
4035        }
4036
4037        Ok(encoder.finish())
4038    }
4039
4040    fn route_retry_flood_hops(
4041        &self,
4042        peer: &PublicKey,
4043        header: &PacketHeader,
4044        source_route: &heapless::Vec<RouterHint, MAX_SOURCE_ROUTE_HOPS>,
4045    ) -> Option<u8> {
4046        let existing = header
4047            .flood_hops
4048            .map(|hops| hops.remaining())
4049            .filter(|hops| *hops > 0);
4050        let cached = self
4051            .peer_registry
4052            .lookup_by_key(peer)
4053            .and_then(|(_, info)| match info.route.as_ref() {
4054                Some(crate::CachedRoute::Flood { hops, .. }) => Some((*hops).clamp(1, 15)),
4055                _ => None,
4056            });
4057        let route_len = u8::try_from(source_route.len())
4058            .ok()
4059            .map(|hops| hops.clamp(1, 15));
4060
4061        existing.or(cached).or(route_len).or(Some(5))
4062    }
4063
4064    fn repeater_router_hint(&self) -> Option<RouterHint> {
4065        self.identities
4066            .iter()
4067            .filter_map(|slot| slot.as_ref())
4068            .next()
4069            .map(|slot| slot.identity().public_key().router_hint())
4070    }
4071
4072    fn effective_min_rssi(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i16> {
4073        match (options.min_rssi, repeater.min_rssi) {
4074            (Some(packet), Some(local)) => Some(packet.max(local)),
4075            (Some(packet), None) => Some(packet),
4076            (None, Some(local)) => Some(local),
4077            (None, None) => None,
4078        }
4079    }
4080
4081    fn effective_min_snr(options: &ParsedOptions, repeater: &RepeaterConfig) -> Option<i8> {
4082        match (options.min_snr, repeater.min_snr) {
4083            (Some(packet), Some(local)) => Some(packet.max(local)),
4084            (Some(packet), None) => Some(packet),
4085            (None, Some(local)) => Some(local),
4086            (None, None) => None,
4087        }
4088    }
4089
4090    fn sample_flood_contention_delay_ms(&mut self, rx: &RxInfo, options: &ParsedOptions) -> u64 {
4091        let effective_threshold_db =
4092            Self::effective_min_snr(options, &self.repeater).unwrap_or(i8::MIN);
4093        let low_db = self
4094            .repeater
4095            .flood_contention_snr_low_db
4096            .max(effective_threshold_db);
4097        let high_db = self
4098            .repeater
4099            .flood_contention_snr_high_db
4100            .max(low_db.saturating_add(1));
4101        let low = i32::from(Snr::from_decibels(low_db).as_centibels());
4102        let high = i32::from(Snr::from_decibels(high_db).as_centibels());
4103        let received = i32::from(rx.snr.as_centibels());
4104        let clamped = (received - low).clamp(0, high - low) as u32;
4105        let range = (high - low) as u32;
4106        let t_frame_ms = u64::from(self.radio.t_frame_ms());
4107        let min_window_ms = t_frame_ms
4108            .saturating_mul(u64::from(self.repeater.flood_contention_min_window_percent))
4109            / 100;
4110        let max_window_ms = t_frame_ms
4111            .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4112            .max(min_window_ms);
4113        let window_span_ms = max_window_ms.saturating_sub(min_window_ms);
4114        let window_ms = if range == 0 {
4115            max_window_ms
4116        } else {
4117            max_window_ms.saturating_sub(
4118                window_span_ms.saturating_mul(u64::from(clamped)) / u64::from(range),
4119            )
4120        };
4121        if window_ms == 0 {
4122            0
4123        } else {
4124            self.rng.random_range(..window_ms.saturating_add(1))
4125        }
4126    }
4127
4128    /// Routing identity used for duplicate suppression at repeaters.
4129    ///
4130    /// This is intentionally not the same thing as the destination's logical
4131    /// delivery identity:
4132    /// - delivery identity is governed by replay windows / frame counters at
4133    ///   the destination
4134    /// - routing identity must remain stable across repeater rewrites of
4135    ///   dynamic routing metadata
4136    /// - forwarding-confirmation identity intentionally matches routing
4137    ///   identity so a node can recognize "the same packet, forwarded onward"
4138    pub(crate) fn forward_dup_key(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4139        Self::routable_packet_identity(header, frame)
4140    }
4141
4142    fn routable_packet_identity(header: &PacketHeader, frame: &[u8]) -> Option<DupCacheKey> {
4143        if !header.packet_type().is_secure() {
4144            return Some(DupCacheKey::Hash32(Self::normalized_routable_hash32(
4145                header, frame,
4146            )));
4147        }
4148        let options = ParsedOptions::extract(frame, header.options_range.clone()).ok()?;
4149        let mic = frame.get(header.mic_range.clone())?;
4150        if mic.is_empty() || mic.len() > 16 {
4151            return None;
4152        }
4153        let mut bytes = [0u8; 16];
4154        bytes[..mic.len()].copy_from_slice(mic);
4155        Some(DupCacheKey::Mic {
4156            bytes,
4157            len: mic.len() as u8,
4158            route_retry: options.route_retry,
4159        })
4160    }
4161
4162    fn defer_pending_forward(&mut self, key: &DupCacheKey, rx: &RxInfo, options: &ParsedOptions) {
4163        let Some(queued) = self.tx_queue.remove_first_matching(|entry| {
4164            entry.priority == TxPriority::Forward
4165                && Self::confirmation_key(entry.frame.as_slice())
4166                    .map(|entry_key| &entry_key == key)
4167                    .unwrap_or(false)
4168        }) else {
4169            return;
4170        };
4171
4172        if queued.forward_deferrals >= self.repeater.flood_contention_max_deferrals {
4173            return;
4174        }
4175
4176        let now_ms = self.clock.now_ms();
4177        let delay_ms = self.sample_flood_contention_delay_ms(rx, options);
4178        let _ = self.tx_queue.enqueue_with_state(
4179            queued.priority,
4180            queued.frame.as_slice(),
4181            queued.receipt,
4182            queued.identity_id,
4183            now_ms.saturating_add(delay_ms),
4184            queued.cad_attempts,
4185            queued.forward_deferrals.saturating_add(1),
4186        );
4187    }
4188
4189    async fn service_post_tx_listen(
4190        &mut self,
4191        mut on_event: impl FnMut(LocalIdentityId, crate::MacEventRef<'_>),
4192    ) -> Result<(), MacError<<P::Radio as Radio>::Error>> {
4193        loop {
4194            self.expire_post_tx_listen_if_needed();
4195            if self.post_tx_listen.is_none() {
4196                return Ok(());
4197            }
4198
4199            let handled = self.receive_one(&mut on_event).await?;
4200            if !handled {
4201                return Ok(());
4202            }
4203        }
4204    }
4205
4206    fn note_transmitted_ack_requested(&mut self, receipt: SendReceipt, frame: &[u8]) {
4207        let sent_ms = self.clock.now_ms();
4208        let direct_ack_deadline_ms = sent_ms.saturating_add(self.direct_ack_timeout_ms());
4209        let forwarded_ack_deadline_ms = sent_ms.saturating_add(self.forwarded_ack_timeout_ms());
4210        let confirm_timeout_ms = self.forward_confirm_timeout_ms();
4211        let confirm_key = Self::confirmation_key(frame);
4212
4213        let post_tx_listen = {
4214            let Some((identity_id, pending)) = self.pending_ack_mut(receipt) else {
4215                return;
4216            };
4217
4218            let needs_forward_confirmation = match pending.state {
4219                crate::AckState::Queued {
4220                    needs_forward_confirmation,
4221                } => needs_forward_confirmation,
4222                crate::AckState::RetryQueued => true,
4223                _ => return,
4224            };
4225
4226            pending.sent_ms = sent_ms;
4227            if pending.ack_deadline_ms == 0 {
4228                pending.ack_deadline_ms = if needs_forward_confirmation {
4229                    forwarded_ack_deadline_ms
4230                } else {
4231                    direct_ack_deadline_ms
4232                };
4233            }
4234
4235            if needs_forward_confirmation {
4236                let deadline_ms = sent_ms.saturating_add(confirm_timeout_ms);
4237                pending.state = crate::AckState::AwaitingForward {
4238                    confirm_deadline_ms: deadline_ms,
4239                };
4240                confirm_key.map(|confirm_key| PostTxListen {
4241                    identity_id,
4242                    receipt,
4243                    confirm_key,
4244                    deadline_ms,
4245                })
4246            } else {
4247                pending.state = crate::AckState::AwaitingAck;
4248                None
4249            }
4250        };
4251
4252        self.post_tx_listen = post_tx_listen;
4253    }
4254
4255    fn expire_post_tx_listen_if_needed(&mut self) {
4256        let should_clear = self
4257            .post_tx_listen
4258            .as_ref()
4259            .map(|listen| self.clock.now_ms() >= listen.deadline_ms)
4260            .unwrap_or(false);
4261        if should_clear {
4262            self.post_tx_listen = None;
4263        }
4264    }
4265
4266    fn forward_confirm_timeout_ms(&self) -> u64 {
4267        let t_frame_ms = u64::from(self.radio.t_frame_ms());
4268        t_frame_ms
4269            .saturating_add(self.max_forward_contention_delay_ms())
4270            .saturating_add(t_frame_ms)
4271    }
4272
4273    fn max_forward_contention_delay_ms(&self) -> u64 {
4274        u64::from(self.radio.t_frame_ms())
4275            .saturating_mul(u64::from(self.repeater.flood_contention_max_window_frames))
4276    }
4277
4278    fn forward_retry_backoff_cap_ms(&self, retry_number: u8) -> u32 {
4279        Self::forward_retry_backoff_cap_ms_for_t_frame(self.radio.t_frame_ms(), retry_number)
4280    }
4281
4282    fn forward_retry_backoff_cap_ms_for_t_frame(t_frame_ms: u32, retry_number: u8) -> u32 {
4283        let exponent = retry_number.saturating_sub(1).min(2);
4284        t_frame_ms
4285            .saturating_mul(1u32 << exponent)
4286            .min(t_frame_ms.saturating_mul(4))
4287    }
4288
4289    fn can_attempt_route_retry(pending: &PendingAck<FRAME>) -> bool {
4290        let Ok(header) = PacketHeader::parse(pending.resend.frame.as_slice()) else {
4291            return false;
4292        };
4293        let Ok(options) = ParsedOptions::extract(
4294            pending.resend.frame.as_slice(),
4295            header.options_range.clone(),
4296        ) else {
4297            return false;
4298        };
4299        !options.route_retry
4300            && pending
4301                .resend
4302                .source_route
4303                .as_ref()
4304                .map(|route| !route.is_empty())
4305                .unwrap_or(false)
4306    }
4307
4308    fn direct_ack_timeout_ms(&self) -> u64 {
4309        u64::from(self.radio.t_frame_ms()).saturating_mul(10)
4310    }
4311
4312    fn forwarded_ack_timeout_ms(&self) -> u64 {
4313        let mut total = self.forward_confirm_timeout_ms();
4314        for retry_number in 1..=MAX_FORWARD_RETRIES {
4315            total = total
4316                .saturating_add(u64::from(self.forward_retry_backoff_cap_ms(retry_number)))
4317                .saturating_add(self.forward_confirm_timeout_ms());
4318        }
4319        total.saturating_add(u64::from(self.radio.t_frame_ms()))
4320    }
4321
4322    fn pending_ack_mut(
4323        &mut self,
4324        receipt: SendReceipt,
4325    ) -> Option<(LocalIdentityId, &mut PendingAck<FRAME>)> {
4326        for (index, slot) in self.identities.iter_mut().enumerate() {
4327            let Some(slot) = slot.as_mut() else {
4328                continue;
4329            };
4330            if let Some(pending) = slot.pending_ack_mut(&receipt) {
4331                return Some((LocalIdentityId(index as u8), pending));
4332            }
4333        }
4334        None
4335    }
4336
4337    pub(crate) fn confirmation_key(frame: &[u8]) -> Option<DupCacheKey> {
4338        let header = PacketHeader::parse(frame).ok()?;
4339        Self::routable_packet_identity(&header, frame)
4340    }
4341
4342    fn normalized_routable_hash32(header: &PacketHeader, frame: &[u8]) -> u32 {
4343        let mut hash = 0x811C_9DC5u32;
4344
4345        Self::hash_u8(&mut hash, header.packet_type() as u8);
4346        Self::hash_u8(&mut hash, header.fcf.full_source() as u8);
4347
4348        if !header.options_range.is_empty() {
4349            for entry in umsh_core::iter_options(frame, header.options_range.clone()) {
4350                let Ok((number, value)) = entry else {
4351                    continue;
4352                };
4353                let option = OptionNumber::from(number);
4354                if option.is_dynamic() {
4355                    continue;
4356                }
4357                Self::hash_u16(&mut hash, number);
4358                Self::hash_u16(&mut hash, value.len() as u16);
4359                Self::hash_bytes(&mut hash, value);
4360            }
4361        }
4362
4363        match header.packet_type() {
4364            PacketType::Broadcast => {
4365                match header.source {
4366                    umsh_core::SourceAddrRef::Hint(hint) => Self::hash_bytes(&mut hash, &hint.0),
4367                    umsh_core::SourceAddrRef::FullKeyAt { offset } => {
4368                        if let Some(key) = frame.get(offset..offset + 32) {
4369                            Self::hash_bytes(&mut hash, key);
4370                        }
4371                    }
4372                    umsh_core::SourceAddrRef::Encrypted { offset, len } => {
4373                        if let Some(src) = frame.get(offset..offset + len) {
4374                            Self::hash_bytes(&mut hash, src);
4375                        }
4376                    }
4377                    umsh_core::SourceAddrRef::None => {}
4378                }
4379                if let Some(payload) = frame.get(header.body_range.clone()) {
4380                    Self::hash_bytes(&mut hash, payload);
4381                }
4382            }
4383            PacketType::MacAck => {
4384                if let Some(dst) = header.ack_dst {
4385                    Self::hash_bytes(&mut hash, &dst.0);
4386                }
4387                if let Some(tag) = frame.get(header.mic_range.clone()) {
4388                    Self::hash_bytes(&mut hash, tag);
4389                }
4390            }
4391            _ => {
4392                if let Some(bytes) = frame.get(header.body_range.clone()) {
4393                    Self::hash_bytes(&mut hash, bytes);
4394                }
4395            }
4396        }
4397        hash
4398    }
4399
4400    fn hash_u8(hash: &mut u32, value: u8) {
4401        *hash ^= u32::from(value);
4402        *hash = hash.wrapping_mul(0x0100_0193);
4403    }
4404
4405    fn hash_u16(hash: &mut u32, value: u16) {
4406        Self::hash_bytes(hash, &value.to_be_bytes());
4407    }
4408
4409    fn hash_bytes(hash: &mut u32, bytes: &[u8]) {
4410        for byte in bytes {
4411            Self::hash_u8(hash, *byte);
4412        }
4413    }
4414
4415    /// Check if a received frame confirms forwarding of a pending send.
4416    ///
4417    /// Returns `Some((identity_id, receipt))` on successful confirmation
4418    /// (AwaitingForward → AwaitingAck transition), `None` otherwise.
4419    fn observe_forwarding_confirmation(
4420        &mut self,
4421        frame: &[u8],
4422    ) -> Option<(LocalIdentityId, SendReceipt)> {
4423        self.expire_post_tx_listen_if_needed();
4424        let listen = self.post_tx_listen.clone()?;
4425
4426        let received_key = Self::confirmation_key(frame)?;
4427        if received_key != listen.confirm_key {
4428            return None;
4429        }
4430
4431        let Some(slot) = self.identity_mut(listen.identity_id) else {
4432            self.post_tx_listen = None;
4433            return None;
4434        };
4435        let Some(pending) = slot.pending_ack_mut(&listen.receipt) else {
4436            self.post_tx_listen = None;
4437            return None;
4438        };
4439        if !matches!(pending.state, crate::AckState::AwaitingForward { .. }) {
4440            self.post_tx_listen = None;
4441            return None;
4442        }
4443
4444        pending.state = crate::AckState::AwaitingAck;
4445        self.post_tx_listen = None;
4446        Some((listen.identity_id, listen.receipt))
4447    }
4448
4449    fn match_pending_peer_for_ack(
4450        &self,
4451        slot: &IdentitySlot<P::Identity, PEERS, ACKS, FRAME>,
4452        ack_tag_bytes: &[u8],
4453    ) -> Option<PublicKey> {
4454        if ack_tag_bytes.len() != 8 {
4455            return None;
4456        }
4457
4458        slot.pending_acks
4459            .iter()
4460            .find_map(|(_, pending)| (pending.ack_tag == ack_tag_bytes).then_some(pending.peer))
4461    }
4462}
4463
4464fn align_counter_boundary(value: u32) -> u32 {
4465    value & !COUNTER_PERSIST_BLOCK_MASK
4466}
4467
4468fn next_counter_persist_target(next_counter: u32) -> u32 {
4469    next_counter.wrapping_add(COUNTER_PERSIST_BLOCK_SIZE) & !COUNTER_PERSIST_BLOCK_MASK
4470}