Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 139 additions & 4 deletions apps/aggregator/migrations/0001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ CREATE TABLE releases (
verified_at TEXT NOT NULL,
tombstoned_at TEXT, -- soft delete (publisher deleted record)
PRIMARY KEY (did, package, version),
FOREIGN KEY (did, package) REFERENCES packages(did, slug)
-- ON DELETE CASCADE because Jetstream events for a publisher can arrive
-- in arbitrary order under network reorder. A publisher who deletes their
-- profile (and all releases) emits the events in author-order, but the
-- profile-delete might land at the consumer before the release-deletes.
-- Without cascade, the consumer would have to either skip the profile
-- delete (leaving stale rows) or sequence retries, neither of which is
-- worth the complexity. Releases are version-immutable from a publishing
-- perspective, but a publisher is still entitled to remove their entire
-- package; cascade mirrors that intent.
FOREIGN KEY (did, package) REFERENCES packages(did, slug) ON DELETE CASCADE
);

CREATE INDEX idx_releases_latest ON releases(did, package, version_sort DESC) WHERE tombstoned_at IS NULL;
Expand All @@ -67,16 +76,90 @@ CREATE INDEX idx_releases_cts ON releases(cts);
-- Audit trail for rejected duplicate-version attempts. FAIR PR #77 makes
-- versions immutable: a second record at the same (did, package, version) is
-- rejected at the SQL layer and logged here for forensics.
--
-- The UNIQUE constraint dedupes attempts by content (CID), not by raw bytes.
-- CAR bytes include the publisher's commit + MST proof which churns whenever
-- the publisher writes any other record in the same repo, so byte-equality
-- would misclassify benign retries as new attempts and bloat the table.
-- The CID is content-addressed and stable for an unchanged record.
--
-- The consumer's INSERT uses `ON CONFLICT … DO UPDATE SET rejected_at,
-- attempted_record_blob = excluded.{rejected_at, attempted_record_blob}` so
-- the row tracks the latest attempt timestamp + the latest envelope bytes
-- (newer proofs supersede older ones in the forensics column).
-- One row per distinct rejected duplicate-version attempt, keyed by the
-- content CID of the attempted record.
CREATE TABLE release_duplicate_attempts (
    did TEXT NOT NULL,
    package TEXT NOT NULL,
    version TEXT NOT NULL,
    -- CID of the verified record (stable for content; changes only when the
    -- record itself changes). Used as the dedup key.
    attempted_cid TEXT NOT NULL,
    rejected_at TEXT NOT NULL,
    reason TEXT NOT NULL,
    -- Raw CAR bytes from the most recent attempt. Kept for forensics so
    -- operators can inspect what was actually attempted even if the
    -- publisher has since deleted the offending record.
    attempted_record_blob BLOB NOT NULL,
    -- Dedup by content, not by bytes: repeated attempts with the same CID
    -- collide on this constraint instead of adding rows.
    UNIQUE (did, package, version, attempted_cid)
);

CREATE INDEX idx_release_duplicates ON release_duplicate_attempts(did, package, version);
-- The UNIQUE constraint creates an implicit index on
-- (did, package, version, attempted_cid); a separate index on the
-- (did, package, version) prefix is redundant for both lookups (the implicit
-- index handles prefix seeks) and inserts (one fewer index to maintain).

------------------------------------------------------------------------------
-- Publishers: identity-level publisher profiles + verification claims
------------------------------------------------------------------------------

-- One row per publisher DID (rkey is always literal `self`). Optional: a DID
-- may publish packages without ever publishing a publisher.profile, in which
-- case the row is absent and clients fall back to the handle. This table is
-- the canonical source for "who is publishing these packages?" — distinct from
-- `packages.authors`, which is per-package and remains authoritative for that
-- package.
CREATE TABLE publishers (
    did                TEXT PRIMARY KEY,
    -- Bound by verification records; see publisher_verifications.
    display_name       TEXT NOT NULL,
    description        TEXT,
    url                TEXT,
    -- JSON array of { kind, url?, email? }.
    contact            TEXT,
    updated_at         TEXT,
    record_blob        BLOB NOT NULL,
    -- JSON: head CID, signing key id.
    signature_metadata TEXT,
    verified_at        TEXT NOT NULL
);

-- Verification claims: issuer DID vouches for subject DID as a trusted
-- publisher. The rkey is a TID, so an issuer can issue multiple claims (e.g.
-- delegated + official) and we store each as its own row. Validity is bound to
-- the subject's handle + publisher.profile.displayName at issuance time:
-- clients re-resolve those at read time and treat the claim as not in force if
-- either has changed. Ingest stores the facts; the validity check is a
-- query-time concern.
CREATE TABLE publisher_verifications (
    -- DID of the repo that wrote the record.
    issuer_did           TEXT NOT NULL,
    -- Record key (a TID); one row per claim.
    rkey                 TEXT NOT NULL,
    subject_did          TEXT NOT NULL,
    -- Handle and display name as bound at issuance. The query-time validity
    -- check compares both against the subject's current values.
    subject_handle       TEXT NOT NULL,
    subject_display_name TEXT NOT NULL,
    created_at           TEXT NOT NULL,
    expires_at           TEXT,
    record_blob          BLOB NOT NULL,
    signature_metadata   TEXT,
    verified_at          TEXT NOT NULL,
    tombstoned_at        TEXT,
    PRIMARY KEY (issuer_did, rkey)
);

-- Subject lookup for the hot path: "all unexpired, non-tombstoned
-- verifications for subject X". The partial predicate leaves tombstoned rows
-- out of the index, which keeps it small.
CREATE INDEX idx_publisher_verifications_subject
    ON publisher_verifications (subject_did)
    WHERE tombstoned_at IS NULL;

-- Supports periodic expiry sweeps over rows that have an expiry and have not
-- been tombstoned.
CREATE INDEX idx_publisher_verifications_expires
    ON publisher_verifications (expires_at)
    WHERE expires_at IS NOT NULL
      AND tombstoned_at IS NULL;

------------------------------------------------------------------------------
-- Mirror tracking (populated when the artifact mirror lands)
Expand Down Expand Up @@ -204,10 +287,62 @@ CREATE TABLE ingest_state (

-- Known publisher DIDs we've seen via Jetstream or Constellation. Reconciliation
-- iterates this table; cold-start backfill seeds it from Constellation.
--
-- Doubles as the DID-document resolution cache: `pds`, `signing_key`,
-- `signing_key_id` are populated by the records consumer on first verification
-- and refreshed when `pds_resolved_at` is older than the consumer's TTL
-- (currently 24h, applied at query time as
-- `pds_resolved_at > datetime('now', '-1 day')`). Backfill may insert a row
-- with these fields null; the consumer's first event for that DID forces a
-- resolution and UPDATE.
-- One row per known publisher DID. The DID-document cache columns (pds,
-- signing_key, signing_key_id, pds_resolved_at) are nullable because backfill
-- may insert a row before any resolution has happened.
CREATE TABLE known_publishers (
    did TEXT PRIMARY KEY,
    pds TEXT, -- cached PDS endpoint from DID document
    signing_key TEXT, -- cached #atproto signing key (multibase)
    signing_key_id TEXT, -- e.g. 'did:plc:xxx#atproto'
    pds_resolved_at TEXT, -- last successful DID-doc resolution
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL
);

------------------------------------------------------------------------------
-- Verification-failure forensics
------------------------------------------------------------------------------

-- Records that failed PDS-verified ingest (signature, MST proof, AT-URI,
-- lexicon, content-mismatch). Written instead of retrying, because these
-- failures indicate malicious or broken upstream — retrying would just burn
-- PDS round trips. Operators query this table to investigate suspected attacks
-- or upstream regressions; it is NOT used as a retry queue.
--
-- Distinct from the configured Cloudflare DLQ (`emdash-aggregator-records-dlq`,
-- see wrangler.jsonc), which receives messages after `max_retries` exhausted —
-- that is for transient failures (PDS down, profile-not-yet-arrived). Two
-- distinct failure modes, two distinct destinations.
--
-- `payload` holds the unverified record bytes from the Jetstream event so an
-- operator can inspect what was attempted without going back to the source PDS.
CREATE TABLE dead_letters (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    did         TEXT NOT NULL,
    collection  TEXT NOT NULL,
    rkey        TEXT NOT NULL,

    -- Reason code, kept in step with the `DeadLetterReason` union in
    -- records-consumer.ts. Values at the time of writing:
    --   'RECORD_NOT_FOUND', 'RESPONSE_TOO_LARGE', 'INVALID_PROOF',
    --   'PDS_HTTP_ERROR', 'LEXICON_VALIDATION_FAILED', 'RKEY_MISMATCH',
    --   'CONTACT_VALIDATION_FAILED', 'INVALID_VERSION', 'UNKNOWN_COLLECTION',
    --   'UNEXPECTED_ERROR'.
    reason      TEXT NOT NULL,

    -- Optional free-form context: which field, expected vs got, library
    -- error message, and so on.
    detail      TEXT,

    -- UTF-8 encoded JSON bytes of `RecordsJob.jetstreamRecord` when the event
    -- carries one; otherwise a fallback envelope `{operation, cid}` (delete
    -- events). Declared BLOB so other encodings (CBOR, raw record bytes) can
    -- be stored later without a schema change. For now operators read it with
    -- `CAST(payload AS TEXT)`.
    payload     BLOB NOT NULL,

    received_at TEXT NOT NULL DEFAULT (datetime('now'))
);

-- Lookup of dead letters by publisher DID.
CREATE INDEX idx_dead_letters_did
    ON dead_letters (did);

-- Lookup and ordering of dead letters by arrival time.
CREATE INDEX idx_dead_letters_received
    ON dead_letters (received_at);
1 change: 1 addition & 0 deletions apps/aggregator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@atcute/client": "catalog:",
"@atcute/crypto": "catalog:",
"@atcute/firehose": "catalog:",
"@atcute/identity": "catalog:",
"@atcute/identity-resolver": "catalog:",
"@atcute/jetstream": "catalog:",
"@atcute/lexicons": "catalog:",
Expand Down
12 changes: 12 additions & 0 deletions apps/aggregator/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,20 @@
* NSIDs the aggregator subscribes to via Jetstream and verifies via PDS
* fetches. Will migrate to FAIR-namespaced equivalents once those NSIDs
* stabilise.
*
* Two record families:
* - `package.*` — per-package metadata (profile + immutable releases) backing
* the discovery / install path.
* - `publisher.*` — identity-level metadata about the publishing entity
* (`publisher.profile`, rkey `self`) and verification claims about it
* (`publisher.verification`, rkey TID). Verifications are bound to the
* subject's handle + publisher.profile.displayName at issuance time;
* the consumer stores facts as observed and clients re-check validity at
* read time.
*/
export const WANTED_COLLECTIONS = [
// package.* family: per-package profile and its immutable releases.
"com.emdashcms.experimental.package.profile",
"com.emdashcms.experimental.package.release",
// publisher.* family: identity-level profile (rkey `self`) and verification
// claims about a publisher (rkey TID).
"com.emdashcms.experimental.publisher.profile",
"com.emdashcms.experimental.publisher.verification",
] as const;
Loading
Loading