Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/docs/api/appkit/Variable.agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const agents: ToPlugin<typeof AgentsPlugin, AgentsPluginConfig, string>;

Plugin factory for the agents plugin. Reads `config/agents/*.md` by default,
resolves toolkits/tools from registered plugins, exposes `appkit.agents.*`
runtime API and mounts `/invocations`.
runtime API and mounts `POST /invocations` and `POST /responses` (aliased
non-streaming invoke endpoints) plus `POST /chat` (streaming, HITL-capable).

## Example

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/api/appkit/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports.

| Variable | Description |
| ------ | ------ |
| [agents](Variable.agents.md) | Plugin factory for the agents plugin. Reads `config/agents/*.md` by default, resolves toolkits/tools from registered plugins, exposes `appkit.agents.*` runtime API and mounts `/invocations`. |
| [agents](Variable.agents.md) | Plugin factory for the agents plugin. Reads `config/agents/*.md` by default, resolves toolkits/tools from registered plugins, exposes `appkit.agents.*` runtime API and mounts `POST /invocations` and `POST /responses` (aliased non-streaming invoke endpoints) plus `POST /chat` (streaming, HITL-capable). |
| [READ\_ACTIONS](Variable.READ_ACTIONS.md) | Actions that only read data. |
| [sql](Variable.sql.md) | SQL helper namespace |
| [WRITE\_ACTIONS](Variable.WRITE_ACTIONS.md) | Actions that mutate data. |
Expand Down
12 changes: 8 additions & 4 deletions docs/docs/plugins/agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This plugin is currently **beta**. APIs may change between minor releases. Impor
:::
<!-- AUTO-GENERATED: stability-banner-end -->

The `agents` plugin turns a Databricks AppKit app into an AI-agent host. It loads agent definitions from markdown on disk (one folder per agent: `config/agents/<id>/agent.md`), from TypeScript (`createAgent(def)`), or both, and exposes them at `POST /invocations` alongside routes for chat, thread management, and cancellation.
The `agents` plugin turns a Databricks AppKit app into an AI-agent host. It loads agent definitions from markdown on disk (one folder per agent: `config/agents/<id>/agent.md`), from TypeScript (`createAgent(def)`), or both, and exposes them at `POST /invocations` and `POST /responses` (non-streaming, aliases) alongside `POST /chat` (streaming) and routes for thread management, cancellation, and HITL approval.

This page covers the full lifecycle. For the hand-written primitives (`tool()`, `mcpServer()`), see [tools](./server.md).

