WIP: wire orchestration v2 provider adapters #2829
Co-authored-by: codex <codex@users.noreply.github.com>
- Initialize provider as unchecked in a pending state
- Update initial probe message to reflect session-local status

- Type the runtime effect with `Scope`
- Build the ACP session runtime without wrapping it in `Effect.scoped`

- Use strict TurnId and ProviderItemId parsing in Codex session routing
- Decode in-memory stdio chunks in streaming mode to avoid split UTF-8 corruption

- Transfer session-owned scopes into adapter state
- Ensure runtime scopes close on stop and startup failure
- Add regression coverage for scoped lifecycle cleanup

- Close the managed native event logger when the adapter layer tears down
- Make session runtime close idempotent with an atomic closed flag
- Add coverage for flushing thread native logs on shutdown

- Use codex app-server snapshots for auth, models, and skills
- Remove legacy CLI/config discovery paths and related helpers
- Update tests for the new provider status flow
- Document the target orchestration graph, IDs, lifecycles, and capability model
- Add Codex app-server probe fixtures and update the probe test harness

- Introduce orchestration v2 service interfaces and error types
- Add replay runtime, fixtures, and integration coverage
- Update shared contracts and probe transcripts
Co-authored-by: codex <codex@users.noreply.github.com>

- Add Codex adapter and replay harness wiring
- Introduce in-memory orchestration projections and provider registry
- Expand orchestration contracts for turn and runtime events
- Add context transfer IDs, schemas, and projections
- Support cheap fork creation and Codex native fork rollback
- Cover fork idempotency and replay behavior in tests

- Track remaining projection, context transfer, rollback, capability, and subagent work
- Clarify current V2 baseline and debugger-only follow-ups

- Map fork and merge-back turns into stored handoffs and transfer resolutions
- Add shell snapshot projection support plus coverage tests
- Update replay fixtures and web contracts for the new turn flow
- Move Codex replay recording into `apps/server`
- Add Claude Agent SDK replay fixtures and test harness
- Update orchestration-v2 fixture scenarios and docs

- Move Claude provider runtime logic into its own module
- Share the SDK query runner between live and replay paths
- Add replay driver error wrapping for unexpected failures

Port orchestration V2 provider adapter wiring to the provider-instance driver registry.
Co-authored-by: codex <codex@users.noreply.github.com>

- persist the selected model on run records
- surface run model selection in the debug UI
- update replay fixtures and contracts for the new field

- Record Claude SDK transcripts across multiple prompts and restart/query modes
- Add approval and tool-call replay coverage for new orchestration fixtures
- Update Claude adapter testkit to model open/prompt/permission frames

- Derive Claude SDK query options from runtime policy
- Add read-only replay fixture and policy mapping tests
- Reuse shared approval-policy fixtures across orchestrator tests
Co-authored-by: codex <codex@users.noreply.github.com>

- add active steering and interrupt-restart replay fixtures
- update Claude adapter/orchestrator turn handling for steering
- refresh replay and integration test coverage

- add interrupt and mid-tool replay fixtures for Claude and Codex
- log Claude Agent SDK protocol frames to native event traces
- project Codex commandExecution start events into orchestration updates

- Map Cursor SDK agents and runs to V2 thread and turn lifecycles
- Update MCP capability, tool, and testing guidance for SDK-based injection
const append: EventStoreV2Shape["append"] = (input) =>
  Effect.forEach(
    input.events,
    (event) =>
      Effect.gen(function* () {
        const encoded = yield* encodeEventJson(event);
        const normalized = parseJson(encoded) as {
          readonly payload: unknown;
          readonly occurredAt: string;
        };

        const rows = yield* sql<EventRow>`
          INSERT INTO orchestration_v2_events (
            event_id,
            command_id,
            thread_id,
            run_id,
            node_id,
            provider,
            raw_event_id,
            event_type,
            occurred_at,
            payload_json
          )
          VALUES (
            ${event.id},
            ${input.commandId ?? null},
            ${event.threadId},
            ${event.runId ?? null},
            ${event.nodeId ?? null},
            ${event.provider ?? null},
            ${event.rawEventId ?? null},
            ${event.type},
            ${normalized.occurredAt},
            ${encodeUnknownJsonString(normalized.payload)}
          )
          RETURNING
            sequence,
            event_id,
            command_id,
            thread_id,
            run_id,
            node_id,
            provider,
            raw_event_id,
            event_type,
            occurred_at,
            payload_json
        `;
        const row = rows[0];
        if (!row) {
          return yield* new EventStoreAppendEventsError({
            eventCount: input.events.length,
            cause: "Insert did not return a stored event row.",
          });
        }
        return yield* rowToStoredEvent(row).pipe(
          Effect.mapError(
            (cause) =>
              new EventStoreAppendEventsError({
                eventCount: input.events.length,
                cause,
              }),
          ),
        );
      }),
    { concurrency: 1 },
  ).pipe(
    Effect.mapError(
      (cause) =>
        new EventStoreAppendEventsError({
          eventCount: input.events.length,
          cause,
        }),
    ),
  );
🟢 Low orchestration-v2/EventStore.ts:146
When rowToStoredEvent fails or the !row check triggers, the error is already an EventStoreAppendEventsError. The outer Effect.mapError on lines 214-220 then wraps it again, producing nested errors like { cause: EventStoreAppendEventsError { cause: ... } } with duplicated eventCount fields. Consider using Effect.catchAll with a type guard to avoid re-wrapping already-typed errors, or remove the inner error construction and let the outer handler do the work.
- ).pipe(
- Effect.mapError(
- (cause) =>
- new EventStoreAppendEventsError({
- eventCount: input.events.length,
- cause,
- }),
- ),
- );
Evidence trail:
apps/server/src/orchestration-v2/EventStore.ts lines 146-221 (REVIEWED_COMMIT): Inner `EventStoreAppendEventsError` created at lines 197-200 and 203-209; outer `Effect.mapError` at lines 214-220 unconditionally wraps all errors into another `EventStoreAppendEventsError`. Lines 18-28 define `EventStoreAppendEventsError` with `eventCount` and `cause` fields.
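The double-wrapping described in this comment can be illustrated without Effect. The following is a minimal sketch, not the repository's code: `AppendEventsError`, `wrapAlways`, and `wrapUnlessTyped` are hypothetical names, and the class only mirrors the `eventCount` and `cause` fields the comment mentions. It contrasts unconditional wrapping with a guard that passes an already-typed error through.

```typescript
// Hypothetical stand-in for EventStoreAppendEventsError (assumption: only
// the eventCount and cause fields named in the review comment matter here).
class AppendEventsError {
  readonly eventCount: number;
  readonly cause: unknown;
  constructor(eventCount: number, cause: unknown) {
    this.eventCount = eventCount;
    this.cause = cause;
  }
}

// Unconditional wrapping: every failure becomes the cause of a new error,
// even when it is already the typed error.
function wrapAlways(eventCount: number, cause: unknown): AppendEventsError {
  return new AppendEventsError(eventCount, cause);
}

// Guarded wrapping: an already-typed error is returned unchanged.
function wrapUnlessTyped(eventCount: number, cause: unknown): AppendEventsError {
  return cause instanceof AppendEventsError
    ? cause
    : new AppendEventsError(eventCount, cause);
}

const inner = new AppendEventsError(2, "Insert did not return a stored event row.");
const nested = wrapAlways(2, inner); // nested.cause is itself an AppendEventsError
const flat = wrapUnlessTyped(2, inner); // flat is the original error
```

The other option the comment mentions, dropping the inner construction and letting the outer handler build the error, avoids the type guard altogether.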
    return decodeTranscript({
      ...metadata,
      entries,
    });
  });
🟢 Low testkit/ReplayTranscriptNdjson.ts:116
The call to decodeTranscript at line 116 invokes Schema.decodeUnknownSync, which throws on validation failure. Since this isn't wrapped in Effect.try, any validation error becomes an uncaught exception (defect) instead of a typed ProviderReplayNdjsonParseError. This breaks the function's declared error contract. Consider wrapping the call in Effect.try to catch the exception and convert it to the declared error type.
- return decodeTranscript({
- ...metadata,
- entries,
- });
+ return yield* Effect.try({
+ try: () =>
+ decodeTranscript({
+ ...metadata,
+ entries,
+ }),
+ catch: (cause) =>
+ new ProviderReplayNdjsonLineParseError({
+ lineNumber: lines.length,
+ line: "<transcript validation>",
+ cause,
+ }),
+ });
Evidence trail:
apps/server/src/orchestration-v2/testkit/ReplayTranscriptNdjson.ts lines 50-53: `decodeTranscript = Schema.decodeUnknownSync(ProviderReplayTranscript)` — throws on failure.
Line 116-118: `return decodeTranscript({...metadata, entries})` — called directly inside Effect.gen without Effect.try wrapper.
Lines 55-68 (`parseReplayRecord`): same pattern but correctly wrapped in `Effect.try`.
Line 80: function declares error type `ProviderReplayNdjsonParseError`.
packages/contracts/src/orchestrationV2.ts lines 1561-1568: `ProviderReplayTranscript` schema with `TrimmedNonEmptyString` fields that can fail validation.
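The contract issue here is general: a decoder that throws has to be caught at the boundary if callers are promised a typed error. The sketch below shows that idea in plain TypeScript with a result type instead of Effect. Every name in it (`TranscriptParseError`, `decodeTranscriptSync`, `decodeTranscriptSafe`, the `Transcript` shape) is a hypothetical stand-in, and the validation rule is an assumption chosen only to make the decoder fail.

```typescript
// Hypothetical typed error standing in for the declared parse error.
class TranscriptParseError {
  readonly cause: unknown;
  constructor(cause: unknown) {
    this.cause = cause;
  }
}

interface Transcript {
  readonly provider: string;
  readonly entries: ReadonlyArray<unknown>;
}

type DecodeResult =
  | { readonly ok: true; readonly value: Transcript }
  | { readonly ok: false; readonly error: TranscriptParseError };

// A decoder that throws on invalid input, analogous to a synchronous
// schema decode (assumed rule: provider must be a non-empty string).
function decodeTranscriptSync(input: unknown): Transcript {
  if (typeof input !== "object" || input === null) {
    throw new Error("transcript must be an object");
  }
  const candidate = input as { provider?: unknown; entries?: unknown };
  if (
    typeof candidate.provider !== "string" ||
    candidate.provider.trim() === "" ||
    !Array.isArray(candidate.entries)
  ) {
    throw new Error("invalid transcript fields");
  }
  return { provider: candidate.provider, entries: candidate.entries };
}

// The boundary: exceptions are converted into the typed error channel.
function decodeTranscriptSafe(input: unknown): DecodeResult {
  try {
    return { ok: true, value: decodeTranscriptSync(input) };
  } catch (cause) {
    return { ok: false, error: new TranscriptParseError(cause) };
  }
}

const good = decodeTranscriptSafe({ provider: "codex", entries: [] });
const bad = decodeTranscriptSafe({ provider: "   ", entries: [] });
```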
    for (const [instanceId, existing] of previous) {
      if (!nextIds.has(instanceId)) {
        staleEntries.push(existing);
      }
🟡 Medium orchestration-v2/ProviderAdapterRegistry.ts:356
When a config entry's driver changes from a known driver to an unknown driver, the old adapter's scope is leaked without being closed. The cleanup loop at line 357 uses !nextIds.has(instanceId) to detect stale entries, but nextIds is populated before the unknown driver check (line 327), so entries with unknown drivers remain in nextIds and bypass cleanup. Consider changing the check to !nextEntries.has(instanceId) so adapters with unknown drivers are properly closed.
- for (const [instanceId, existing] of previous) {
- if (!nextIds.has(instanceId)) {
+ for (const [instanceId, existing] of previous) {
+ if (!nextEntries.has(instanceId)) {
Evidence trail:
...
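To make the difference between the two membership checks concrete, here is a small self-contained sketch. It assumes, as the comment describes, that IDs are recorded before the driver check while built entries are recorded after it. `reconcile`, `knownDrivers`, and the `Entry` shape are hypothetical and not taken from `ProviderAdapterRegistry.ts`.

```typescript
interface Entry {
  readonly instanceId: string;
  readonly driver: string;
}

// Assumption for illustration: only these drivers can build an adapter.
const knownDrivers = new Set<string>(["codex", "claude"]);

function reconcile(previous: Map<string, Entry>, config: ReadonlyArray<Entry>) {
  const nextIds = new Set<string>();
  const nextEntries = new Map<string, Entry>();
  for (const entry of config) {
    nextIds.add(entry.instanceId); // recorded before the driver check
    if (!knownDrivers.has(entry.driver)) {
      continue; // unknown driver: no adapter is built for this ID
    }
    nextEntries.set(entry.instanceId, entry);
  }
  const previousIds = Array.from(previous.keys());
  return {
    // Misses instances whose driver became unknown.
    staleByIds: previousIds.filter((id) => !nextIds.has(id)),
    // Catches them, because no entry was built for those IDs.
    staleByEntries: previousIds.filter((id) => !nextEntries.has(id)),
  };
}

const previous = new Map<string, Entry>([
  ["a", { instanceId: "a", driver: "codex" }],
  ["b", { instanceId: "b", driver: "claude" }],
]);
const result = reconcile(previous, [
  { instanceId: "a", driver: "some-unknown-driver" },
  { instanceId: "b", driver: "claude" },
]);
// result.staleByIds is empty; result.staleByEntries contains "a"
```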
    const sql = yield* SqlClient.SqlClient;
    const eventStore = yield* EventStoreV2;
    const projectionStore = yield* ProjectionStoreV2;
    const liveEvents = yield* PubSub.unbounded<OrchestrationV2StoredEvent>();
🟠 High orchestration-v2/EventSink.ts:89
Events published to liveEvents between when eventStore.read completes and when Stream.fromPubSub(liveEvents) subscribes are lost because the PubSub has no replay buffer. Stream.concat only subscribes to the second stream after the first finishes, creating a race window where published events are dropped before the subscription exists. Consider buffering events from liveEvents until the subscription is established, or using a PubSub with replay.
- const liveEvents = yield* PubSub.unbounded<OrchestrationV2StoredEvent>();
+ const liveEvents = yield* PubSub.unbounded<OrchestrationV2StoredEvent>({ replay: 1 });
Evidence trail:
apps/server/src/orchestration-v2/EventSink.ts lines 89 (PubSub.unbounded), 112 (PubSub.publishAll), 128-141 (Stream.concat with Stream.fromPubSub); apps/server/src/provider/Services/ProviderInstanceRegistry.ts lines 55-79 (codebase documentation of this exact race condition pattern with Stream.fromPubSub deferring subscription); apps/server/src/provider/Layers/ProviderRegistry.ts line 636 (same documented pattern)
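The race window can be reproduced with a toy publisher that, like a PubSub without replay, delivers only to current subscribers. The sketch below is framework-free and hypothetical throughout (`TinyPubSub`, `readThenSubscribe`, `subscribeThenRead`). The second function shows one possible remedy in the spirit of the "buffer until subscribed" option: subscribe first, read history afterwards, and drop buffered events the history already covers. It assumes events carry a monotonically increasing `sequence`.

```typescript
interface StoredEvent {
  readonly sequence: number;
}

// Toy publisher with no replay buffer: events go only to current subscribers.
class TinyPubSub<T> {
  private readonly subscribers: Array<(value: T) => void> = [];
  publish(value: T): void {
    for (const notify of this.subscribers) notify(value);
  }
  subscribe(notify: (value: T) => void): void {
    this.subscribers.push(notify);
  }
}

// History first, subscription second: event 3 falls into the gap.
function readThenSubscribe(): number[] {
  const store: StoredEvent[] = [{ sequence: 1 }, { sequence: 2 }];
  const live = new TinyPubSub<StoredEvent>();
  const seen = store.map((event) => event.sequence); // historical read completes
  store.push({ sequence: 3 });
  live.publish({ sequence: 3 }); // nobody is subscribed yet, so this is dropped
  live.subscribe((event) => seen.push(event.sequence));
  store.push({ sequence: 4 });
  live.publish({ sequence: 4 });
  return seen;
}

// Subscription first, history second, then de-duplicate by sequence.
function subscribeThenRead(): number[] {
  const store: StoredEvent[] = [{ sequence: 1 }, { sequence: 2 }];
  const live = new TinyPubSub<StoredEvent>();
  const buffered: StoredEvent[] = [];
  live.subscribe((event) => buffered.push(event));
  store.push({ sequence: 3 });
  live.publish({ sequence: 3 });
  const history = store.map((event) => event.sequence); // includes 3
  store.push({ sequence: 4 });
  live.publish({ sequence: 4 });
  const lastHistorical = history[history.length - 1] ?? 0;
  const liveOnly = buffered
    .filter((event) => event.sequence > lastHistorical)
    .map((event) => event.sequence);
  return history.concat(liveOnly);
}

const lossy = readThenSubscribe(); // [1, 2, 4]
const complete = subscribeThenRead(); // [1, 2, 3, 4]
```

A replay buffer of fixed size narrows the window but only recovers as many missed events as the buffer holds, which is worth weighing against the subscribe-first ordering.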
…der adapters (t3-29f.6)

Assessment of upstream PR pingdotgg#2829 (pingdotgg/t3code) from juliusmarminge: WIP wire orchestration v2 provider adapters with Codex and Claude adapters, event sourcing, provider session management, and replay testkit.

Relevance to target issues:
- pingdotgg#2838 (session resume): HIGH. ProviderSessionManager persists session IDs and separates startSession/resumeSession operations.
- pingdotgg#2778 (subagent hang): MEDIUM. ProviderEventIngestor provides infrastructure to forward permission events, but UI plumbing is not yet wired.
- pingdotgg#2886 (thread stuck working): HIGH. Event-sourced projections replace mutable state flags, eliminating sticky "working" states.

The PR is a draft (34 commits, not merged). No OpenCode ACP adapter exists yet in v2; OpenCode would need its own adapter wired into the ProviderAdapterRegistry. Recommend watching for merge and adding an OpenCode adapter post-merge.
…n v2 provider adapters)
The upstream PR pingdotgg#2829 added orchestrationV2 methods to the WsRpcClient interface. The test mock in service.threadSubscriptions.test.ts was missing the orchestrationV2 property, causing a typecheck failure: 'Property orchestrationV2 is missing in type...' Added orchestrationV2 mock with dispatchCommand, getThreadProjection, subscribeShell, and subscribeThread as vi.fn() stubs.
The upstream PR pingdotgg#2829 targets a newer Effect version than our fork's pinned effect@4.0.0-beta.73.

Fixes:
- Replace Random.nextUUIDv4 with Crypto.randomUUIDv4 (beta.73 API)
- Fix deterministic Service tag keys to match fork convention (include file path segments; e.g. Adapters/ClaudeAdapterV2/...)
- Replace Schema.decodeSync with Schema.decodeUnknownEffect inside Effect.gen generators (tsgo schemaSyncInEffect rule)
- Replace inline Schema.encodeUnknownSync with module-level wrappers to avoid schemaSyncInEffect rule inside generators
    Effect.catchCause((cause) =>
      Ref.get(latestProviderThread).pipe(
        Effect.flatMap((providerThread) =>
          writeFinalRunEvents({
            run: input.run,
            rootNode: input.rootNode,
            checkpointScope: input.checkpointScope,
            providerThread,
            attempt: input.attempt,
            status: "failed",
            runtimePolicy: input.runtimePolicy,
          }),
        ),
        Effect.mapError(
          (writeCause) =>
            new RunExecutionIngestError({
              runId: input.run.id,
              cause: { ingest: cause, write: writeCause },
            }),
        ),
      ),
    ),
🟠 High orchestration-v2/RunExecutionService.ts:339
The catchCause handler for the event processing fiber calls writeFinalRunEvents directly (lines 347-350) instead of using finalizeRun. This bypasses the finalized ref guard, allowing duplicate final events when a turn.terminal event has already finalized the run and then the stream fails (e.g., connection drop). Consider using finalizeRun("failed") instead to ensure the deduplication guard is respected.
- Effect.catchCause((cause) =>
- Ref.get(latestProviderThread).pipe(
- Effect.flatMap((providerThread) =>
- writeFinalRunEvents({
- run: input.run,
- rootNode: input.rootNode,
- checkpointScope: input.checkpointScope,
- providerThread,
- attempt: input.attempt,
- status: "failed",
- runtimePolicy: input.runtimePolicy,
- }),
- ),
Evidence trail:
apps/server/src/orchestration-v2/RunExecutionService.ts lines 292-318 (finalizeRun definition with finalized ref guard), line 333 (turn.terminal calls finalizeRun), lines 339-350 (catchCause calls writeFinalRunEvents directly, bypassing guard)
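The deduplication guard this comment refers to can be sketched as a small closure. The version below is a hypothetical, synchronous illustration (`makeRunFinalizer` and `FinalStatus` are invented names), not the `finalizeRun` defined in `RunExecutionService.ts`. It shows why routing every terminal path through the guarded function yields at most one set of final events.

```typescript
type FinalStatus = "completed" | "failed";

// Returns a finalizer that performs the final write at most once.
// The return value reports whether this particular call did the write.
function makeRunFinalizer(writeFinalRunEvents: (status: FinalStatus) => void) {
  let finalized = false;
  return (status: FinalStatus): boolean => {
    if (finalized) {
      return false; // already finalized: skip the duplicate write
    }
    finalized = true;
    writeFinalRunEvents(status);
    return true;
  };
}

const written: FinalStatus[] = [];
const finalizeRun = makeRunFinalizer((status) => {
  written.push(status);
});

const firstWrote = finalizeRun("completed"); // e.g. a terminal turn event
const secondWrote = finalizeRun("failed"); // e.g. the stream failing afterwards
// written holds a single "completed" entry
```

Calling the raw writer from the failure handler, as the reviewed code does, is equivalent to bypassing `finalized` here, which is how the second write slips through.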
Record provider fixtures for continued forks and sibling merge-backs, exercise context survival end to end, and stop completed run listeners from consuming later provider events.
Co-authored-by: codex <codex@users.noreply.github.com>
Allow a pending merge-back to be consumed while changing providers and deliver both the provider-switch history and fork delta to the selected provider.
Co-authored-by: codex <codex@users.noreply.github.com>

Send full canonical target-thread context when a provider switch consumes a merge-back, and cover returning to an existing provider thread after a cross-provider fork.
Co-authored-by: codex <codex@users.noreply.github.com>

- Require operate scope for v2 command dispatch
- Allow read scope for v2 thread and shell subscriptions
      environment: adapterOptions.environment,
    });
    const initialized = yield* Ref.make(false);
    const ensureInitialized = Effect.gen(function* () {
🟡 Medium Adapters/CodexAdapterV2.ts:884
The ensureInitialized check-then-act pattern using Ref.get(initialized) followed by conditional logic and Ref.set(initialized, true) is not atomic. If multiple fibers call ensureInitialized concurrently via ensureThread, resumeThread, or other methods, both can read false before either sets true, causing duplicate initialize requests and initialized notifications to be sent to the Codex server. Consider using Ref.getAndSet or Ref.modify for atomic check-and-set.
Evidence trail:
apps/server/src/orchestration-v2/Adapters/CodexAdapterV2.ts lines 883-896 (REVIEWED_COMMIT): non-atomic Ref.get/Ref.set pattern with I/O in between. Callers at lines 2490, 2515, 2715, 2764, 2797, 2812 show multiple session runtime methods use `ensureInitialized` and can run concurrently as separate Effect fibers.
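The check-then-act hazard is not specific to Effect; it appears whenever asynchronous work sits between reading a flag and setting it. The sketch below reproduces it with plain promises and then shows a different technique from the one the comment suggests: memoizing the in-flight promise so concurrent callers share one initialization. All names (`makeNaiveInit`, `makeSharedInit`, the counters) are hypothetical, and this is not a drop-in replacement for the `Ref`-based code.

```typescript
// Non-atomic version: the flag is set only after the async request returns,
// so a second caller arriving in between also sees `false`.
function makeNaiveInit(sendInitialize: () => Promise<void>): () => Promise<void> {
  let initialized = false;
  return async () => {
    if (initialized) return;
    await sendInitialize();
    initialized = true;
  };
}

// Shared version: the first caller stores the in-flight promise before any
// await, and every later caller reuses it.
function makeSharedInit(sendInitialize: () => Promise<void>): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (pending === undefined) {
      pending = sendInitialize();
    }
    return pending;
  };
}

let naiveRequests = 0;
const naiveInit = makeNaiveInit(async () => {
  naiveRequests += 1; // counts "initialize" requests sent
});

let sharedRequests = 0;
const sharedInit = makeSharedInit(async () => {
  sharedRequests += 1;
});

// Two callers start before either initialization has finished.
void naiveInit();
void naiveInit();
void sharedInit();
void sharedInit();
// naiveRequests is 2 and sharedRequests is 1 at this point
```

One caveat of this sketch: if `sendInitialize` rejects, the rejected promise stays cached, so a real implementation would need to decide whether to clear it and allow a retry.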
        ),
      ),
    ),
    Effect.forkDetach,
🟡 Medium orchestration-v2/RunExecutionService.ts:369
If checkpointService.captureBaseline fails (lines 372-386), the function exits with a RunExecutionStartError but leaves providerEventFiber running indefinitely. The fiber was forked with Effect.forkDetach on line 369 and consumes events from input.session.events, but the error path lacks the Fiber.interrupt(providerEventFiber) cleanup that the startTurn error handler (line 401-427) includes.
Also found in 1 other location(s):
- apps/server/src/orchestration-v2/Adapters/ClaudeAdapterV2.testkit.ts:1866: The catch block only closes `sourcePromptQueue` and `sourceRuntime`, but if an error occurs after line 1700 (where these are already closed) during the fork phase (lines 1722-1804) or continuation phase (lines 1807-1864), the active `targetRuntime` or `continuationRuntime` will not be closed, potentially leaking resources. The catch block also pushes a `runtime_exit` entry that's misleading since the source runtime already exited successfully at that point.
Evidence trail:
apps/server/src/orchestration-v2/RunExecutionService.ts lines 338-369 (providerEventFiber forked with Effect.forkDetach), lines 372-386 (captureBaseline error path with no fiber interrupt), lines 401-427 (startTurn error path with Fiber.interrupt(providerEventFiber) on line 402)
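The cleanup gap described above is that only one of two failure paths interrupts the background listener. A minimal sketch of the intended shape follows, using a hypothetical `ListenerHandle` in place of a fiber and a synchronous `captureBaseline` callback for brevity. It is an illustration of the pattern, not the service's code.

```typescript
// Hypothetical stand-in for a forked background listener.
interface ListenerHandle {
  interrupted: boolean;
  interrupt(): void;
}

function forkListener(): ListenerHandle {
  const handle: ListenerHandle = {
    interrupted: false,
    interrupt() {
      handle.interrupted = true;
    },
  };
  return handle;
}

// Every failure after the fork interrupts the listener before rethrowing,
// so no start-up error path leaves it running.
function startRun(listener: ListenerHandle, captureBaseline: () => void): void {
  try {
    captureBaseline();
  } catch (cause) {
    listener.interrupt();
    throw cause;
  }
}

const listener = forkListener();
let startFailed = false;
try {
  startRun(listener, () => {
    throw new Error("baseline capture failed");
  });
} catch {
  startFailed = true;
}
// startFailed and listener.interrupted are both true
```

Tying the listener to a scope that closes on any exit would give the same guarantee without repeating the interrupt call in each handler.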
Note
Wire orchestration v2 provider adapters, contracts, replay testkit, and WebSocket RPCs
- … `runtimeLayer.ts` and exposed via the server layer.
- … (`CodexAdapterV2.ts`, `ClaudeAdapterV2.ts`) with capability matrices, protocol loggers, and provider adapter registry.
- … (`dispatchCommand`, `getThreadProjection`, `subscribeShell`, `subscribeThread`) exposed through the server WS layer and fronted by the web RPC client and `EnvironmentApi`.
- … (`031_OrchestrationV2.ts`) creating orchestration v2 event, command receipt, and projection tables (threads, runs, run attempts, nodes, provider sessions).
- … `orchestrationV2.ts` covering commands, domain events, projections, stream items, and RPC schemas, plus new branded entity IDs in `baseSchemas.ts`.

📊 Macroscope summarized 6d1d513. 55 files reviewed, 0 issues evaluated, 0 issues filtered, 0 comments posted