diff --git a/README.md b/README.md index da4e0b6..0add7ff 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ The `tee_*` fields provide cryptographic proof of the response: **Flow:** -1. Client fetches `/v1/ohttp/config` (HPKE pubkey, key_id, suite IDs) and verifies it against the Nitro attestation. +1. Client fetches `/v1/ohttp/config` (HPKE pubkey, key_id, suite IDs) and verifies it. The HPKE key is **not** in the Nitro attestation transcript; instead the config carries an RSA-PSS `signature` from the enclave's attested signing key over `computeOHTTPConfigHash(...)`, giving the chain PCRs → attested signing key → signature → HPKE config (see [Verify OHTTP Config](#4-verify-ohttp-config)). 2. Client HPKE-encapsulates a normal chat-completion JSON body and POSTs the ciphertext to a **relay**. The client carries no payment material. 3. Relay forwards the ciphertext to `/v1/ohttp` and attaches its own `X-Payment: ` header. **`/v1/ohttp` is the x402-paid boundary** — verification and settlement happen on this outer request, against the relay's payment. 4. Enclave decrypts → re-issues the request in-process to `/v1/chat/completions` against the pre-x402 WSGI app (so connexion routing, validation, TEE signing and the LLM call still run, but x402 does **not** fire a second time and the relay's `X-Payment` is **not** forwarded into the inner dispatch) → response is sealed back to the client. @@ -275,6 +275,56 @@ computed_hash = keccak(request_bytes).hex() assert computed_hash == response["tee_request_hash"] ``` +### 4. Verify OHTTP Config + +The HPKE key for anonymous inference is **not** covered by the Nitro attestation +transcript. Before encrypting a request to it, verify the RSA-PSS `signature` in +`/v1/ohttp/config` against the **attested** signing key (recovered from the +attestation document — never trusted from the same response you're verifying): + +```python +import base64 +from eth_hash.auto import keccak +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + +cfg = requests.get("https://your-enclave:443/v1/ohttp/config").json() + +# public_key MUST come from the verified Nitro attestation document, not cfg. +public_key = serialization.load_pem_public_key(attested_public_key_pem.encode()) +der = public_key.public_bytes( + serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo +) + +# 1. The config is bound to *this* attested key: tee_id == keccak256(DER(key)). +tee_id = keccak(der) +assert tee_id == bytes.fromhex(cfg["tee_id"].removeprefix("0x")) + +# 2. Recompute the signed hash (== TEERegistryV2.computeOHTTPConfigHash). +def word(v): return v.to_bytes(32, "big") +config_hash = keccak( + keccak(b"OPENGRADIENT_TEE_OHTTP_CONFIG_V1") # domain + + tee_id # bytes32 + + word(cfg["key_id"]) + word(cfg["kem_id"]) + + word(cfg["kdf_id"]) + word(cfg["aead_id"]) + + keccak(bytes.fromhex(cfg["public_key"])) # keccak256(public_key) + + keccak(base64.b64decode(cfg["key_config"])) # keccak256(key_config) +) +assert config_hash == bytes.fromhex(cfg["signature_hash"].removeprefix("0x")) + +# 3. Verify the RSA-PSS-SHA256 signature (salt_length=32 matches server). +public_key.verify( + base64.b64decode(cfg["signature"]), + config_hash, + padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32), + hashes.SHA256(), +) +# Only now is cfg["public_key"] safe to HPKE-encapsulate to. +``` + +See `examples/verify_ohttp_config.py` for a complete example. Skipping this step +leaves you with an unauthenticated HPKE key any network attacker could swap. + ## Architecture ``` diff --git a/examples/verify_ohttp_config.py b/examples/verify_ohttp_config.py new file mode 100644 index 0000000..deb9e50 --- /dev/null +++ b/examples/verify_ohttp_config.py @@ -0,0 +1,172 @@ +"""Verify a signed OHTTP/HPKE key configuration against the attested TEE key. + +Unlike the response signature (which clients already verify per-inference), the +HPKE key used for anonymous inference is **not** part of the Nitro attestation +transcript. Instead the enclave binds it to its attested RSA signing key with an +RSA-PSS signature over a keccak256 config hash (the same hash +``TEERegistryV2.computeOHTTPConfigHash`` computes on-chain). + +So before encrypting anything to the HPKE public key, a client/relay MUST: + + 1. Obtain the enclave's signing key via the Nitro attestation document + (``/enclave/attestation`` → verified against the AWS Nitro root CA, see + ``verify_attestation.py``). This is the only trust anchor. + 2. Fetch the signed config from ``GET /v1/ohttp/config`` (or read the ``hpke`` + field embedded in ``/signing-key``). + 3. Confirm the config's ``tee_id`` equals keccak256(DER(signing key)) — this + ties the config to *this* attested key, not some other enclave's. + 4. Recompute the config hash from the config fields and check it matches + ``signature_hash``. + 5. Verify the RSA-PSS signature over that hash with the attested public key. + +If any step fails, the HPKE public key is NOT attested — do not use it. + +The trust chain is: PCRs → attested RSA signing key → RSA-PSS signature → HPKE +config. Skipping step 5 leaves you with an unauthenticated key that any +network attacker (including the relay) could swap. +""" + +import base64 + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding +from eth_hash.auto import keccak + +# --------------------------------------------------------------------------- +# INPUTS — replace with values from your deployment. +# --------------------------------------------------------------------------- + +# Attested signing key, recovered from the verified Nitro attestation document. +# (Here we hardcode a PEM only as a placeholder; in practice this MUST come from +# attestation, never from the same /v1/ohttp/config response you are verifying.) +public_key_pem = """-----BEGIN PUBLIC KEY----- +PASTE_THE_ATTESTED_PUBLIC_KEY_PEM_HERE +-----END PUBLIC KEY-----""" + +# The JSON object returned by GET /v1/ohttp/config (== the "hpke" field of +# /signing-key). Field names match TEEKeyManager.get_signed_hpke_config(). +signed_config = { + "tee_id": "0x...", # keccak256(DER(signing key)) + "key_id": 0x01, + "kem_id": 0x0020, # DHKEM(X25519, HKDF-SHA256) + "kdf_id": 0x0001, # HKDF-SHA256 + "aead_id": 0x0003, # ChaCha20-Poly1305 + "public_key": "...", # hex, raw X25519 public key (32 bytes) + "key_config": "...", # base64, RFC 9458 §3 key-config blob + "signature": "...", # base64, RSA-PSS-SHA256 over the config hash + "signature_hash": "0x...", # hex, keccak256 config hash (for convenience) +} + + +# --------------------------------------------------------------------------- +# This must byte-match tee_manager.compute_ohttp_config_hash and the on-chain +# TEERegistryV2.computeOHTTPConfigHash. Layout (every fixed field a 32-byte +# word, dynamic fields pre-hashed): +# keccak256( +# keccak256("OPENGRADIENT_TEE_OHTTP_CONFIG_V1") // domain +# || tee_id // bytes32 +# || uint256(key_id) || uint256(kem_id) +# || uint256(kdf_id) || uint256(aead_id) +# || keccak256(public_key) // bytes32 +# || keccak256(key_config) // bytes32 +# ) +# --------------------------------------------------------------------------- +def compute_ohttp_config_hash( + tee_id: bytes, + key_id: int, + kem_id: int, + kdf_id: int, + aead_id: int, + ohttp_public_key: bytes, + ohttp_key_config: bytes, +) -> bytes: + if len(tee_id) != 32: + raise ValueError("tee_id must be 32 bytes") + + def word(value: int) -> bytes: + return value.to_bytes(32, "big") + + domain = keccak(b"OPENGRADIENT_TEE_OHTTP_CONFIG_V1") + return keccak( + domain + + tee_id + + word(key_id) + + word(kem_id) + + word(kdf_id) + + word(aead_id) + + keccak(ohttp_public_key) + + keccak(ohttp_key_config) + ) + + +def main() -> None: + print("=" * 70) + print("OHTTP CONFIG VERIFICATION") + print("=" * 70) + + public_key = serialization.load_pem_public_key(public_key_pem.encode("utf-8")) + + # --- Step 1: config is bound to THIS attested signing key --------------- + # tee_id = keccak256(DER(SubjectPublicKeyInfo)) — recompute it from the + # attested key and require an exact match. This is what prevents a + # signature made by enclave A's key from being presented as a binding for + # enclave B. + public_key_der = public_key.public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + expected_tee_id = keccak(public_key_der) + claimed_tee_id = bytes.fromhex(signed_config["tee_id"].removeprefix("0x")) + + print("\n[1] Binding config to attested signing key") + print(f" tee_id (from attested key): 0x{expected_tee_id.hex()}") + print(f" tee_id (claimed in config): 0x{claimed_tee_id.hex()}") + if claimed_tee_id != expected_tee_id: + raise SystemExit("✗ tee_id does not match the attested key — REJECT") + print(" ✓ config is bound to the attested signing key") + + # --- Step 2: recompute the signed hash ---------------------------------- + config_hash = compute_ohttp_config_hash( + expected_tee_id, + signed_config["key_id"], + signed_config["kem_id"], + signed_config["kdf_id"], + signed_config["aead_id"], + bytes.fromhex(signed_config["public_key"]), + base64.b64decode(signed_config["key_config"]), + ) + + print("\n[2] Recomputing config hash") + print(f" computed: 0x{config_hash.hex()}") + print(f" reported: {signed_config['signature_hash']}") + reported_hash = bytes.fromhex(signed_config["signature_hash"].removeprefix("0x")) + if config_hash != reported_hash: + raise SystemExit("✗ recomputed hash != signature_hash — REJECT") + print(" ✓ hash matches signature_hash") + + # --- Step 3: verify the RSA-PSS signature over the hash ------------------ + # salt_length=32 (SHA256 digest size) matches TEEKeyManager.sign_data. + print("\n[3] Verifying RSA-PSS-SHA256 signature") + try: + public_key.verify( + base64.b64decode(signed_config["signature"]), + config_hash, + padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32), + hashes.SHA256(), + ) + except Exception as e: # noqa: BLE001 — surface any verification failure + raise SystemExit(f"✗ signature verification FAILED: {e}") + + print(" ✓ signature verifies against the attested key") + print("\n" + "=" * 70) + print("✓✓✓ HPKE CONFIG VERIFIED ✓✓✓") + print("=" * 70) + print( + "\nThe HPKE public key below is bound to the attested enclave; it is\n" + "safe to HPKE-encapsulate your request to it:\n" + f" {signed_config['public_key']}" + ) + + +if __name__ == "__main__": + main() diff --git a/tee_gateway/controllers/ohttp_controller.py b/tee_gateway/controllers/ohttp_controller.py index 50e5496..892e1f6 100644 --- a/tee_gateway/controllers/ohttp_controller.py +++ b/tee_gateway/controllers/ohttp_controller.py @@ -396,16 +396,19 @@ def _start_response(status: str, headers: list, exc_info: Any = None): def get_hpke_config(): - """GET /v1/ohttp/config — return the HPKE key configuration. + """GET /v1/ohttp/config — return the signed HPKE key configuration. Returns both an OHTTP-compliant binary key_config (base64) and the - individual fields for clients that prefer to parse JSON. The same data is - embedded inside the attestation document at /signing-key for clients that - want to verify the binding to the enclave's PCRs in one step. + individual fields for clients that prefer to parse JSON, plus an RSA-PSS + signature (over TEERegistryV2.computeOHTTPConfigHash) that binds the config + to the enclave's attested signing key. The same data is embedded inside the + attestation document at /signing-key, where the signing key's binding to + the enclave PCRs is verifiable, so a client can chain PCRs → signing key → + HPKE config. """ try: tee = get_tee_keys() - return tee.get_hpke_config(), 200 + return tee.get_signed_hpke_config(), 200 except Exception as exc: logger.error("HPKE config error: %s", exc, exc_info=True) return {"error": "Failed to retrieve HPKE config"}, 500 diff --git a/tee_gateway/ohttp.py b/tee_gateway/ohttp.py index 0f9fa7f..e560634 100644 --- a/tee_gateway/ohttp.py +++ b/tee_gateway/ohttp.py @@ -286,9 +286,11 @@ def generate_keypair() -> tuple[KEMKeyInterface, bytes]: The HPKE keypair is intentionally independent of the RSA TEE signing key: deriving one from the other would create a single point of compromise (a leak of the RSA private key would also leak the OHTTP - private key, and vice versa). Both public keys are still covered by - the same nitriding attestation transcript, so verifiers get binding - without sharing key material. + private key, and vice versa). The HPKE public key is bound to the + enclave by an RSA-PSS signature from the attested signing key (see + TEEKeyManager.get_signed_hpke_config), so verifiers get binding without + sharing key material — the X25519 key itself is not part of the + nitriding attestation transcript. pyhpke 0.6 derives the keypair from random IKM via ``kem.derive_key_pair(ikm)``; we feed it ``os.urandom(32)`` so each diff --git a/tee_gateway/tee_manager.py b/tee_gateway/tee_manager.py index 98deaa6..037d062 100644 --- a/tee_gateway/tee_manager.py +++ b/tee_gateway/tee_manager.py @@ -39,8 +39,7 @@ def __init__(self, register=True): self.tee_id = None self.wallet_address = None # HPKE keypair for OHTTP-style anonymous inference. Generated in the - # same enclave boot so the X25519 public key is covered by the same - # attestation that covers the RSA signing key. + # enclave and bound to the attested signing key by get_signed_hpke_config(). self.hpke_private_key = None self.hpke_public_key_raw: bytes | None = None self._generate_keys() @@ -78,10 +77,8 @@ def _generate_keys(self): self.wallet_address = wallet_account.address # HPKE X25519 keypair — independent random material from the RSA - # signing key. Both public keys are bound to the enclave by the - # nitriding attestation transcript (register_with_nitriding below), - # so verifiers still get a single attested fingerprint covering both, - # without sharing private-key material between them. + # signing key. The HPKE public config is signed by the RSA key for + # registry V2, while Nitro attestation continues to bind the RSA key. self.hpke_private_key, self.hpke_public_key_raw = ohttp.generate_keypair() logger.info("TEE key pair generated successfully") @@ -94,41 +91,18 @@ def _generate_keys(self): def register_with_nitriding(self): """Register public key hash with nitriding. - The hash covers both the RSA signing key (DER-encoded SPKI) and the - raw X25519 HPKE public key. Including both in a single attested digest - means a verifier who validates the attestation document automatically - gets binding for the HPKE config used for anonymous inference — no - separate trust anchor required. + The current on-chain Nitro verifier expects nitriding user_data to + contain SHA256(signing public key DER) as the second hash. OHTTP/HPKE + config is bound separately by an RSA-PSS signature from this same + attested signing key. """ - # Defensive check: the v2 transcript labels both keys, so refusing to - # register is the only safe behavior when one is missing. Falling back - # to b"" would produce a digest that's nominally v2 but only covers - # RSA, and a verifier trusting the label would accept an enclave whose - # HPKE key was never attested. Raise outside the broad try/except - # below so a real misconfiguration isn't masked as a non-TEE - # environment. - if not self.hpke_public_key_raw or len(self.hpke_public_key_raw) != 32: - raise RuntimeError( - "Refusing to register with nitriding: HPKE X25519 public key " - "is missing or wrong length; the v2 attestation transcript " - "requires both RSA and HPKE keys." - ) - try: public_key_der = self.public_key.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo, ) - # Domain-separated transcript so a future addition of more keys - # can't be confused with the existing layout. - transcript = ( - b"og-tee-keys|v2|rsa-spki=" - + public_key_der - + b"|hpke-x25519=" - + self.hpke_public_key_raw - ) - key_hash = hashlib.sha256(transcript).digest() + key_hash = hashlib.sha256(public_key_der).digest() key_hash_b64 = base64.b64encode(key_hash).decode("utf-8") logger.info(f"Public key DER length: {len(public_key_der)} bytes") @@ -199,9 +173,11 @@ def get_hpke_config(self) -> dict: """Return the HPKE key configuration for anonymous inference. ``key_config`` is the RFC 9458 §3 binary key-config blob, base64-encoded. - Clients should treat this as authoritative only when fetched alongside - the Nitro attestation document (which commits to the same key hash via - nitriding registration). + This returns the raw config only; clients should treat it as + authoritative only via the RSA-PSS signature from + get_signed_hpke_config, which binds these fields to the attested + signing key (the X25519 key is not in the nitriding attestation + transcript). """ if self.hpke_public_key_raw is None: raise RuntimeError("HPKE keypair not initialized") @@ -216,13 +192,40 @@ def get_hpke_config(self) -> dict: ).decode("ascii"), } + def get_signed_hpke_config(self) -> dict: + """Return HPKE config plus an RSA-PSS signature for registry V2. + + The signature covers the same ABI-encoded hash computed by + TEERegistryV2.computeOHTTPConfigHash, so the registry can verify that + the OHTTP key came from the enclave signing key already proven by Nitro + attestation. + """ + config = self.get_hpke_config() + public_key = bytes.fromhex(config["public_key"]) + key_config = base64.b64decode(config["key_config"]) + config_hash = compute_ohttp_config_hash( + bytes.fromhex(self.tee_id), + config["key_id"], + config["kem_id"], + config["kdf_id"], + config["aead_id"], + public_key, + key_config, + ) + return { + **config, + "tee_id": f"0x{self.tee_id}", + "signature": self.sign_data(config_hash), + "signature_hash": f"0x{config_hash.hex()}", + } + def get_attestation_document(self) -> dict: """Return TEE attestation document.""" return { "public_key": self.public_key_pem, "tee_id": f"0x{self.tee_id}", "wallet_address": self.wallet_address, - "hpke": self.get_hpke_config() if self.hpke_public_key_raw else None, + "hpke": self.get_signed_hpke_config() if self.hpke_public_key_raw else None, "timestamp": datetime.now(UTC).isoformat(), "enclave_info": { "platform": "aws-nitro", @@ -233,6 +236,39 @@ def get_attestation_document(self) -> dict: } +def compute_ohttp_config_hash( + tee_id: bytes, + key_id: int, + kem_id: int, + kdf_id: int, + aead_id: int, + ohttp_public_key: bytes, + ohttp_key_config: bytes, +) -> bytes: + """Compute TEERegistryV2.computeOHTTPConfigHash off-chain.""" + if len(tee_id) != 32: + raise ValueError("tee_id must be 32 bytes") + + def word(value: int) -> bytes: + return value.to_bytes(32, "big") + + domain = keccak(b"OPENGRADIENT_TEE_OHTTP_CONFIG_V1") + return keccak( + b"".join( + [ + domain, + tee_id, + word(key_id), + word(kem_id), + word(kdf_id), + word(aead_id), + keccak(ohttp_public_key), + keccak(ohttp_key_config), + ] + ) + ) + + def signal_ready(): """Signal to nitriding that enclave is ready to accept traffic.""" try: diff --git a/tee_gateway/test/test_tee_core.py b/tee_gateway/test/test_tee_core.py index f719d0d..41aac33 100644 --- a/tee_gateway/test/test_tee_core.py +++ b/tee_gateway/test/test_tee_core.py @@ -18,9 +18,14 @@ from eth_hash.auto import keccak from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage +from tee_gateway import ohttp from tee_gateway.llm_backend import convert_messages, extract_usage from tee_gateway.model_registry import get_model_config, get_rate_card -from tee_gateway.tee_manager import TEEKeyManager, compute_tee_msg_hash +from tee_gateway.tee_manager import ( + TEEKeyManager, + compute_ohttp_config_hash, + compute_tee_msg_hash, +) # --------------------------------------------------------------------------- @@ -222,6 +227,203 @@ def test_output_hash_matches_keccak256_of_response(self): self.assertEqual(out_hex, keccak(resp.encode("utf-8")).hex()) +# --------------------------------------------------------------------------- +# compute_ohttp_config_hash — must byte-match TEERegistryV2.computeOHTTPConfigHash +# --------------------------------------------------------------------------- + + +class TestComputeOHTTPConfigHash(unittest.TestCase): + """The OHTTP config hash binds the enclave's HPKE key to its attested RSA + signing key. The signature returned by get_signed_hpke_config covers this + hash, so it MUST be computed exactly the way the on-chain + TEERegistryV2.computeOHTTPConfigHash does — otherwise every signature + silently fails verification on-chain. + + The layout (EIP-712 style, every field a 32-byte word): + keccak256( + keccak256("OPENGRADIENT_TEE_OHTTP_CONFIG_V1") // domain + || tee_id // bytes32 + || uint256(key_id) || uint256(kem_id) + || uint256(kdf_id) || uint256(aead_id) + || keccak256(public_key) // bytes32 + || keccak256(key_config) // bytes32 + ) + """ + + # Deterministic, non-random vectors so the golden hash below is stable. + TEE_ID = bytes(range(32)) + PUBLIC_KEY = b"\x02" * 32 + KEY_CONFIG = ohttp.key_config(PUBLIC_KEY) + + def _hash(self): + return compute_ohttp_config_hash( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ) + + def test_golden_vector(self): + """Frozen expected hash for the fixed vectors above. + + IMPORTANT: this value must equal the output of + TEERegistryV2.computeOHTTPConfigHash for the same inputs. If this test + fails after an intentional encoding change, regenerate it from the + deployed contract (do NOT just paste the new Python output) — the whole + point is to catch off-chain/on-chain divergence. + """ + expected = "f2a783a780198b72145ce4f4824c3de4993975e17ee8df1e59f3b5dd78a0f1d9" + self.assertEqual(self._hash().hex(), expected) + + def test_matches_independent_reconstruction(self): + """Re-derive the layout independently from the implementation, so a + change to either side of the encoding is caught even if someone also + updates the golden vector.""" + + def word(v): + return v.to_bytes(32, "big") + + domain = keccak(b"OPENGRADIENT_TEE_OHTTP_CONFIG_V1") + expected = keccak( + domain + + self.TEE_ID + + word(ohttp.KEY_CONFIG_ID) + + word(ohttp.KEM_ID_X25519) + + word(ohttp.KDF_ID_HKDF_SHA256) + + word(ohttp.AEAD_ID_CHACHA20_POLY1305) + + keccak(self.PUBLIC_KEY) + + keccak(self.KEY_CONFIG) + ) + self.assertEqual(self._hash(), expected) + + def test_returns_32_bytes(self): + self.assertEqual(len(self._hash()), 32) + + def test_rejects_wrong_length_tee_id(self): + with self.assertRaises(ValueError): + compute_ohttp_config_hash( + b"\x00" * 31, # not 32 bytes + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ) + + def test_each_field_affects_the_hash(self): + """Flipping any single field must change the digest — confirms no field + is dropped from the preimage.""" + base = self._hash() + variants = [ + ( + bytes(reversed(self.TEE_ID)), + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID + 1, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519 + 1, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256 + 1, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305 + 1, + self.PUBLIC_KEY, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + b"\x03" * 32, + self.KEY_CONFIG, + ), + ( + self.TEE_ID, + ohttp.KEY_CONFIG_ID, + ohttp.KEM_ID_X25519, + ohttp.KDF_ID_HKDF_SHA256, + ohttp.AEAD_ID_CHACHA20_POLY1305, + self.PUBLIC_KEY, + self.KEY_CONFIG + b"\x00", + ), + ] + for v in variants: + self.assertNotEqual(compute_ohttp_config_hash(*v), base) + + +class TestSignedHPKEConfig(unittest.TestCase): + """get_signed_hpke_config must return a signature that verifies against the + enclave's attested RSA key over the OHTTP config hash — this is the binding + a relay/registry relies on now that the HPKE key is no longer in the + nitriding attestation transcript.""" + + def setUp(self): + self.tee = TEEKeyManager(register=False) + + def test_signature_verifies_over_config_hash(self): + cfg = self.tee.get_signed_hpke_config() + + # Recompute the hash the same way an off-chain verifier would. + config_hash = compute_ohttp_config_hash( + bytes.fromhex(self.tee.tee_id), + cfg["key_id"], + cfg["kem_id"], + cfg["kdf_id"], + cfg["aead_id"], + bytes.fromhex(cfg["public_key"]), + base64.b64decode(cfg["key_config"]), + ) + self.assertEqual(cfg["signature_hash"], f"0x{config_hash.hex()}") + + # The signature must verify against the attested signing key. + self.tee.public_key.verify( + base64.b64decode(cfg["signature"]), + config_hash, + padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32), + hashes.SHA256(), + ) + + def test_tee_id_field_has_0x_prefix(self): + cfg = self.tee.get_signed_hpke_config() + self.assertEqual(cfg["tee_id"], f"0x{self.tee.tee_id}") + + # --------------------------------------------------------------------------- # model_registry # ---------------------------------------------------------------------------