Expand All @@ -31,7 +31,7 @@ await createApp({
});
```

That alone gives you a live HTTP server with `POST /invocations` wired to a markdown-driven agent.
That alone gives you a live HTTP server with `POST /invocations` (and its alias `POST /responses`) wired to a markdown-driven agent. Use `POST /chat` instead when you want the streaming, HITL-capable surface.

## Level 1: drop a markdown agent package

Expand Down Expand Up @@ -65,7 +65,11 @@ On startup the plugin:

The agent starts with **no tools**. Tools are opt-in — declare them in frontmatter (Level 2 below) or opt into auto-inherit explicitly with `agents({ autoInheritTools: { file: true } })`. See "Auto-inherit posture" further down for what that costs and why it's off by default.

Requests land at `POST /invocations` with an OpenAI Responses-compatible body. Every tool call runs through `asUser(req)` so SQL executes as the requesting user, file access respects Unity Catalog ACLs, and telemetry spans are created automatically.
Requests land at `POST /invocations` (or its alias `POST /responses`) with an OpenAI Responses-compatible body. These endpoints run the agent to completion and return a single JSON response — no SSE. Streaming clients should use `POST /chat`. Every tool call runs through `asUser(req)` so SQL executes as the requesting user, file access respects Unity Catalog ACLs, and telemetry spans are created automatically.

:::warning No HITL on `/invocations` and `/responses`
The non-streaming invoke surface has no way to surface a mid-call approval prompt back to the caller. When `approval.requireForDestructive` is enabled (default) and the resolved agent has any tool annotated with a mutating effect (`effect: "write" | "update" | "destructive"`, or the legacy `destructive: true`), `POST /invocations` and `POST /responses` reject the request with HTTP 400 before the adapter runs. Move HITL-capable agents to `POST /chat`, or disable approval via `agents({ approval: { requireForDestructive: false } })` for autonomous back-office agents.
:::

## Level 2: scope tools in frontmatter

Expand Down Expand Up @@ -370,7 +374,7 @@ The route enforces that the decider is the stream owner: an approve from a diffe

The plugin enforces a handful of caps to protect a single-instance deployment from runaway prompts, misbehaving clients, or prompt-injected delegation cycles. Some are static (enforced by the request schema) and some are configurable via `agents({ limits: { ... } })`.

**Static caps** (applied at `POST /chat` and `POST /invocations` request parsing):
**Static caps** (applied at `POST /chat`, `POST /invocations`, and `POST /responses` request parsing):

| Field | Cap | Why |
|---|---|---|
Expand Down
238 changes: 223 additions & 15 deletions packages/appkit/src/plugins/agents/agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
IAppRouter,
Message,
PluginPhase,
ResponseOutputMessage,
ResponseStreamEvent,
Thread,
ToolAnnotations,
Expand Down Expand Up @@ -275,7 +276,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider {
const { agents, defaultAgentName } = await this.buildAgentRegistry();
this.agents = agents;
this.defaultAgentName = defaultAgentName;
this.mountInvocationsRoute();
this.mountInvokeRoutes();
this.printRegistry();
}

Expand Down Expand Up @@ -762,15 +763,19 @@ export class AgentsPlugin extends Plugin implements ToolProvider {

// ----------------- Route mounting and handlers ---------------------------

private mountInvocationsRoute() {
/**
* Mount the non-streaming invoke endpoints outside the `/api/<plugin>`
* namespace. `/invocations` and `/responses` are aliases — both run the
* default agent to completion and return a single JSON response. Streaming
* lives on `POST /chat` (mounted in `injectRoutes`).
*/
private mountInvokeRoutes() {
if (!this.context) return;
this.context.addRoute(
"post",
"/invocations",
(req: express.Request, res: express.Response) => {
this._handleInvocations(req, res);
},
);
const handler = (req: express.Request, res: express.Response) => {
this._handleInvoke(req, res);
};
this.context.addRoute("post", "/invocations", handler);
this.context.addRoute("post", "/responses", handler);
}

injectRoutes(router: IAppRouter) {
Expand Down Expand Up @@ -896,10 +901,41 @@ export class AgentsPlugin extends Plugin implements ToolProvider {
return this._streamAgent(req, res, registered, thread, userId);
}

private async _handleInvocations(
req: express.Request,
res: express.Response,
) {
/**
* Returns the names of tools in `registered.toolIndex` whose annotations
* would trip the approval gate. Used by the non-streaming invoke path
* (`/invocations`, `/responses`) to fail-fast before the adapter runs:
* those endpoints have no channel back to the user mid-call, so an agent
* whose tool surface includes approval-gated tools cannot be served.
*
* Returns an empty list when the plugin is configured with
* `approval.requireForDestructive: false` — operators who explicitly
* disabled HITL keep the invoke surface unrestricted.
*/
private collectApprovalRequiredToolNames(
registered: RegisteredAgent,
): string[] {
if (!this.resolvedApprovalPolicy.requireForDestructive) return [];
const names: string[] = [];
for (const entry of registered.toolIndex.values()) {
if (requiresApproval(entry.def.annotations)) {
names.push(entry.def.name);
}
}
return names;
}

/**
* Shared handler for `POST /invocations` and `POST /responses`. Runs the
* default agent to completion and returns a single JSON response in the
* OpenAI Responses non-streaming shape. The two endpoints are aliases —
* streaming clients must use `POST /chat`.
*
* Rejects with HTTP 400 when the resolved agent has any approval-gated
* tool in scope: HITL requires a live SSE channel, which this surface
* does not provide. See {@link collectApprovalRequiredToolNames}.
*/
private async _handleInvoke(req: express.Request, res: express.Response) {
const parsed = invocationsRequestSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
Expand All @@ -914,6 +950,24 @@ export class AgentsPlugin extends Plugin implements ToolProvider {
res.status(400).json({ error: "No agent registered" });
return;
}

// Pre-flight HITL gate. The non-streaming invoke surface has no way to
// surface an approval prompt back to the caller and no way to receive
// a decision mid-run, so we reject up-front instead of having the
// approval gate auto-deny mid-stream (which would leave the caller
// with a confusing "denied by user" tool result in the final text).
const approvalGated = this.collectApprovalRequiredToolNames(registered);
if (approvalGated.length > 0) {
res.status(400).json({
error:
`Agent '${registered.name}' exposes ${approvalGated.length} approval-gated tool(s) ` +
`(${approvalGated.join(", ")}); /invocations and /responses are non-streaming and ` +
"cannot run HITL. Use POST /chat for HITL-capable agents, or disable approval via " +
"agents({ approval: { requireForDestructive: false } }).",
});
return;
}

const userId = this.resolveUserId(req);

// Match the rate-limit gate on /chat. Without this, a client can bypass
Expand Down Expand Up @@ -962,7 +1016,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider {
return;
}

return this._streamAgent(req, res, registered, thread, userId);
return this._runAgentNonStreaming(req, res, registered, thread, userId);
}

private async _streamAgent(
Expand Down Expand Up @@ -1123,6 +1177,159 @@ export class AgentsPlugin extends Plugin implements ToolProvider {
);
}

/**
* Non-streaming counterpart to {@link _streamAgent} used by `/invocations`
* and `/responses`. Drives the adapter to completion, persists the
* assistant turn to the thread store, and returns a single JSON envelope
* shaped like the OpenAI Responses non-streaming API.
*
* No `EventChannel`, no `AgentEventTranslator`, no SSE — the caller is
* waiting on one HTTP response. The approval gate is force-disabled in
* the per-run state as defense-in-depth: `_handleInvoke` already rejects
* up-front if any tool in scope would require approval, but pinning
* `requireForDestructive: false` here means a tool that somehow slips
* past the precheck (e.g. annotations mutated at runtime) still won't
* stall the request waiting for an approval prompt that no one can
* answer.
*
* The `RunState` shape is otherwise unchanged so {@link dispatchToolCall}
* — including sub-agent recursion via {@link runSubAgent} — keeps the
* same tool-call budget, abort signal, and timeout enforcement as the
* streaming path. A still-typed translator is constructed but only
* consulted for `finalize()` so any in-flight `approval_pending` event
* synthesis (which would have been a coding bug given the precheck) is
* a dropped no-op instead of a runtime crash.
*/
private async _runAgentNonStreaming(
req: express.Request,
res: express.Response,
registered: RegisteredAgent,
thread: Thread,
userId: string,
): Promise<void> {
const abortController = new AbortController();
const signal = abortController.signal;
const requestId = randomUUID();
this.trackStream(requestId, userId, abortController);

const tools = Array.from(registered.toolIndex.values()).map((e) => e.def);
const limits = this.resolvedLimits;

const runState: RunState = {
req,
userId,
requestId,
abortController,
signal,
// Force approval off for the non-streaming invoke surface. The
// precheck in `_handleInvoke` already guarantees no approval-gated
// tool is reachable; this is belt-and-braces.
approvalPolicy: { requireForDestructive: false, timeoutMs: 0 },
limits,
translator: new AgentEventTranslator(),
outboundEvents: new EventChannel<ResponseStreamEvent>(),
toolCallsUsed: { count: 0 },
};

const executeTool = (name: string, args: unknown): Promise<unknown> =>
this.dispatchToolCall(runState, registered.toolIndex, name, args, 0);

let fullContent = "";
try {
const pluginNames = this.context
? this.context
.getPluginNames()
.filter((n) => n !== this.name && n !== "server")
: [];
const fullPrompt = composePromptForAgent(
registered,
this.config.baseSystemPrompt,
{
agentName: registered.name,
pluginNames,
toolNames: tools.map((t) => t.name),
},
);

const messagesWithSystem: Message[] = [
{
id: "system",
role: "system",
content: fullPrompt,
createdAt: new Date(),
},
...thread.messages,
];

const stream = registered.adapter.run(
{
messages: messagesWithSystem,
tools,
threadId: thread.id,
signal,
},
{ executeTool, signal },
);

fullContent = await consumeAdapterStream(stream, { signal });

if (fullContent) {
await this.threadStore.addMessage(thread.id, userId, {
id: randomUUID(),
role: "assistant",
content: fullContent,
createdAt: new Date(),
});
}
} catch (error) {
if (signal.aborted) {
res.status(499).json({ error: "Request aborted" });
return;
}
logger.error("Agent invoke error: %O", error);
const message =
process.env.NODE_ENV === "production"
? "Internal server error"
: error instanceof Error
? error.message
: String(error);
res.status(500).json({ error: message });
return;
} finally {
this.approvalGate.abortStream(requestId);
this.untrackStream(requestId);
if (registered.ephemeral) {
try {
await this.threadStore.delete(thread.id, userId);
} catch (err) {
logger.warn(
"Failed to delete ephemeral thread %s: %O",
thread.id,
err,
);
}
}
}

const responseId = `resp_${randomUUID()}`;
const messageId = `msg_${randomUUID()}`;
const message: ResponseOutputMessage = {
type: "message",
id: messageId,
status: "completed",
role: "assistant",
content: [{ type: "output_text", text: fullContent }],
};
res.json({
id: responseId,
object: "response",
created_at: Math.floor(Date.now() / 1000),
status: "completed",
thread_id: thread.id,
output: [message],
});
}

/**
* Dispatch a single tool call from either the top-level adapter or a
* sub-agent. Centralising this in one method is what makes the budget
Expand Down Expand Up @@ -1528,7 +1735,8 @@ function composePromptForAgent(
/**
* Plugin factory for the agents plugin. Reads `config/agents/*.md` by default,
* resolves toolkits/tools from registered plugins, exposes `appkit.agents.*`
* runtime API and mounts `/invocations`.
* runtime API and mounts `POST /invocations` and `POST /responses` (aliased
* non-streaming invoke endpoints) plus `POST /chat` (streaming, HITL-capable).
*
* @example
* ```ts
Expand Down
1 change: 0 additions & 1 deletion packages/appkit/src/plugins/agents/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ export const invocationsRequestSchema = z.object({
`input array exceeds the ${MAX_INVOCATIONS_INPUT_ITEMS}-item limit`,
),
]),
stream: z.boolean().optional().default(true),
model: z.string().optional(),
});

Expand Down
4 changes: 2 additions & 2 deletions packages/appkit/src/plugins/agents/tests/dos-limits.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ describe("POST /chat — per-user concurrent-stream limit", () => {
const { res, setHeader, json } = mockRes();
await (
plugin as unknown as {
_handleInvocations: (
_handleInvoke: (
r: express.Request,
w: express.Response,
) => Promise<void>;
}
)._handleInvocations(mockReq({ input: "hi" }, "alice"), res);
)._handleInvoke(mockReq({ input: "hi" }, "alice"), res);

expect(res.status).toHaveBeenCalledWith(429);
expect(setHeader).toHaveBeenCalledWith("Retry-After", "5");
Expand Down
Loading
Loading