Skip to content

feat(agent): Pi-backed agent workflow service, template, and tracing#4758

Closed
mmabrouk wants to merge 1 commit into
mainfrom
feat/agent-workflows
Closed

feat(agent): Pi-backed agent workflow service, template, and tracing#4758
mmabrouk wants to merge 1 commit into
mainfrom
feat/agent-workflows

Conversation

@mmabrouk

@mmabrouk mmabrouk commented Jun 19, 2026

Copy link
Copy Markdown
Member

This PR is part of a stack. Review bottom-up.

Each PR's diff is only its own delta. Merge from the bottom. This PR's base is main.

"## Context\n\nThis PR adds a Pi-backed agent that runs as a normal Agenta workflow, alongside chat and completion. A request hits /invoke, the handler runs one cold agent turn through a TypeScript Pi wrapper, and the final assistant message comes back. It is the bottom of the agent-workflows stack (base main) and seeds the slices that follow in docs/design/agent-workflows/pr-stack.md: the protocol shell (#2), runner streaming (#3), and the agent template (#4).\n\n## What this changes\n\nThe agent registers like any other workflow. create_agent_app() calls ag.create_app() + ag.workflow(schemas=...) + ag.route(), so the backend and playground treat it as a workflow with /invoke and /inspect. The handler reads config (model + AGENTS.md) from parameters, falls back to file config, builds the user turn, and runs it through a Harness port.\n\nThe service stays harness-agnostic and environment-agnostic behind two ports in services/oss/src/agent_pi/ports.py:\n\n- Harness is the seam to the agent engine. PiHarness drives the TS wrapper over JSON-on-stdio. PiHttpHarness calls the same wrapper as an HTTP sidecar. Same port, two transports.\n- Runtime is the seam to the run environment. LocalRuntime runs the wrapper as a subprocess. A Daytona adapter slots in later behind the same port.\n\n_build_harness() picks the transport from the environment: AGENTA_AGENT_PI_URL set means the HTTP sidecar (the Python container has no Node), unset means a local subprocess.\n\nThe TS wrapper (services/agent/src/runPi.ts) injects AGENTS.md in memory, runs an in-memory session in a throwaway temp dir, and writes one JSON result to stdout. Nothing invocation-specific touches a persistent disk.\n\nTracing threads the caller's trace context across the Python/Node boundary. The handler captures the active /invoke span's traceparent and passes it down. The wrapper starts invoke_agent as a child of that remote span, so the agent's whole run nests under the response trace the way chat and completion nest their LLM spans.\n\nThe SDK seeds the builtin interface and catalog template for agenta:builtin:agent:v0 (model + agents_md, messages-in, message-out), and flips the agent registry entry from evaluator-only to application:\n\n\n- (\"builtin\", \"agent\"): (False, True, False) # evaluator-only\n+ (\"builtin\", \"agent\"): (True, False, False) # application\n\n\nThe running handler does not register under that builtin URI yet. It takes an auto user:custom:... URI. Wiring the handler to the builtin type is deferred (the comment in agent.py points at WP-6). The frontend adds "Agent" to the create-app menus, a robot icon, and treats the agent as a chat-mode app (is_chat: type === \"chat\" || type === \"agent\").\n\nMost of the line count is design docs and POC artifacts under docs/design/agent-workflows/. The running code is small.\n\n## Key architectural decision to review\n\n1. Register through ag.route, not as a builtin workflow type (yet). services/oss/src/agent.py. The agent ships as a user:custom: workflow now, while the SDK seeds the agenta:builtin:agent:v0 interface for later. The win is that this PR delivers a working agent without touching builtin-type resolution; the cost is two sources of truth for the agent schema (the live schemas.py and the seeded SDK interface) that must stay in sync until WP-6 unifies them. Check that the seeded SDK interface and schemas.py agree on defaults and shape.\n\n2. Cross-boundary trace nesting. services/oss/src/agent_pi/ports.py (TraceContext) and services/agent/src/agenta-otel.ts (TraceBatchProcessor). The handler ships its traceparent, OTLP endpoint, and Authorization into the wrapper, which makes invoke_agent a child of the remote span. Because that root parent never ends inside the Node process, root-span-end cannot trigger the export. So the wrapper buffers a trace's spans and flushes them by trace id after the run. Look at whether a failed or timed-out run still flushes (or intentionally drops the trace), since the export hangs off the run's happy path in runPi.ts.\n\n## How to review this PR\n\n1. Open services/oss/src/agent.py first. It is the whole control flow in one file: build the turn, pick the harness, invoke, return one assistant message. Read _build_harness, _trace_context, and create_agent_app.\n2. Read services/oss/src/agent_pi/ports.py to see the two ports and TraceContext. The adapters (pi_harness.py, pi_http_harness.py, local_runtime.py) are thin once the ports are clear.\n3. Read services/agent/src/runPi.ts for the Pi run: in-memory AGENTS.md, temp cwd, stream accumulation, flush-by-trace-id at the end.\n4. Skim services/agent/src/agenta-otel.ts only for TraceBatchProcessor (buffer + flush). The span-attribute code below it is ported unchanged from the WP-1 POC.\n5. Skip the docs/design/agent-workflows/** docs and POC files unless you want the background; they do not run in production.\n6. Most likely regression: the dual schema source. If schemas.py defaults drift from the seeded SDK agent_v0_interface, the playground pre-fill and the runtime fallback disagree. The other thing to watch is the trace flush on the error path.\n\n## Tests / notes\n\n/invoke was verified live with curl through the standalone entrypoints/agent_main.py runner (auth bypassed for the curl path), and the agent run was confirmed to nest under the /invoke span in the same trace. The wrapper needs Pi auth (~/.pi/agent/auth.json) or OPENAI_API_KEY / ANTHROPIC_API_KEY. The model default is gpt-5.5. Tools, streaming, multi-message output, and the Daytona runtime are later work packages in this stack.\n"

