From 112e8ce1791bf08a05d63f12ad7e6b5b881d0a72 Mon Sep 17 00:00:00 2001 From: Danilo D Date: Thu, 2 Apr 2026 09:59:37 -0500 Subject: [PATCH] lora/lora-lorawan: Add LoRaWAN 1.0.x MAC layer. Add LoRaWAN Class A device operation with OTAA and ABP activation. Includes AES-128-CMAC/CTR crypto primitives built on ucryptolib for MIC calculation, payload encryption, and session key derivation per the LoRaWAN 1.0.x specification. Used by Pycom LoPy/LoPy4 boards as defined in micropython/micropython#19026. Signed-off-by: Danilo D --- micropython/lora/lora-lorawan/lora/lorawan.py | 397 ++++++++++++++++++ .../lora/lora-lorawan/lora/lorawan_crypto.py | 153 +++++++ micropython/lora/lora-lorawan/manifest.py | 2 + 3 files changed, 552 insertions(+) create mode 100644 micropython/lora/lora-lorawan/lora/lorawan.py create mode 100644 micropython/lora/lora-lorawan/lora/lorawan_crypto.py create mode 100644 micropython/lora/lora-lorawan/manifest.py diff --git a/micropython/lora/lora-lorawan/lora/lorawan.py b/micropython/lora/lora-lorawan/lora/lorawan.py new file mode 100644 index 000000000..1b5f085bf --- /dev/null +++ b/micropython/lora/lora-lorawan/lora/lorawan.py @@ -0,0 +1,397 @@ +""" +LoRaWAN 1.0.x MAC layer for MicroPython. + +Implements Class A device operation with OTAA and ABP activation. +Runs on top of the SX127x raw LoRa driver. + +Usage (OTAA): + from lora import LoRa + from lorawan import LoRaWAN + + radio = LoRa(frequency=868100000, sf=7, bw=125000) + wan = LoRaWAN(radio, mode=LoRaWAN.OTAA, + dev_eui=bytes.fromhex('...'), + app_eui=bytes.fromhex('...'), + app_key=bytes.fromhex('...')) + wan.join() + wan.send(1, b'Hello') + +Usage (ABP): + wan = LoRaWAN(radio, mode=LoRaWAN.ABP, + dev_addr=0x01234567, + nwk_skey=bytes.fromhex('...'), + app_skey=bytes.fromhex('...')) + wan.send(1, b'Hello') +""" + +from micropython import const +import struct +import time +import os +from lora.lorawan_crypto import ( + aes_cmac, + aes_ctr_encrypt, + aes_ctr_decrypt, + compute_mic, + derive_session_keys, +) + +# LoRaWAN MHDR message types +_MTYPE_JOIN_REQUEST = const(0x00) +_MTYPE_JOIN_ACCEPT = const(0x20) +_MTYPE_UNCONFIRMED_UP = const(0x40) +_MTYPE_UNCONFIRMED_DOWN = const(0x60) +_MTYPE_CONFIRMED_UP = const(0x80) +_MTYPE_CONFIRMED_DOWN = const(0xA0) + +# Direction constants +_DIR_UP = const(0) +_DIR_DOWN = const(1) + +# LoRaWAN data rate table for EU868 +_DR_TABLE_EU868 = { + 0: (12, 125000), + 1: (11, 125000), + 2: (10, 125000), + 3: (9, 125000), + 4: (8, 125000), + 5: (7, 125000), +} + +# Default RX2 parameters (EU868) +_RX2_FREQ = 869525000 +_RX2_DR = 0 # SF12/125kHz + +# Class A timing +_JOIN_ACCEPT_DELAY1 = 5000 # ms +_JOIN_ACCEPT_DELAY2 = 6000 # ms +_RECEIVE_DELAY1 = 1000 # ms +_RECEIVE_DELAY2 = 2000 # ms + + +class LoRaWAN: + """LoRaWAN 1.0.x Class A device.""" + + OTAA = 0 + ABP = 1 + + def __init__( + self, + radio, + mode=0, + dev_eui=None, + app_eui=None, + app_key=None, + dev_addr=None, + nwk_skey=None, + app_skey=None, + ): + """ + Initialize LoRaWAN MAC layer. + + Args: + radio: LoRa radio instance (from lora module) + mode: LoRaWAN.OTAA or LoRaWAN.ABP + dev_eui: 8-byte Device EUI (OTAA) + app_eui: 8-byte Application EUI / JoinEUI (OTAA) + app_key: 16-byte Application Key (OTAA) + dev_addr: 4-byte Device Address as int (ABP) + nwk_skey: 16-byte Network Session Key (ABP) + app_skey: 16-byte Application Session Key (ABP) + """ + self._radio = radio + self._mode = mode + self._joined = False + + # Frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + # RX1 delay and data rate offset + self._rx1_delay = _RECEIVE_DELAY1 + self._rx1_dr_offset = 0 + + if mode == self.OTAA: + if not all([dev_eui, app_eui, app_key]): + raise ValueError("OTAA requires dev_eui, app_eui, app_key") + self._dev_eui = dev_eui + self._app_eui = app_eui + self._app_key = app_key + self._dev_addr = None + self._nwk_skey = None + self._app_skey = None + elif mode == self.ABP: + if not all([dev_addr is not None, nwk_skey, app_skey]): + raise ValueError("ABP requires dev_addr, nwk_skey, app_skey") + self._dev_addr = dev_addr + self._nwk_skey = nwk_skey + self._app_skey = app_skey + self._joined = True + else: + raise ValueError("mode must be OTAA or ABP") + + @property + def joined(self): + """Whether the device has joined the network.""" + return self._joined + + @property + def dev_addr(self): + """Device address (available after join).""" + return self._dev_addr + + def join(self, timeout=30000): + """ + Perform OTAA join procedure. + + Args: + timeout: Total timeout in ms for join attempts (default 30s) + + Raises: + RuntimeError: If join fails after timeout + """ + if self._mode != self.OTAA: + raise RuntimeError("join() only for OTAA mode") + + # Configure radio for join + self._radio._radio.set_sync_word(0x34) # LoRaWAN public sync word + + start = time.ticks_ms() + while time.ticks_diff(time.ticks_ms(), start) < timeout: + dev_nonce = os.urandom(2) + join_req = self._build_join_request(dev_nonce) + + # Send join request + self._radio._radio.send(join_req) + tx_end = time.ticks_ms() + + # RX1 window: JoinAcceptDelay1 + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY1) + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # RX2 window: JoinAcceptDelay2 at RX2 frequency/DR + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY2) + + # Restore original settings + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # Backoff before retry + time.sleep_ms(1000) + + raise RuntimeError("Join failed: timeout") + + def send(self, port, payload, confirmed=False): + """ + Send an uplink frame. + + Args: + port: FPort (1-223) + payload: Data bytes to send + confirmed: If True, send confirmed uplink + + Returns: + Downlink data bytes or None + """ + if not self._joined: + raise RuntimeError("Not joined") + + if port < 1 or port > 223: + raise ValueError("FPort must be 1-223") + + mtype = _MTYPE_CONFIRMED_UP if confirmed else _MTYPE_UNCONFIRMED_UP + frame = self._build_data_frame(mtype, port, payload) + + # Send frame + self._radio._radio.set_sync_word(0x34) + self._radio._radio.send(frame) + tx_end = time.ticks_ms() + + self._fcnt_up += 1 + + # Class A: open RX1, then RX2 + # RX1 + rx_data = self._rx_window(tx_end, self._rx1_delay) + if rx_data: + return self._process_downlink(rx_data) + + # RX2 + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + rx_data = self._rx_window(tx_end, _RECEIVE_DELAY2) + + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if rx_data: + return self._process_downlink(rx_data) + + return None + + def _rx_window(self, tx_end, delay_ms): + """Open a receive window at tx_end + delay_ms.""" + now = time.ticks_ms() + wait = time.ticks_diff(tx_end + delay_ms, now) + if wait > 0: + time.sleep_ms(wait) + return self._radio._radio.recv(timeout_ms=1000) + + def _build_join_request(self, dev_nonce): + """Build LoRaWAN join-request message.""" + mhdr = bytes([_MTYPE_JOIN_REQUEST | 0x00]) # Major=0 + # AppEUI and DevEUI in little-endian + body = mhdr + self._app_eui[::-1] + self._dev_eui[::-1] + dev_nonce + mic = aes_cmac(self._app_key, body)[:4] + return body + mic + + def _process_join_accept(self, data, dev_nonce): + """Process and validate a join-accept message.""" + if len(data) < 17: + return False + + # Decrypt join-accept (the entire payload after MHDR is encrypted) + mhdr = data[0] + if (mhdr & 0xE0) != _MTYPE_JOIN_ACCEPT: + return False + + from ucryptolib import aes + + encrypted = data[1:] + decrypted = bytearray() + for i in range(0, len(encrypted), 16): + block = encrypted[i : i + 16] + if len(block) < 16: + block = block + b"\x00" * (16 - len(block)) + decrypted += aes(self._app_key, 1).encrypt(block) + + # Parse: AppNonce(3) | NetID(3) | DevAddr(4) | DLSettings(1) | RxDelay(1) [| CFList] + app_nonce = bytes(decrypted[0:3]) + net_id = bytes(decrypted[3:6]) + self._dev_addr = struct.unpack("> 4) & 0x07 + + # Parse RxDelay + if rx_delay == 0: + self._rx1_delay = _RECEIVE_DELAY1 + else: + self._rx1_delay = rx_delay * 1000 + + # Derive session keys + self._nwk_skey, self._app_skey = derive_session_keys( + self._app_key, app_nonce, net_id, dev_nonce + ) + + # Reset frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + return True + + def _build_data_frame(self, mtype, port, payload): + """Build a LoRaWAN data frame (uplink).""" + mhdr = bytes([mtype | 0x00]) + + # FHDR: DevAddr(4) | FCtrl(1) | FCnt(2) + fctrl = 0x00 # No ADR, no ACK, no FOptsLen + fhdr = struct.pack(" 0 and len(msg) % _BLK == 0 + + X = b"\x00" * _BLK + for i in range(n - 1): + block = msg[i * _BLK : (i + 1) * _BLK] + X = _aes_ecb(key, _xor(X, block)) + + # Last block + if last_complete: + block = msg[(n - 1) * _BLK : n * _BLK] + block = _xor(block, K1) + else: + block = msg[(n - 1) * _BLK :] + block = block + b"\x80" + b"\x00" * (_BLK - 1 - len(block)) + block = _xor(block, K2) + + return _aes_ecb(key, _xor(X, block)) + + +def aes_ctr_encrypt(key, payload, dev_addr, fcnt, direction): + """ + LoRaWAN payload encryption using AES-128 in CTR mode. + + Per LoRaWAN 1.0.x spec section 4.3.3. + + Args: + key: 16-byte AppSKey (for FRMPayload) or NwkSKey (for FOpts) + payload: plaintext bytes + dev_addr: 4-byte device address (little-endian int) + fcnt: frame counter (uint32) + direction: 0 = uplink, 1 = downlink + + Returns: + encrypted/decrypted bytes (symmetric) + """ + k = (len(payload) + _BLK - 1) // _BLK + S = bytearray() + for i in range(1, k + 1): + A_i = struct.pack("