✨ feat: session-tracking ingest path (sessions table, IngestTurn, worker dispatch) #212
yeazelm wants to merge 2 commits into
| Filename | Overview |
|---|---|
| pkg/storage/postgres/session_ingest.go | Core transactional ingest logic — well-structured and atomic, but parent-placeholder lookup uses child's HarnessID, causing silent FK drift when parent and child belong to different harnesses. |
| migrations/1779329142_session_tracking.up.sql | Adds sessions table and converts nodes to composite PK (org_id, hash); well-commented rationale. Composite PK change silently widens the scope of existing UPDATE queries (e.g., UpdateUsage). |
| migrations/1779329142_session_tracking.down.sql | Handles cross-org dedup before restoring single-column PK; destructive by design and clearly documented. |
| pkg/sessions/ingest_envelope.go | Validation gate and normalization helpers; correctly handles nil receiver, UUID parse check, empty parent pointer, and non-object harness_metadata. |
| proxy/worker/pool.go | Clean dispatch logic with compile-time interface assertion; session-aware and legacy paths coexist correctly. |
| pkg/storage/postgres/gensqlc/sessions.sql.go | Generated SQLC code; all session queries consistent with SQL source. Placeholder ON CONFLICT DO UPDATE no-op correctly enables RETURNING on conflict. |
| pkg/storage/postgres/queries/get_node.sql | Read-path queries unscoped by org_id; explicitly deferred per code comments. GetNode (:one) is non-deterministic when two orgs share the same hash. |
| pkg/storage/postgres/session_ingest_test.go | Comprehensive integration tests covering happy path, idempotency, parent FK resolution, placeholder backfill, and synthetic session ID derivation. |
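The validation cases listed for `pkg/sessions/ingest_envelope.go` (nil receiver, UUID parse check, empty parent pointer, non-object `harness_metadata`) can be illustrated with a minimal sketch. Everything here except the `HarnessIDOrUnknown` name is an assumption: the struct, its field names, the `"unknown"` fallback value, and the choice to treat an empty parent pointer as absent are hypothetical, since the overview does not say whether each case is rejected or normalized.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// Envelope is a hypothetical stand-in for the session block on /v1/ingest.
// Field names are assumptions, not the PR's actual struct.
type Envelope struct {
	OrgID                  string
	HarnessID              string
	HarnessSessionID       string
	ParentHarnessSessionID *string
	HarnessMetadata        json.RawMessage
}

// isUUID reports whether s has the canonical 8-4-4-4-12 hex layout.
func isUUID(s string) bool {
	if len(s) != 36 || s[8] != '-' || s[13] != '-' || s[18] != '-' || s[23] != '-' {
		return false
	}
	raw := strings.ReplaceAll(s, "-", "")
	if len(raw) != 32 {
		return false
	}
	_, err := hex.DecodeString(raw)
	return err == nil
}

// HarnessIDOrUnknown is safe on a nil receiver. The "unknown" literal is assumed.
func (e *Envelope) HarnessIDOrUnknown() string {
	if e == nil || e.HarnessID == "" {
		return "unknown"
	}
	return e.HarnessID
}

// ParentKey treats a nil or empty parent pointer as "no fork parent" (assumed behavior).
func (e *Envelope) ParentKey() (string, bool) {
	if e == nil || e.ParentHarnessSessionID == nil || *e.ParentHarnessSessionID == "" {
		return "", false
	}
	return *e.ParentHarnessSessionID, true
}

// Validate accepts a nil envelope (legacy callers) and rejects malformed fields.
func (e *Envelope) Validate() error {
	if e == nil {
		return nil
	}
	if e.OrgID != "" && !isUUID(e.OrgID) {
		return fmt.Errorf("org_id %q is not a UUID", e.OrgID)
	}
	if len(e.HarnessMetadata) > 0 {
		var obj map[string]json.RawMessage
		// A JSON null decodes without error but leaves obj nil, so check both.
		if err := json.Unmarshal(e.HarnessMetadata, &obj); err != nil || obj == nil {
			return errors.New("harness_metadata must be a JSON object")
		}
	}
	return nil
}

func main() {
	var none *Envelope
	fmt.Println(none.Validate(), none.HarnessIDOrUnknown())

	bad := &Envelope{OrgID: "not-a-uuid"}
	fmt.Println(bad.Validate())
}
```

Value-returning helpers on a pointer receiver let the worker call them without a nil check at every call site, which is one plausible reading of "correctly handles nil receiver".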
Sequence Diagram
sequenceDiagram
participant HTTP as ingest HTTP handler
participant W as Worker (pool.go)
participant SI as IngestTurn (session_ingest.go)
participant PG as Postgres
HTTP->>HTTP: Validate() envelope
HTTP->>W: "Enqueue(Job{Session, Req, Resp})"
W->>W: buildTurnChain()
alt "driver implements SessionIngester AND Job.Session != nil"
W->>SI: IngestTurn(ctx, req)
SI->>PG: BEGIN Tx
SI->>PG: resolveHarnessSessionID
SI->>PG: GetSessionByNaturalKey (parent lookup)
alt parent not found
SI->>PG: InsertSessionPlaceholder
end
SI->>PG: UpsertSession
loop each node
SI->>PG: InsertNode ON CONFLICT DO NOTHING
alt "rows > 0"
SI->>PG: SetNodeSessionID
end
end
alt new nodes inserted
SI->>PG: UpdateSessionCounters
end
SI->>PG: COMMIT
SI-->>W: IngestTurnResult
else legacy path
W->>PG: Put() per node
end
Reviews (2): Last reviewed commit: "✨ feat: ingest session-bearing turns end..."
Materializes the sessions table. Ingest will UPSERT a row keyed by (org_id, harness_id, harness_session_id) on each turn, attach nodes.session_id, and roll up the token/cost counters in the same transaction as the node insert. The DDL has no FK on org_id — this repo does not define an `orgs` table, so deployments that have one can layer the FK separately. The NOT NULL UUID org_id column is preserved.
Adds the ingest path for the session-tracking envelope: optional
Session block on /v1/ingest, a Postgres transactional implementation
behind storage.SessionIngester, and worker dispatch that routes
session-bearing turns through it. Callers without an envelope (and
the in-memory driver) keep working unchanged.
Postgres IngestTurn runs the whole transaction inside one pgx Tx
so a failure on any step rolls back every other write:
1. UPSERT the sessions row by (org_id, harness_id,
harness_session_id), minting a UUIDv7 app-side because
Postgres 17 has no native uuidv7().
2. Resolve the optional fork-parent FK. If the parent natural
key hasn't landed yet, placeholder-insert it so the FK chain
stays consistent and back-fills when the parent's first real
turn arrives.
3. Insert every node in the turn chain through the existing
InsertNode query, then stamp session_id on each row that was
newly inserted (rowcount > 0). Duplicate-hash retries are a
no-op on stamping because the existing row already FKs to
the correct session.
4. Update the per-turn counters (turn_count,
total_input_tokens, total_output_tokens, total_cost_usd,
last_seen_at) ONLY when at least one node was actually
inserted this call — so a retried envelope is a true no-op
on counters and end-to-end idempotency holds.
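The idempotency argument in steps 3 and 4 can be sketched with an in-memory stand-in for the two tables. This is not the PR's Postgres code: the `Store` type, its maps, and the choice to sum token deltas from newly inserted nodes are assumptions for illustration, and there is no real transaction or rollback here.

```go
package main

import "fmt"

// Node is a hypothetical slice of the nodes row: just the hash and usage.
type Node struct {
	Hash         string
	InputTokens  int64
	OutputTokens int64
}

// Session holds the rolled-up counters named in the commit message.
type Session struct {
	TurnCount         int64
	TotalInputTokens  int64
	TotalOutputTokens int64
}

// Store stands in for the sessions and nodes tables.
type Store struct {
	nodeSession map[string]string // node hash -> session key (the stamped FK)
	sessions    map[string]*Session
}

func NewStore() *Store {
	return &Store{nodeSession: map[string]string{}, sessions: map[string]*Session{}}
}

// IngestTurn inserts the chain and bumps counters only if something was new.
func (s *Store) IngestTurn(sessionKey string, chain []Node) int {
	sess, ok := s.sessions[sessionKey]
	if !ok { // UPSERT: create the session row on first sight
		sess = &Session{}
		s.sessions[sessionKey] = sess
	}
	inserted := 0
	var in, out int64
	for _, n := range chain {
		if _, exists := s.nodeSession[n.Hash]; exists {
			continue // ON CONFLICT DO NOTHING: rowcount 0, so no stamping
		}
		s.nodeSession[n.Hash] = sessionKey // stamp session_id on the new row
		inserted++
		in += n.InputTokens
		out += n.OutputTokens
	}
	if inserted > 0 { // a retried envelope skips this block entirely
		sess.TurnCount++
		sess.TotalInputTokens += in
		sess.TotalOutputTokens += out
	}
	return inserted
}

func main() {
	s := NewStore()
	chain := []Node{{Hash: "a", InputTokens: 10}, {Hash: "b", OutputTokens: 5}}
	fmt.Println(s.IngestTurn("sess-1", chain), *s.sessions["sess-1"])
	fmt.Println(s.IngestTurn("sess-1", chain), *s.sessions["sess-1"]) // retry
}
```

Gating the counter update on the insert count, rather than on the envelope itself, is what makes a replayed turn leave `turn_count` and the token totals untouched.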
Worker dispatch (storeConversationTurn) builds the full
root-to-leaf chain of nodes up-front, then routes by capability:
driver implements storage.SessionIngester AND the Job carries a
session envelope → call IngestTurn; otherwise → the legacy
per-node Put loop. A compile-time interface assertion in the
worker prevents future signature drift from silently downgrading
to the per-node path.
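The capability routing and the compile-time assertion described above can be sketched as follows. The interface and type names other than `SessionIngester` and `IngestTurn` are assumptions, the request and result shapes are hypothetical, and the two drivers are counting fakes rather than the repo's Postgres and in-memory drivers.

```go
package main

import (
	"context"
	"fmt"
)

type Node struct{ Hash string }

type SessionEnvelope struct{ HarnessSessionID string }

// Request/result shapes are hypothetical; the PR's types may differ.
type IngestTurnRequest struct {
	Session *SessionEnvelope
	Chain   []Node
}

type IngestTurnResult struct{ NodesInserted int }

// Driver is the legacy per-node capability every backend has.
type Driver interface {
	Put(ctx context.Context, n Node) error
}

// SessionIngester is the optional capability the worker probes for.
type SessionIngester interface {
	IngestTurn(ctx context.Context, req IngestTurnRequest) (IngestTurnResult, error)
}

type pgDriver struct{ puts, ingests int }

func (d *pgDriver) Put(context.Context, Node) error { d.puts++; return nil }

func (d *pgDriver) IngestTurn(_ context.Context, req IngestTurnRequest) (IngestTurnResult, error) {
	d.ingests++
	return IngestTurnResult{NodesInserted: len(req.Chain)}, nil
}

// Compile-time assertion: if IngestTurn's signature drifts, the build breaks
// instead of the type assertion below quietly falling through to Put.
var _ SessionIngester = (*pgDriver)(nil)

type memDriver struct{ puts int }

func (d *memDriver) Put(context.Context, Node) error { d.puts++; return nil }

// storeTurn reports which path it took, purely for demonstration.
func storeTurn(ctx context.Context, d Driver, session *SessionEnvelope, chain []Node) (string, error) {
	if si, ok := d.(SessionIngester); ok && session != nil {
		_, err := si.IngestTurn(ctx, IngestTurnRequest{Session: session, Chain: chain})
		return "session", err
	}
	for _, n := range chain {
		if err := d.Put(ctx, n); err != nil {
			return "legacy", err
		}
	}
	return "legacy", nil
}

func main() {
	ctx := context.Background()
	chain := []Node{{Hash: "root"}, {Hash: "leaf"}}
	env := &SessionEnvelope{HarnessSessionID: "s1"}

	path, _ := storeTurn(ctx, &pgDriver{}, env, chain)
	fmt.Println(path)
	path, _ = storeTurn(ctx, &memDriver{}, env, chain)
	fmt.Println(path)
}
```

Without the `var _ SessionIngester` line, a renamed parameter type on the driver would still compile, and every session-bearing turn would silently take the per-node loop.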
Decisions worth flagging:
* Synthetic harness_session_id is the root node hash's first 16
hex chars (64 bits). The captured turn's root is a SHA-256 of
canonicalized JSON, so the prefix is effectively uniformly
random and far below the birthday bound for any plausible
org's synthetic-session population. The prefix length is
documented inline.
* When the envelope is nil or carries an empty OrgID, the
sentinel nil-UUID is written rather than letting the NOT NULL
constraint reject the row. This repo has no orgs(id) table to
FK against, so this keeps "no known org" turns persistable;
deployments that layer the orgs FK can filter on the sentinel
to triage them.
* CostUSD is stubbed at 0: this repo's worker has no pricing
lookup wired in, and sessions.total_cost_usd defaults to 0 so
a 0-delta UPDATE is a true no-op. The path exists so a
pricing layer can be added without re-touching this code.
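The first two decisions can be made concrete with a small sketch. The function names, the sample JSON, and the population size are assumptions; only the 16-hex-char prefix, the SHA-256 root hash, and the nil-UUID sentinel come from the text above. The collision estimate uses the standard birthday approximation p ≈ 1 − exp(−n² / 2⁶⁵) for a 64-bit space.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"math"
)

// syntheticIDHexLen is the prefix length from the commit message (64 bits).
const syntheticIDHexLen = 16

// nilUUID is the sentinel written when no org is known.
const nilUUID = "00000000-0000-0000-0000-000000000000"

// SyntheticSessionID derives a harness_session_id from the root node hash.
func SyntheticSessionID(rootHashHex string) string {
	if len(rootHashHex) < syntheticIDHexLen {
		return rootHashHex // defensive: shorter than expected, keep as-is
	}
	return rootHashHex[:syntheticIDHexLen]
}

// OrgIDOrSentinel keeps "no known org" turns persistable under NOT NULL.
func OrgIDOrSentinel(orgID string) string {
	if orgID == "" {
		return nilUUID
	}
	return orgID
}

// CollisionProbability approximates the chance that any two of n uniformly
// random 64-bit IDs collide: 1 - exp(-n^2 / 2^65).
func CollisionProbability(n float64) float64 {
	return -math.Expm1(-n * n / math.Exp2(65))
}

func main() {
	// Hypothetical canonicalized JSON for a root node.
	sum := sha256.Sum256([]byte(`{"role":"user"}`))
	root := hex.EncodeToString(sum[:])

	fmt.Println(SyntheticSessionID(root))
	fmt.Println(OrgIDOrSentinel(""))
	fmt.Println(CollisionProbability(1e6)) // one million synthetic sessions in one org
}
```

Under that approximation the collision probability reaches 50% only at roughly 5 × 10⁹ synthetic sessions within a single org, which is the sense in which the prefix sits "far below the birthday bound".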
	parent, err := qtx.GetSessionByNaturalKey(ctx, gensqlc.GetSessionByNaturalKeyParams{
		OrgID:            orgID,
		HarnessID:        envelope.HarnessIDOrUnknown(),
		HarnessSessionID: parentKey,
	})
	if err == nil {
		return parent.ID, nil
	}
	if !errors.Is(err, pgx.ErrNoRows) {
		return pgtype.UUID{}, fmt.Errorf("lookup parent session: %w", err)
	}

	placeholderID, err := newAppUUID()
	if err != nil {
		return pgtype.UUID{}, fmt.Errorf("mint placeholder uuid: %w", err)
	}
	id, err := qtx.InsertSessionPlaceholder(ctx, gensqlc.InsertSessionPlaceholderParams{
		ID:               placeholderID,
		OrgID:            orgID,
		AuthSubject:      envelope.AuthSubject,
		HarnessID:        envelope.HarnessIDOrUnknown(),
		HarnessSessionID: parentKey,
		Now:              now,
	})
pkg/storage/postgres/session_ingest.go:270-293
**Parent placeholder uses child's `HarnessID` for the natural-key lookup**
`resolveParentSessionID` looks up the fork-parent using `HarnessID: envelope.HarnessIDOrUnknown()` (the child's harness). If the parent session was created under a different `HarnessID` (e.g., parent is from harness `"cursor"`, child is from `"claude"`), the `GetSessionByNaturalKey` call fails with no rows, and `InsertSessionPlaceholder` inserts `(org_id, child_harness_id, parent_key)` as the placeholder. When the parent's first real turn later lands with `(org_id, parent_harness_id, parent_key)`, `UpsertSession` finds no conflict on the natural key and creates a brand-new session row, while the child's `parent_session_id` FK still points at the orphaned placeholder with the wrong `harness_id`. The mismatch is silent; there is no validation that parent and child must share the same `HarnessID`, and no `ParentHarnessID` field on the envelope to encode the correct key.
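One possible direction for this finding, sketched against an in-memory natural-key map rather than the real queries: carry the parent's harness on the envelope and fall back to the child's harness only when it is absent. The `ParentHarnessID` field is hypothetical (the review notes the envelope has no such field), as are the `Sessions` type and helper names; rejecting cross-harness parents at validation time would be an equally plausible fix.

```go
package main

import "fmt"

// NaturalKey mirrors the (org_id, harness_id, harness_session_id) tuple.
type NaturalKey struct {
	OrgID, HarnessID, HarnessSessionID string
}

// Envelope is a hypothetical reduction of the session block.
type Envelope struct {
	OrgID                  string
	HarnessID              string
	HarnessSessionID       string
	ParentHarnessSessionID string // empty means "no fork parent"
	ParentHarnessID        string // HYPOTHETICAL: not on the PR's envelope
}

// Sessions stands in for the sessions table's natural-key unique index.
type Sessions struct {
	ids  map[NaturalKey]int
	next int
}

func NewSessions() *Sessions { return &Sessions{ids: map[NaturalKey]int{}} }

// Upsert returns the row id for k, creating the row on first sight. It plays
// the role of both the placeholder insert and the later real-turn upsert.
func (s *Sessions) Upsert(k NaturalKey) int {
	if id, ok := s.ids[k]; ok {
		return id
	}
	s.next++
	s.ids[k] = s.next
	return s.next
}

// ParentKey builds the parent's natural key from the parent's own harness,
// falling back to the child's harness only when none was supplied.
func ParentKey(e Envelope) NaturalKey {
	harness := e.ParentHarnessID
	if harness == "" {
		harness = e.HarnessID
	}
	return NaturalKey{OrgID: e.OrgID, HarnessID: harness, HarnessSessionID: e.ParentHarnessSessionID}
}

func main() {
	s := NewSessions()
	child := Envelope{
		OrgID: "org", HarnessID: "claude", HarnessSessionID: "c1",
		ParentHarnessSessionID: "p1", ParentHarnessID: "cursor",
	}
	placeholderID := s.Upsert(ParentKey(child))
	// The parent's first real turn arrives later under its own harness.
	parentID := s.Upsert(NaturalKey{OrgID: "org", HarnessID: "cursor", HarnessSessionID: "p1"})
	fmt.Println(placeholderID == parentID, len(s.ids))
}
```

With `ParentHarnessID` left empty the same scenario produces two distinct rows, which is the drift the comment describes.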
Fixes PCC-561.
Resolves PCC-533.
Summary
Adds the cloud-side ingest path for the session-tracking envelope: optional `session` block on `/v1/ingest`, a Postgres transactional implementation behind `storage.SessionIngester`, and worker dispatch that routes session-bearing turns through it. Legacy clients (no envelope, in-memory driver) keep working unchanged.

The Postgres `IngestTurn` runs the whole transaction inside one pgx `Tx` so a failure on any step rolls back every other write:
1. UPSERT the `sessions` row by `(org_id, harness_id, harness_session_id)`, with the UUIDv7 minted app-side because Postgres 17 has no native `uuidv7()`.
2. Resolve the optional fork-parent FK. If the parent natural key hasn't landed yet, placeholder-insert it so the FK chain stays consistent and back-fills when the parent's first real turn arrives.
3. Insert every node in the turn chain through the existing `InsertNode` query, then stamp `session_id` on each row that was newly inserted (rowcount > 0). Duplicate-hash retries are no-ops on stamping because the existing row already FKs to the correct session.
4. Update the per-turn counters (`turn_count`, `total_input_tokens`, `total_output_tokens`, `total_cost_usd`, `last_seen_at`) ONLY when at least one node was actually inserted, so a retried envelope is a true no-op on counters and end-to-end idempotency holds.

Worker dispatch builds the full root-to-leaf chain up-front, then routes by capability: driver implements `storage.SessionIngester` AND the Job carries a session envelope → call `IngestTurn`; otherwise → the legacy per-node `Put` loop. A compile-time interface assertion in the worker prevents future signature drift from silently downgrading to the per-node path.

Decisions worth flagging
* Sentinel nil-UUID when the envelope is nil or carries an empty `OrgID`: this repo has no `orgs(id)` table to FK against, so this keeps "no known org" turns persistable; deployments that layer the orgs FK can filter on the sentinel to triage.
* `CostUSD` stubbed at 0: no pricing lookup in this repo's worker. `sessions.total_cost_usd` defaults to 0 so a 0-delta UPDATE is a true no-op, and a pricing layer can be added on top without re-touching this code.

Test plan
* `make check` green (sqlc + dagger)
* `session{org_id, harness_id, harness_session_id}` → row appears in `sessions`, nodes carry the FK

Companion PRs
This is the cloud-side ingest piece of an umbrella feature. Two companion PRs ship the upstream pieces and supply the envelope this PR consumes:
* `papercomputeco/tapes-extproc#8`: Envoy ext_proc filter that builds the `session` block on the way to ingest
* `papercomputeco/paper#50`: daemon-side capture of the harness-session identifiers that feed the envelope