… docs

- New agent workflow service wrapping the Pi harness, served same-origin like
  chat/completion at /services/agent/v0 (Python service + Node Pi sidecar, ports/adapters).
- Builtin 'agent' app type + create-app template; config is model + AGENTS.md.
- /inspect chat schema and OpenTelemetry tracing into Agenta.
- EE dev compose agent-pi sidecar; design docs under docs/design/agent-workflows.
@vercel

vercel Bot commented Jun 19, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
agenta-documentation Ready Ready Preview, Comment Jun 19, 2026 3:40pm

Request Review

@dosubot dosubot Bot added size:XXL This PR changes 1000+ lines, ignoring generated files. documentation Improvements or additions to documentation feature labels Jun 19, 2026
@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Introduces a new "agent" workflow type backed by the pi.dev coding-agent harness. Adds research documentation, a TypeScript Pi wrapper service (services/agent) with OTel span export, a Python agent service using ports/adapters (Harness/Runtime), Daytona sandbox POC scripts, SDK registration of agenta:builtin:agent:v0, deployment Docker Compose wiring, and frontend UI for creating agent apps.

Changes

Agent Workflows Feature

Layer / File(s) Summary
Design overview and research docs
docs/design/agent-workflows/README.md, docs/design/agent-workflows/research/*
Adds the agent-workflows context document and six research files covering auth/secrets, diskless in-memory config, OTel instrumentation, pi programmatic interaction, sandbox sharing strategy, and open questions.
WP-1: Pi OTel tracing POC
docs/design/agent-workflows/wp-1-pi-tracing/..., docs/design/agent-workflows/wp-1-pi-tracing/poc/*
Adds WP-1 design docs and a TypeScript proof-of-concept agenta-otel Pi extension that converts Pi lifecycle events into OTLP/HTTP spans via a custom TraceBatchProcessor (parent-first export), plus a runner script and env/package configuration.
WP-2 through WP-7 design documents
docs/design/agent-workflows/wp-2-agent-service/..., docs/design/agent-workflows/wp-3-daytona-sandbox/README.md, docs/design/agent-workflows/wp-4-*/README.md, docs/design/agent-workflows/wp-5-*/README.md, docs/design/agent-workflows/wp-6-*/README.md, docs/design/agent-workflows/wp-7-*/README.md
Adds design READMEs and an implementation plan for WP-2 through WP-7, covering agent service architecture, Daytona sandbox integration, multi-message output, chat/completion interface contract, workflow type registration, tools config, and traceparent propagation.
Python agent_pi ports, adapters, config, and schemas
services/oss/src/agent_pi/ports.py, services/oss/src/agent_pi/config.py, services/oss/src/agent_pi/local_runtime.py, services/oss/src/agent_pi/pi_harness.py, services/oss/src/agent_pi/pi_http_harness.py, services/oss/src/agent_pi/schemas.py, services/oss/src/agent_pi/__init__.py
Defines Runtime and Harness abstract base classes with data models (ExecResult, TraceContext, HarnessRequest, HarnessResult), and implements LocalRuntime (asyncio subprocess), PiHarness (JSON-over-stdio), PiHttpHarness (HTTP POST to sidecar), AgentConfig/load_config, and AGENT_SCHEMAS for /inspect.
Python agent service and entrypoints
services/oss/src/agent.py, services/entrypoints/agent_main.py, services/entrypoints/main.py
Implements the _agent workflow handler that selects a harness adapter based on AGENTA_AGENT_PI_URL, extracts W3C traceparent from the active workflow span, invokes the harness, and returns an assistant message; mounts agent_v0_app in both standalone and main FastAPI entrypoints.
TypeScript Pi wrapper service
services/agent/src/agenta-otel.ts, services/agent/src/runPi.ts, services/agent/src/cli.ts, services/agent/src/server.ts, services/agent/package.json, services/agent/tsconfig.json, services/agent/docker/Dockerfile.dev, services/agent/config/*, services/agent/scripts/register_agent_app.py, services/agent/README.md, services/agent/.dockerignore
Adds the full TypeScript Pi wrapper: agenta-otel.ts (run-scoped OTel extension with TraceBatchProcessor and traceparent propagation), runPi.ts (in-memory Pi session orchestration with diskless config), cli.ts (JSON-over-stdio adapter), server.ts (HTTP /run and /health), plus build/config/registration files.
WP-3: Daytona sandbox POC scripts
docs/design/agent-workflows/wp-3-daytona-sandbox/poc/build_snapshot.py, docs/design/agent-workflows/wp-3-daytona-sandbox/poc/run_agent.py, docs/design/agent-workflows/wp-3-daytona-sandbox/poc/bench_coldstart.py, docs/design/agent-workflows/wp-3-daytona-sandbox/poc/cleanup.py, docs/design/agent-workflows/wp-3-daytona-sandbox/poc/README.md
Adds Python scripts for the Daytona POC: snapshot baking, per-run sandbox provisioning with file/secret injection and Pi JSON-mode streaming, cold-start benchmarking, and labeled-sandbox cleanup.
SDK registration, Docker wiring, and frontend
sdks/python/agenta/sdk/engines/running/interfaces.py, sdks/python/agenta/sdk/engines/running/utils.py, hosting/docker-compose/ee/docker-compose.dev.yml, services/agent/docker-compose.agent.yml, services/agent/docker-compose.stack.yml, web/oss/src/components/pages/app-management/..., web/packages/agenta-entities/src/workflow/state/appUtils.ts
Registers agenta:builtin:agent:v0 in INTERFACE_REGISTRY, CATALOG_REGISTRY, CONFIGURATION_REGISTRY, and the role table; adds the agent-pi sidecar to EE dev and standalone Docker Compose files; extends the frontend AppType union with "agent", sets is_chat: true for agent apps, and adds agent entry points in CreateAppDropdown, CreateAppTypeModal, and getAppTypeIcon.

Sequence Diagram(s)

sequenceDiagram
    rect rgba(70, 130, 180, 0.5)
        note over Client, AgentaOTLP: Agent invoke flow
    end
    participant Client
    participant agent_v0_app
    participant _agent as _agent (Python)
    participant PiHttpHarness
    participant server_ts as server.ts (Node)
    participant runPi
    participant PiSDK
    participant AgentaOTLP

    Client->>agent_v0_app: POST /agent/v0/invoke
    agent_v0_app->>_agent: dispatch
    _agent->>_agent: load_config() + _trace_context()
    _agent->>PiHttpHarness: invoke(HarnessRequest + traceparent)
    PiHttpHarness->>server_ts: POST /run {agentsMd, model, trace}
    server_ts->>runPi: runPi(request)
    runPi->>runPi: createAgentaOtel(traceparent)
    runPi->>PiSDK: createAgentSession(inMemoryConfig)
    PiSDK-->>runPi: text_delta events
    runPi->>AgentaOTLP: flushTrace → OTLP/HTTP batch (invoke_agent→turn→chat→tool)
    runPi-->>server_ts: AgentRunResult {ok, output, traceId}
    server_ts-->>PiHttpHarness: JSON response
    PiHttpHarness-->>_agent: HarnessResult
    _agent-->>Client: assistant message
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Agenta-AI/agenta#4274: Both PRs modify web/packages/agenta-entities/src/workflow/state/appUtils.ts — this PR adds "agent" to the AppType union while that PR shapes the core chat/completion app-type workflow creation utilities in the same file.
  • Agenta-AI/agenta#4609: Both PRs affect flags.is_chat behavior — this PR sets is_chat: true for the new agent type, while that PR modifies playground mode selection logic that reads flags.is_chat to determine capability.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 40.23% which is insufficient. The required threshold is 60.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: introducing a Pi-backed agent workflow service with tracing capabilities, matching the scope of the substantial changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description comprehensively explains the purpose, architecture, and scope of changes, including key decisions, integration points, and review guidance.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/agent-workflows

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

"content-type": "application/json",
"content-length": Buffer.byteLength(payload),
});
res.end(payload);
@mmabrouk

Copy link
Copy Markdown
Member Author

Reviewer guide: interesting code

A map to the hunks that carry the architecture. Start at the first one.

  • services/oss/src/agent.py:135routed = ag.workflow(schemas=AGENT_SCHEMAS)(_agent) then ag.route(...). The agent registers as a normal workflow through the public SDK decorators, which is what makes the backend and playground treat it like chat or completion.
  • services/oss/src/agent.py:132 — The "No builtin URI yet" comment. The handler deliberately takes an auto user:custom: URI; binding it to the seeded agenta:builtin:agent:v0 type is deferred to WP-6. This is the decision to scrutinize.
  • services/oss/src/agent.py:70_trace_context() reads the active /invoke span's traceparent via inject({}) and the OTLP endpoint, then hands them to the harness. This is where cross-boundary tracing starts on the Python side. It is best-effort: any failure returns None and the run is traced standalone.
  • services/oss/src/agent_pi/ports.py:60TraceContext and to_wire(). The seam contract: snake_case in Python, camelCase on the wire. Both Harness adapters serialize through this one shape.
  • services/agent/src/runPi.ts:168noContextFiles: true plus agentsFilesOverride injecting /virtual/AGENTS.md. The agent's instructions are injected in memory and on-disk context files are kept out, so a run touches no persistent disk. This is the diskless-config decision.
  • services/agent/src/agenta-otel.ts:123if (!span.parentSpanId) in TraceBatchProcessor.onEnd. Auto-flush fires only for a local root. When the caller passes a traceparent, the root has a remote parent that never ends here, so the run must flush by trace id explicitly (see runPi.ts otel.flush()). Confirm the error path still flushes or intentionally drops.
  • sdks/python/agenta/sdk/engines/running/utils.py:569 — the agent registry entry flips from (False, True, False) (evaluator-only) to (True, False, False) (application). Small line, real behavior change: it makes the agent show up as a creatable app.

Comment thread services/oss/src/agent.py

def create_agent_app():
app = ag.create_app()
# No builtin URI yet: registering the agent as a first-class workflow type

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decision to scrutinize: the handler registers under an auto user:custom: URI, not the seeded agenta:builtin:agent:v0 builtin type. That keeps this PR off builtin-type resolution, but it leaves the agent schema in two places (schemas.py here and the seeded SDK agent_v0_interface) until WP-6 unifies them. They must stay in sync.



@dataclass
class TraceContext:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single seam for cross-boundary tracing: TraceContext.to_wire() maps snake_case Python to the camelCase shape the TS wrapper reads. Both Harness adapters (stdio and HTTP) serialize through this, so a field added here must be mirrored in runPi.ts TraceContext.

const loader = new DefaultResourceLoader({
cwd,
agentDir: getAgentDir(),
noContextFiles: true,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diskless config: noContextFiles: true plus the agentsFilesOverride virtual AGENTS.md inject the instructions in memory and keep on-disk context files out of the run. With the throwaway temp cwd, a run touches no persistent disk. Worth confirming no on-disk fallback path slips context back in.

spans.push(span);
this.buffers.set(traceId, spans);
// No parent in this process => this is the local root and the trace is done.
if (!span.parentSpanId) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-flush fires only when the span has no parent in this process (the local root). When the caller threads a traceparent, the root has a remote parent that never ends here, so onEnd never auto-flushes and the run must call otel.flush(traceId) explicitly. Check the error/timeout path in runPi.ts still reaches that flush, or the trace is dropped.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 18

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
services/agent/docker/Dockerfile.dev (1)

7-29: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the container as a non-root user by default.

No USER is set, so the image defaults to root; this weakens least-privilege posture for the sidecar.

Suggested fix
 FROM node:24-slim
@@
 RUN pnpm install --frozen-lockfile
@@
 COPY tsconfig.json ./
 COPY src ./src
+
+# Drop root privileges for runtime.
+USER node
@@
 CMD ["node_modules/.bin/tsx", "watch", "src/server.ts"]

Source: Linters/SAST tools

🟡 Minor comments (6)
hosting/docker-compose/ee/docker-compose.dev.yml-397-399 (1)

397-399: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Declare the new sidecar dependency for startup ordering.

After introducing AGENTA_AGENT_PI_URL, services should explicitly depend on agent-pi to reduce transient boot-time failures for agent requests.

Suggested change
     services:
@@
         networks:
             - agenta-network
+        depends_on:
+            agent-pi:
+                condition: service_started
docs/design/agent-workflows/README.md-14-16 (1)

14-16: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Broken internal link: "Open research topics" heading not found.

Line 15 references [Open research topics](#open-research-topics), but no section with that ID exists in the document. The heading "## POC work packages" (line 95) appears to be the intended target, or a new section titled "## Open research topics" should be added before the POC work packages.

As written, readers clicking the link will not navigate to the referenced section.

🔧 Proposed fix

Either update the link to point to the correct section:

-The research topics in [Open research topics](`#open-research-topics`) will be assigned to subagents and
+The research topics below will be assigned to subagents and

Or, rename the POC work packages section to match the link:

-## POC work packages
+## Open research topics (POC work packages)
docs/design/agent-workflows/wp-3-daytona-sandbox/poc/run_agent.py-183-184 (1)

183-184: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Handle missing flag values in arg() to avoid IndexError crashes.

Line 184 assumes every present flag has a value; --auth/--model without a following token crashes with a traceback.

Suggested fix
 def arg(name: str, default: str) -> str:
-    return sys.argv[sys.argv.index(name) + 1] if name in sys.argv else default
+    if name not in sys.argv:
+        return default
+    i = sys.argv.index(name) + 1
+    if i >= len(sys.argv) or sys.argv[i].startswith("--"):
+        raise SystemExit(f"missing value for {name}")
+    return sys.argv[i]
docs/design/agent-workflows/wp-1-pi-tracing/poc/README.md-15-20 (1)

15-20: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add language identifiers to fenced code blocks.

markdownlint MD040 flags both fences; add explicit languages (for example, text) to keep docs lint-clean.

Also applies to: 67-73

Source: Linters/SAST tools

docs/design/agent-workflows/wp-1-pi-tracing/tracing-in-the-agent-service.md-25-31 (1)

25-31: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Specify languages for fenced code blocks.

markdownlint MD040 is triggered here; add fence languages (for example, text and bash).

Also applies to: 100-102

Source: Linters/SAST tools

docs/design/agent-workflows/wp-1-pi-tracing/poc/run.ts-173-178 (1)

173-178: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reset runConfig.traceId before each prompt to avoid stale IDs.

The loop can push a previous trace id when a prompt iteration does not produce a fresh one.

🔧 Suggested change
   const traceIds: string[] = [];
   for (let i = 0; i < scenario.prompts.length; i++) {
+    runConfig.traceId = undefined;
     const p = scenario.prompts[i];
     console.log(`\n[run] prompt ${i + 1}/${scenario.prompts.length}: ${p}\n`);
     await session.prompt(p);
     if (runConfig.traceId) traceIds.push(runConfig.traceId);
   }
🧹 Nitpick comments (2)
docs/design/agent-workflows/wp-1-pi-tracing/integrating-the-tracing-extension.md (1)

23-28: ⚡ Quick win

Add language specifiers to three fenced code blocks.

Three code blocks are missing language identifiers, which triggers markdown linting warnings (MD040):

  • Lines 23–28 (span tree structure)
  • Lines 147–155 (dependencies)
  • Lines 161–163 (curl verification command)
Add language specifiers
-```
+```
 invoke_agent              openinference.span.kind = AGENT   (root, one per user prompt)
   turn N                  CHAIN
     chat <model>          LLM    model, latency, token usage, finish reason, messages
     execute_tool <name>   TOOL   args in, result out
-```
+```

-```
+```
 `@earendil-works/pi-coding-agent`  0.79.4
 `@opentelemetry/api`               1.9.0
 `@opentelemetry/exporter-trace-otlp-proto`  0.54.0
@@ -150,7 +152,7 @@
 `@opentelemetry/sdk-trace-node`    1.28.0
 `@opentelemetry/semantic-conventions`  1.28.0
-```
+```

-```
+```
    curl -s "${AGENTA_HOST}/api/spans/?trace_id=<id>" -H "Authorization: ApiKey ${AGENTA_API_KEY}"
-```
+```

The first block is a diagram (use a code fence without a language or add a comment prefix like text), the second is a version list (add text or plaintext), and the third is a bash command (add bash).

Also applies to: 147-155, 161-163

services/agent/scripts/register_agent_app.py (1)

44-45: ⚖️ Poor tradeoff

Schema definitions must remain in sync with services/oss/src/agent_pi/schemas.py.

The AGENT_SCHEMAS defined here (lines 47–72) are registered as the chat interface for the agent app. The comment on lines 44–45 notes they must be kept in sync with the Python agent service's schema definitions. If these diverge, the playground will render a mismatched schema, breaking the chat UI.

Consider adding a check or docstring reminder in both files to surface any drift early, or extracting the schema to a shared definition (e.g., a JSON/YAML file committed to both locations).

Also applies to: 47-72


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: c2ca2e05-b7c6-4d53-8c90-a69e487dadc4

📥 Commits

Reviewing files that changed from the base of the PR and between a97e608 and 6898615.

⛔ Files ignored due to path filters (2)
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
  • services/agent/pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (59)
  • docs/design/agent-workflows/README.md
  • docs/design/agent-workflows/research/auth-secrets.md
  • docs/design/agent-workflows/research/daytona-sandbox.md
  • docs/design/agent-workflows/research/diskless-in-memory-config.md
  • docs/design/agent-workflows/research/open-questions.md
  • docs/design/agent-workflows/research/otel-instrumentation.md
  • docs/design/agent-workflows/research/pi-interaction.md
  • docs/design/agent-workflows/research/sandbox-sharing.md
  • docs/design/agent-workflows/wp-1-pi-tracing/README.md
  • docs/design/agent-workflows/wp-1-pi-tracing/integrating-the-tracing-extension.md
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/.env.example
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/README.md
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/package.json
  • docs/design/agent-workflows/wp-1-pi-tracing/poc/run.ts
  • docs/design/agent-workflows/wp-1-pi-tracing/tracing-in-the-agent-service.md
  • docs/design/agent-workflows/wp-2-agent-service/README.md
  • docs/design/agent-workflows/wp-2-agent-service/implementation-plan.md
  • docs/design/agent-workflows/wp-3-daytona-sandbox/README.md
  • docs/design/agent-workflows/wp-3-daytona-sandbox/poc/README.md
  • docs/design/agent-workflows/wp-3-daytona-sandbox/poc/bench_coldstart.py
  • docs/design/agent-workflows/wp-3-daytona-sandbox/poc/build_snapshot.py
  • docs/design/agent-workflows/wp-3-daytona-sandbox/poc/cleanup.py
  • docs/design/agent-workflows/wp-3-daytona-sandbox/poc/run_agent.py
  • docs/design/agent-workflows/wp-4-multi-message-output/README.md
  • docs/design/agent-workflows/wp-5-chat-vs-completion/README.md
  • docs/design/agent-workflows/wp-6-workflow-type-and-template/README.md
  • docs/design/agent-workflows/wp-7-tools/README.md
  • hosting/docker-compose/ee/docker-compose.dev.yml
  • sdks/python/agenta/sdk/engines/running/interfaces.py
  • sdks/python/agenta/sdk/engines/running/utils.py
  • services/agent/.dockerignore
  • services/agent/README.md
  • services/agent/config/AGENTS.md
  • services/agent/config/agent.json
  • services/agent/docker-compose.agent.yml
  • services/agent/docker-compose.stack.yml
  • services/agent/docker/Dockerfile.dev
  • services/agent/package.json
  • services/agent/scripts/register_agent_app.py
  • services/agent/src/agenta-otel.ts
  • services/agent/src/cli.ts
  • services/agent/src/runPi.ts
  • services/agent/src/server.ts
  • services/agent/tsconfig.json
  • services/entrypoints/agent_main.py
  • services/entrypoints/main.py
  • services/oss/src/agent.py
  • services/oss/src/agent_pi/__init__.py
  • services/oss/src/agent_pi/config.py
  • services/oss/src/agent_pi/local_runtime.py
  • services/oss/src/agent_pi/pi_harness.py
  • services/oss/src/agent_pi/pi_http_harness.py
  • services/oss/src/agent_pi/ports.py
  • services/oss/src/agent_pi/schemas.py
  • web/oss/src/components/pages/app-management/components/CreateAppDropdown/index.tsx
  • web/oss/src/components/pages/app-management/modals/CreateAppTypeModal/index.tsx
  • web/oss/src/components/pages/prompts/assets/iconHelpers.tsx
  • web/packages/agenta-entities/src/workflow/state/appUtils.ts

Comment on lines +301 to +306
- Caveat: the existing map uses the older `gen_ai.usage.prompt_tokens` /
`completion_tokens` names. The Pi extensions emit the newer
`gen_ai.usage.input_tokens` / `output_tokens`. Those newer keys are **not**
in `semconv.py` yet, so token metrics from Pi would be dropped until we add
the two aliases. (Verified by reading `semconv.py` — only `prompt_tokens` /
`completion_tokens` / `total_tokens` are present.)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Token attribute mismatch: Pi extensions emit newer semconv keys not yet mapped in Agenta.

Lines 301-306 note that existing semconv.py mappings handle only gen_ai.usage.prompt_tokens|completion_tokens|total_tokens, but Pi extensions emit gen_ai.usage.input_tokens|output_tokens. Relevant code context confirms api/oss/src/apis/fastapi/otlp/opentelemetry/semconv.py lacks mappings for the newer attribute names.

Impact: when Pi extensions export spans over OTLP to Agenta's /otlp/v1/traces, token usage metrics will be silently dropped unless semconv.py is updated to include:

  • gen_ai.usage.input_tokens -> ag.metrics.unit.tokens.prompt
  • gen_ai.usage.output_tokens -> ag.metrics.unit.tokens.completion

This must be addressed before WP-1 (Pi OTel tracing extension) can be fully integrated.

Comment on lines +2 to +3
AGENTA_HOST=http://144.76.237.122:8280/
AGENTA_API_KEY=your-agenta-project-api-key

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid a real shared collector endpoint in the checked-in example config.

The default AGENTA_HOST points to a concrete external host, which can cause accidental telemetry/content leakage when someone runs the POC with defaults.

🔧 Suggested change
-AGENTA_HOST=http://144.76.237.122:8280/
+AGENTA_HOST=http://localhost:8280/
 AGENTA_API_KEY=your-agenta-project-api-key
🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 3-3: [UnorderedKey] The AGENTA_API_KEY key should go before the AGENTA_HOST key

(UnorderedKey)

Comment on lines +66 to +67
this.exporter.export(orderParentFirst(spans), () => {});
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's verify the file exists and check its content
ls -la docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts

Repository: Agenta-AI/agenta

Length of output: 173


🏁 Script executed:

# Get the file size to determine how to read it
wc -l docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts

Repository: Agenta-AI/agenta

Length of output: 127


🏁 Script executed:

# Read the specific sections mentioned in the review (lines 60-80 and 365-400)
cat -n docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts | sed -n '60,80p'

Repository: Agenta-AI/agenta

Length of output: 900


🏁 Script executed:

# Also check the forceFlush method around lines 73-75
cat -n docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts | sed -n '65,85p'

Repository: Agenta-AI/agenta

Length of output: 957


🏁 Script executed:

# Check the second location mentioned (73-75 and the forceFlush context)
cat -n docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts | sed -n '360,400p'

Repository: Agenta-AI/agenta

Length of output: 1964


🏁 Script executed:

# Check if ExportResultCode is available in the suggested import location
rg "ExportResultCode" --type ts --type tsx

Repository: Agenta-AI/agenta

Length of output: 88


🏁 Script executed:

# Check what OpenTelemetry imports are already in the file
head -30 docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts

Repository: Agenta-AI/agenta

Length of output: 1562


🏁 Script executed:

# Check more of the imports and look for ExportResultCode usage patterns
cat -n docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts | head -80

Repository: Agenta-AI/agenta

Length of output: 3999


🏁 Script executed:

# Check if ExportResultCode is used anywhere in the codebase
rg "ExportResultCode" -t ts

Repository: Agenta-AI/agenta

Length of output: 404


🏁 Script executed:

# Check OpenTelemetry packages in package.json to understand available versions
find . -name "package.json" | xargs grep -l "`@opentelemetry/core`" | head -3

Repository: Agenta-AI/agenta

Length of output: 109


Do not swallow OTLP export failures.

Both export callbacks at line 66 and lines 73–75 ignore the result, and forceFlush() resolves even when export fails. This masks telemetry loss as success.

🔧 Suggested change
+import { ExportResultCode } from "`@opentelemetry/core`";
@@
-      this.exporter.export(orderParentFirst(spans), () => {});
+      this.exporter.export(orderParentFirst(spans), (result) => {
+        if (result.code !== ExportResultCode.SUCCESS) {
+          console.warn("[agenta-otel] span export failed", result.error);
+        }
+      });
@@
-    return new Promise((resolve) =>
-      this.exporter.export(orderParentFirst(leftovers), () => resolve()),
-    );
+    return new Promise((resolve, reject) =>
+      this.exporter.export(orderParentFirst(leftovers), (result) => {
+        if (result.code === ExportResultCode.SUCCESS) resolve();
+        else reject(result.error ?? new Error("OTLP export failed"));
+      }),
+    );

Comment on lines +362 to +381
pi.on("tool_execution_start", async (event: any) => {
const parent = currentTurn?.ctx ?? agentCtx ?? context.active();
const name = event?.toolName ? `execute_tool ${event.toolName}` : "execute_tool";
const span = t.startSpan(name, undefined, parent);
span.setAttribute("openinference.span.kind", "TOOL");
span.setAttribute("gen_ai.operation.name", "execute_tool");
if (event?.toolName) span.setAttribute("gen_ai.tool.name", event.toolName);
if (event?.toolCallId) span.setAttribute("gen_ai.tool.call.id", event.toolCallId);
setInputs(span, (event?.args as Record<string, unknown>) ?? {});
if (event?.toolCallId) toolSpans.set(event.toolCallId, span);
});

pi.on("tool_execution_end", async (event: any) => {
const span = event?.toolCallId ? toolSpans.get(event.toolCallId) : undefined;
if (!span) return;
setOutput(span, toolResultText(event?.result));
if (event?.isError) span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
toolSpans.delete(event.toolCallId);
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Close in-flight tool spans on turn end as a safety net.

llmSpan has a fallback close in turn_end, but tool spans do not. If a tool_execution_end event is missed, tool spans remain open and are never exported.

🔧 Suggested change
   pi.on("turn_end", async (event: any) => {
@@
     if (currentTurn) {
       currentTurn.span.end();
       currentTurn = undefined;
     }
+    for (const span of toolSpans.values()) {
+      span.setStatus({
+        code: SpanStatusCode.ERROR,
+        message: "tool_execution_end not received before turn_end",
+      });
+      span.end();
+    }
+    toolSpans.clear();
   });

Also applies to: 383-395

Comment on lines +276 to +281
await sandbox.process.get_session_command_logs_async(
session_id,
cmd_id,
collector.feed_stdout,
collector.feed_stderr,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add a timeout guard to live log streaming so teardown always progresses.

Line 276 awaits streaming logs with no upper bound; if the session stalls, the coroutine can hang forever and never reach sandbox deletion.

Suggested fix
+            stream_timeout_s = int(os.environ.get("PI_STREAM_TIMEOUT_S", "900"))
             await sandbox.process.get_session_command_logs_async(
-                session_id,
-                cmd_id,
-                collector.feed_stdout,
-                collector.feed_stderr,
-            )
+                session_id,
+                cmd_id,
+                collector.feed_stdout,
+                collector.feed_stderr,
+            )
+            try:
+                await asyncio.wait_for(
+                    sandbox.process.get_session_command_logs_async(
+                        session_id,
+                        cmd_id,
+                        collector.feed_stdout,
+                        collector.feed_stderr,
+                    ),
+                    timeout=stream_timeout_s,
+                )
+            except asyncio.TimeoutError:
+                collector.error = f"stream timeout after {stream_timeout_s}s"
+                log(f"\n  [error] {collector.error}")
+                raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await sandbox.process.get_session_command_logs_async(
session_id,
cmd_id,
collector.feed_stdout,
collector.feed_stderr,
)
stream_timeout_s = int(os.environ.get("PI_STREAM_TIMEOUT_S", "900"))
try:
await asyncio.wait_for(
sandbox.process.get_session_command_logs_async(
session_id,
cmd_id,
collector.feed_stdout,
collector.feed_stderr,
),
timeout=stream_timeout_s,
)
except asyncio.TimeoutError:
collector.error = f"stream timeout after {stream_timeout_s}s"
log(f"\n [error] {collector.error}")
raise

Comment on lines +28 to +34
async function readBody(req: IncomingMessage): Promise<string> {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(chunk as Buffer);
}
return Buffer.concat(chunks).toString("utf8");
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Cap /run request body size to prevent memory exhaustion.

readBody() buffers the full payload without a limit, so a large request can exhaust memory.

Suggested fix
-async function readBody(req: IncomingMessage): Promise<string> {
+async function readBody(req: IncomingMessage, maxBytes = 1_000_000): Promise<string> {
   const chunks: Buffer[] = [];
+  let total = 0;
   for await (const chunk of req) {
-    chunks.push(chunk as Buffer);
+    const buf = chunk as Buffer;
+    total += buf.length;
+    if (total > maxBytes) {
+      throw new Error("Request body too large");
+    }
+    chunks.push(buf);
   }
   return Buffer.concat(chunks).toString("utf8");
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function readBody(req: IncomingMessage): Promise<string> {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(chunk as Buffer);
}
return Buffer.concat(chunks).toString("utf8");
}
async function readBody(req: IncomingMessage, maxBytes = 1_000_000): Promise<string> {
const chunks: Buffer[] = [];
let total = 0;
for await (const chunk of req) {
const buf = chunk as Buffer;
total += buf.length;
if (total > maxBytes) {
throw new Error("Request body too large");
}
chunks.push(buf);
}
return Buffer.concat(chunks).toString("utf8");
}

Comment on lines +56 to +58
} catch (err) {
const message = err instanceof Error ? err.stack ?? err.message : String(err);
return send(res, 500, { ok: false, error: message });

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not return stack traces to clients.

The 500 response currently includes err.stack, which exposes internal paths/implementation details over the API.

Suggested fix
   } catch (err) {
-    const message = err instanceof Error ? err.stack ?? err.message : String(err);
-    return send(res, 500, { ok: false, error: message });
+    const message = err instanceof Error ? err.stack ?? err.message : String(err);
+    process.stderr.write(`[pi-wrapper] server error: ${message}\n`);
+    return send(res, 500, { ok: false, error: "Internal server error" });
   }
 });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (err) {
const message = err instanceof Error ? err.stack ?? err.message : String(err);
return send(res, 500, { ok: false, error: message });
} catch (err) {
const message = err instanceof Error ? err.stack ?? err.message : String(err);
process.stderr.write(`[pi-wrapper] server error: ${message}\n`);
return send(res, 500, { ok: false, error: "Internal server error" });
}

Source: Linters/SAST tools

Comment on lines +63 to +66
if meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
model = meta.get("model") or DEFAULT_MODEL
tools = meta.get("tools", []) or []

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle invalid agent.json defensively to prevent request-time failures.

At Line 64, malformed JSON (or non-object JSON) will raise and break every call path that loads config; Line 65/Line 66 also assume the parsed shape matches dict/List[str]. Validate parse + types before constructing AgentConfig.

Suggested patch
 def load_config() -> AgentConfig:
@@
     model: str = DEFAULT_MODEL
     tools: List[str] = []
     meta_path = base / "agent.json"
     if meta_path.exists():
-        meta = json.loads(meta_path.read_text(encoding="utf-8"))
-        model = meta.get("model") or DEFAULT_MODEL
-        tools = meta.get("tools", []) or []
+        try:
+            parsed = json.loads(meta_path.read_text(encoding="utf-8"))
+        except json.JSONDecodeError:
+            parsed = {}
+
+        meta = parsed if isinstance(parsed, dict) else {}
+        raw_model = meta.get("model")
+        if isinstance(raw_model, str) and raw_model:
+            model = raw_model
+
+        raw_tools = meta.get("tools", [])
+        if isinstance(raw_tools, list):
+            tools = [tool for tool in raw_tools if isinstance(tool, str)]
 
     return AgentConfig(agents_md=agents_md, model=model, tools=tools)

Comment on lines +34 to +53
proc = await asyncio.create_subprocess_exec(
*command,
cwd=cwd,
env=env,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=input_bytes),
timeout=timeout,
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise RuntimeError(
f"Harness process timed out after {timeout}s: {' '.join(command)}"
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, examine the actual file to see the current implementation
cat -n services/oss/src/agent_pi/local_runtime.py | head -80

Repository: Agenta-AI/agenta

Length of output: 2187


🏁 Script executed:

# Check the full imports and context
head -30 services/oss/src/agent_pi/local_runtime.py

Repository: Agenta-AI/agenta

Length of output: 803


🏁 Script executed:

# Look for the exec method and timeout handling
sed -n '20,60p' services/oss/src/agent_pi/local_runtime.py

Repository: Agenta-AI/agenta

Length of output: 1251


🏁 Script executed:

# Check if ruff is available and run format/lint checks on the file
ruff check services/oss/src/agent_pi/local_runtime.py --select=E,W,F 2>&1 || true

Repository: Agenta-AI/agenta

Length of output: 90


🏁 Script executed:

# Check if there are any other subprocess/timeout handling patterns in the codebase
rg "create_subprocess_exec|proc.kill|TimeoutError" --type py -B 2 -A 2

Repository: Agenta-AI/agenta

Length of output: 5625


🏁 Script executed:

# Check if there are any Windows/platform compatibility checks in the codebase
rg "start_new_session|killpg|signal.SIGKILL|platform|sys.platform" --type py services/oss/src/agent_pi/ -A 2 -B 2

Repository: Agenta-AI/agenta

Length of output: 42


🏁 Script executed:

# Check if this file needs to support Windows or is Unix-only
rg "Windows|win32|platform|os.name" --type py services/oss/src/agent_pi/ -A 2

Repository: Agenta-AI/agenta

Length of output: 42


🌐 Web query:

asyncio subprocess start_new_session Windows compatibility cross-platform

💡 Result:

The start_new_session parameter is not supported on Windows, as it is a concept specific to POSIX systems (Linux, macOS) for managing process sessions and groups [1][2]. Attempting to use start_new_session=True on Windows will typically raise an AttributeError or TypeError because the underlying subprocess module does not support this keyword argument on that platform [1]. To achieve process isolation or detachment on Windows in a cross-platform manner, you should use platform-specific conditional logic [1]: 1. On POSIX systems, you can use start_new_session=True [1][2]. 2. On Windows, you can achieve similar isolation by using the creationflags argument with the constant subprocess.CREATE_NEW_PROCESS_GROUP or subprocess.DETACHED_PROCESS [3][1]. Example of cross-platform implementation: import asyncio import sys import subprocess async def run_process(cmd): kwargs = {} if sys.platform == "win32": # Windows-specific flags for process isolation kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP else: # POSIX-specific flag kwargs['start_new_session'] = True process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, kwargs) return await process.communicate Additionally, note that on Windows, asyncio subprocesses require the ProactorEventLoop, which has been the default since Python 3.8 [4][5][6]. If you are using an older version or a custom loop, ensure this event loop is active, or you may encounter NotImplementedError when attempting to spawn subprocesses [4][5].

Citations:


🌐 Web query:

Python asyncio kill subprocess process group portable cross-platform alternative to killpg

💡 Result:

There is no single built-in, cross-platform method in asyncio to kill a process group, as process group concepts are inherently platform-specific [1][2]. The standard approach to achieving this requires different strategies for POSIX (Linux/macOS) and Windows [3][2][4]. For cross-platform compatibility, you must conditionally implement group-based signaling depending on the operating system [4]. POSIX (Linux/macOS) On POSIX systems, you can terminate a process and its children by using process groups [1][2]. 1. Start the subprocess with start_new_session=True to make the process a group leader [2][5]. 2. Use os.killpg(pgid, signal) to send a signal to the entire process group, where pgid is the process ID of the group leader (the subprocess PID) [1][2]. Example: import os import signal import asyncio async def run_and_kill_group: # Start as session leader proc = await asyncio.create_subprocess_shell("your_command", start_new_session=True) # Terminate the group os.killpg(os.getpgid(proc.pid), signal.SIGTERM) await proc.wait Windows On Windows, process groups are managed via creation flags rather than POSIX signals [3][4]. 1. When starting the subprocess, include creationflags=subprocess.CREATE_NEW_PROCESS_GROUP [4][6]. 2. To terminate, you can send a control event to the process group, such as signal.CTRL_BREAK_EVENT, or use standard termination methods if they have been configured to affect the group [3][4]. Example: import subprocess import asyncio import signal async def run_and_kill_windows: # Use creationflags for group support proc = await asyncio.create_subprocess_shell( "your_command", creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) # Send CTRL_BREAK_EVENT to the process group proc.send_signal(signal.CTRL_BREAK_EVENT) await proc.wait Key Considerations - Robustness: Always verify if the process is still running before sending signals to avoid ProcessLookupError, as asyncio may reap processes asynchronously [7][8]. - Portability Wrapper: Create a wrapper class that detects sys.platform and chooses between os.killpg and proc.send_signal(signal.CTRL_BREAK_EVENT) [4]. - Alternatives: If absolute cross-platform consistency is required without manual platform checks, third-party libraries like psutil are often used to recursively traverse and terminate process trees, although they are not part of the standard library [1].

Citations:


Timeout cleanup only kills the parent process, not the whole subprocess tree.

At Line 49, proc.kill() targets only the wrapper PID; if it spawns children (e.g., Node spawning other processes), they can outlive the request and accumulate over time, causing resource leaks.

The suggested approach using start_new_session=True and os.killpg() is POSIX-only and will raise errors on Windows. A cross-platform fix requires conditional logic:

  • POSIX (Linux/macOS): start_new_session=True + os.killpg(proc.pid, signal.SIGTERM)
  • Windows: creationflags=subprocess.CREATE_NEW_PROCESS_GROUP + proc.send_signal(signal.CTRL_BREAK_EVENT)

This should be wrapped in a platform check using sys.platform to ensure compatibility across deployment environments.

Comment on lines +55 to +79
result = await self._runtime.exec(
self._command,
payload,
cwd=self._wrapper_dir,
env={**os.environ},
timeout=self._timeout,
)

if not result.stdout.strip():
raise RuntimeError(
"Pi wrapper returned no output. "
f"exit={result.code} stderr={result.stderr[-2000:]}"
)

try:
data = json.loads(result.stdout)
except json.JSONDecodeError as exc:
raise RuntimeError(
"Pi wrapper returned invalid JSON. "
f"stdout={result.stdout[:500]} stderr={result.stderr[-1000:]}"
) from exc

if not data.get("ok"):
raise RuntimeError(f"Pi run failed: {data.get('error')}")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Non-zero wrapper exit codes can be treated as successful runs.

At Line 55 onward, success is decided only from parsed JSON. Add an explicit result.code != 0 failure gate so crashed/erroring wrapper processes cannot return false success.

Suggested fix
         result = await self._runtime.exec(
             self._command,
             payload,
             cwd=self._wrapper_dir,
             env={**os.environ},
             timeout=self._timeout,
         )
 
+        if result.code != 0:
+            raise RuntimeError(
+                "Pi wrapper exited with non-zero status. "
+                f"exit={result.code} stderr={result.stderr[-2000:]}"
+            )
+
         if not result.stdout.strip():
             raise RuntimeError(
                 "Pi wrapper returned no output. "
                 f"exit={result.code} stderr={result.stderr[-2000:]}"
             )

@mmabrouk

Copy link
Copy Markdown
Member Author

Superseded. Replacing the path-based stack with PRs sliced by functional area showing final code only, so reviewers don't comment on intermediate scaffolding that a later PR rewrites. See the new set.

@mmabrouk mmabrouk closed this Jun 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation feature size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants