diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..62cf20f145e 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -79,7 +79,7 @@ type ReActStepCallback = (step: ReActStep) => void; * workflow being edited (`WorkflowState`), cached operator execution results * (`WorkflowResultState`), and the tool surface exposed to the LLM. Each call * to `sendMessage` drives one multi-step generation via the Vercel AI SDK, - * streaming step updates to subscribed websockets. + * streaming step updates to subscribed clients. */ export class TexeraAgent { readonly agentId: string; @@ -95,7 +95,7 @@ export class TexeraAgent { private stepCounter = 0; private workflowResultState: WorkflowResultState; - private websockets: Set = new Set(); + private clients: Set = new Set(); private model: LanguageModel; private systemPrompt: string; @@ -266,16 +266,16 @@ export class TexeraAgent { return this.workflowResultState; } - getWebsockets(): Set { - return this.websockets; + getClients(): Set { + return this.clients; } - addWebsocket(ws: any): void { - this.websockets.add(ws); + addClient(ws: any): void { + this.clients.add(ws); } - removeWebsocket(ws: any): void { - this.websockets.delete(ws); + removeClient(ws: any): void { + this.clients.delete(ws); } getReActSteps(): ReActStep[] { @@ -831,7 +831,7 @@ export class TexeraAgent { this.workflowState.destroy(); - this.websockets.clear(); + this.clients.clear(); this.reActStepsByMessageId.clear(); this.stepsById.clear(); diff --git a/agent-service/src/agent/tools/result-formatting.test.ts b/agent-service/src/agent/tools/result-formatting.spec.ts similarity index 100% rename from agent-service/src/agent/tools/result-formatting.test.ts rename to agent-service/src/agent/tools/result-formatting.spec.ts diff --git a/agent-service/src/agent/tools/tools-utility.test.ts b/agent-service/src/agent/tools/tools-utility.spec.ts similarity index 100% rename from agent-service/src/agent/tools/tools-utility.test.ts rename to agent-service/src/agent/tools/tools-utility.spec.ts diff --git a/agent-service/src/agent/util/auto-layout.test.ts b/agent-service/src/agent/util/auto-layout.spec.ts similarity index 100% rename from agent-service/src/agent/util/auto-layout.test.ts rename to agent-service/src/agent/util/auto-layout.spec.ts diff --git a/agent-service/src/agent/workflow-result-state.test.ts b/agent-service/src/agent/workflow-result-state.spec.ts similarity index 100% rename from agent-service/src/agent/workflow-result-state.test.ts rename to agent-service/src/agent/workflow-result-state.spec.ts diff --git a/agent-service/src/agent/workflow-state.test.ts b/agent-service/src/agent/workflow-state.spec.ts similarity index 100% rename from agent-service/src/agent/workflow-state.test.ts rename to agent-service/src/agent/workflow-state.spec.ts diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.spec.ts similarity index 59% rename from agent-service/src/server.test.ts rename to agent-service/src/server.spec.ts index b8de8736bd9..2024c634138 100644 --- a/agent-service/src/server.test.ts +++ b/agent-service/src/server.spec.ts @@ -17,8 +17,9 @@ * under the License. */ -import { beforeEach, describe, expect, test } from "bun:test"; -import { buildApp, _resetAgentStoreForTests } from "./server"; +import { beforeEach, describe, expect, spyOn, test } from "bun:test"; +import { buildApp, start, _resetAgentStoreForTests, _getAgentForTests } from "./server"; +import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; const API = env.API_PREFIX; @@ -249,3 +250,157 @@ describe(`PATCH ${API}/agents/:id/settings`, () => { expect(reread.toolTimeoutSeconds).toBe(30); }); }); + +describe("agent creation edge cases", () => { + test("rejects an empty modelType", async () => { + // The body schema accepts any string, so the handler's own guard runs. + const res = await postJson(`${API}/agents`, { modelType: "" }, { Authorization: `Bearer ${TOKEN}` }); + expect(res.status).toBe(400); + expect((await readJson<{ error: string }>(res)).error).toContain("modelType"); + }); + + test("applies initial settings supplied at creation time", async () => { + const res = await createAgent({ settings: { maxSteps: 9, toolTimeoutSeconds: 12 } }); + expect(res.status).toBe(200); + const body = await readJson<{ settings: { maxSteps: number; toolTimeoutSeconds: number } }>(res); + expect(body.settings.maxSteps).toBe(9); + expect(body.settings.toolTimeoutSeconds).toBe(12); + }); + + test("creates the agent even when the workflow load fails (non-fatal)", async () => { + // retrieveWorkflow targets the (unavailable) dashboard service; the failure + // is caught and the agent is still created. + const res = await createAgent({ workflowId: 123 }); + expect(res.status).toBe(200); + }); + + test("masks the delegate token in agent info", async () => { + const id = (await readJson<{ id: string }>(await createAgent())).id; + _getAgentForTests(id)!.setDelegateConfig({ + userToken: "super-secret", + userInfo: { uid: 1, email: "tester@example.com" }, + workflowId: 5, + workflowName: "My Flow", + computingUnitId: 2, + } as any); + + const info = await readJson<{ delegate?: { userToken: string; workflowName: string } }>( + await getJson(`${API}/agents/${id}`) + ); + expect(info.delegate?.userToken).toBe("***"); + expect(info.delegate?.workflowName).toBe("My Flow"); + }); +}); + +describe("agent read routes", () => { + let id: string; + beforeEach(async () => { + id = (await readJson<{ id: string }>(await createAgent())).id; + }); + + test("GET /:id/react-steps returns steps and state", async () => { + const body = await readJson<{ steps: unknown[]; state: string }>(await getJson(`${API}/agents/${id}/react-steps`)); + expect(Array.isArray(body.steps)).toBe(true); + expect(body.state).toBe("AVAILABLE"); + }); + + test("GET /:id/system-info responds", async () => { + const res = await getJson(`${API}/agents/${id}/system-info`); + expect(res.status).toBe(200); + }); + + test("GET /:id/operator-types returns a list", async () => { + const res = await getJson(`${API}/agents/${id}/operator-types`); + expect(res.status).toBe(200); + expect(Array.isArray(await readJson(res))).toBe(true); + }); + + test("POST /:id/steps-by-operators returns steps", async () => { + const res = await postJson(`${API}/agents/${id}/steps-by-operators`, { operatorIds: [] }); + expect(res.status).toBe(200); + expect(Array.isArray((await readJson<{ steps: unknown[] }>(res)).steps)).toBe(true); + }); + + test("GET /:id/operator-results maps the visible operator results", async () => { + const agent = _getAgentForTests(id)!; + (agent as any).getWorkflowResultState = () => ({ + getAllVisible: () => + new Map([ + [ + "op-1", + { + operatorInfo: { + state: "COMPLETED", + inputTuples: 1, + outputTuples: 2, + inputPortShapes: [], + result: [{ a: 1 }], + error: undefined, + warnings: [], + consoleLogs: [], + totalRowCount: 2, + resultStatistics: {}, + }, + }, + ], + ]), + }); + + const body = await readJson<{ results: Record }>( + await getJson(`${API}/agents/${id}/operator-results`) + ); + expect(body.results["op-1"].outputTuples).toBe(2); + expect(body.results["op-1"].outputColumns).toBe(1); + }); +}); + +describe("checkout route", () => { + test("broadcasts and survives a websocket whose send throws", async () => { + const id = (await readJson<{ id: string }>(await createAgent())).id; + const agent = _getAgentForTests(id)!; + (agent as any).checkout = () => true; + (agent as any).getAllSteps = () => []; + // A failing socket must be dropped inside broadcastToAgentClients, not crash the request. + agent.addClient({ + send: () => { + throw new Error("send failed"); + }, + } as any); + + const res = await postJson(`${API}/agents/${id}/checkout`, { stepId: "step-1" }); + expect(res.status).toBe(200); + expect((await readJson<{ headId: string }>(res)).headId).toBe("step-1"); + }); + + test("returns 500 when the step cannot be found", async () => { + const id = (await readJson<{ id: string }>(await createAgent())).id; + (_getAgentForTests(id) as any).checkout = () => false; + const res = await postJson(`${API}/agents/${id}/checkout`, { stepId: "missing" }); + expect(res.status).toBe(500); + }); +}); + +describe("non-router routes", () => { + test("unknown routes fall through to the catch-all error handler", async () => { + const res = await getJson("/no-such-route"); + expect(res.status).toBe(500); + }); +}); + +describe("start()", () => { + test("boots a listening app and prints the startup banner", async () => { + const booted = await start(); + expect(typeof booted.server?.port).toBe("number"); + await booted.stop(); + }); + + test("tolerates a metadata-initialization failure", async () => { + const spy = spyOn(WorkflowSystemMetadata, "initializeGlobal").mockImplementation(async () => { + throw new Error("metadata unavailable"); + }); + const booted = await start(); + await booted.stop(); + expect(spy).toHaveBeenCalled(); + spy.mockRestore(); + }); +}); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 0da3f693797..9370e7b5c18 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -39,7 +39,16 @@ import type { AgentSettingsApi, ReActStep, } from "./types/agent"; -import { OperatorResultSerializationMode } from "./types/agent"; +import { AgentState, OperatorResultSerializationMode } from "./types/agent"; +import type { WsClientCommand, WsServerEvent } from "./types/ws"; +import { + WsServerSnapshotEvent, + WsServerStepEvent, + WsServerStatusEvent, + WsServerErrorEvent, + WsServerHeadChangeEvent, +} from "./types/ws"; +import type { OperatorResultSummary } from "./types/execution"; const agentStore = new Map(); let agentCounter = 0; @@ -319,13 +328,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) const allSteps = agent.getAllSteps(); const workflowContent = agent.getWorkflowState().getWorkflowContent(); - broadcastToAgent(id, { - type: "headChange", - headId: stepId, - steps: allSteps, - workflowContent, - operatorResults: getOperatorResultSummaries(agent), - }); + broadcastToAgentClients(id, new WsServerHeadChangeEvent(stepId, allSteps, workflowContent)); return { status: "checked out", @@ -410,41 +413,10 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -interface WsMessage { - type: "message" | "stop"; - content?: string; - messageSource?: "chat" | "feedback"; -} - -interface OperatorResultSummaryWs { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; -} - -interface WsOutgoingMessage { - type: "step" | "state" | "error" | "complete" | "init" | "headChange"; - step?: ReActStep; - state?: string; - error?: string; - steps?: ReActStep[]; - headId?: string; - operatorResults?: Record; - workflowContent?: any; -} - -function getOperatorResultSummaries(agent: TexeraAgent): Record { +function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); - const results: Record = {}; + const results: Record = {}; for (const [opId, entry] of visible) { const info = entry.operatorInfo; results[opId] = { @@ -464,17 +436,24 @@ function getOperatorResultSummaries(agent: TexeraAgent): Record { - const hasToolCalls = step.toolCalls && step.toolCalls.length > 0; - broadcastToAgent(agentId, { - type: "step", - step, - ...(hasToolCalls ? { operatorResults: getOperatorResultSummaries(agent) } : {}), - }); - }); - - broadcastToAgent(agentId, { type: "state", state: "GENERATING" }); - - try { - const result = await agent.sendMessage(msg.content, msg.messageSource); - - agent.setStepCallback(null); - - const allSteps = agent.getReActSteps(); - const lastStep = allSteps[allSteps.length - 1]; - if (lastStep && lastStep.isEnd) { - broadcastToAgent(agentId, { type: "step", step: lastStep }); + case "WsClientPromptCommand": { + if (!msg.content || typeof msg.content !== "string") { + sendEventToClient(ws, new WsServerErrorEvent("Message content is required")); + return; } - broadcastToAgent(agentId, { - type: "complete", - state: agent.getState(), - operatorResults: getOperatorResultSummaries(agent), + wsLog.info({ agentId, preview: msg.content.substring(0, 50) }, "received command"); + + agent.setStepCallback((step: ReActStep) => { + broadcastToAgentClients(agentId, new WsServerStepEvent(step)); }); - wsLog.info({ agentId, steps: result.messages.length }, "agent run complete"); - } catch (error: any) { - agent.setStepCallback(null); - broadcastToAgent(agentId, { type: "error", error: error.message }); + broadcastToAgentClients(agentId, new WsServerStatusEvent(AgentState.GENERATING)); + + try { + const result = await agent.sendMessage(msg.content, msg.messageSource); + + agent.setStepCallback(null); + + const allSteps = agent.getReActSteps(); + const lastStep = allSteps[allSteps.length - 1]; + if (lastStep && lastStep.isEnd) { + broadcastToAgentClients(agentId, new WsServerStepEvent(lastStep)); + } + + wsLog.info({ agentId, steps: result.messages.length }, "agent run complete"); + } catch (error: any) { + agent.setStepCallback(null); + broadcastToAgentClients(agentId, new WsServerErrorEvent(error.message)); + } finally { + // The run is over (success or failure) and TexeraAgent.sendMessage has + // reset the agent to its resting state (AVAILABLE) in its own finally. + // This status frame is the run-end signal (it also unsticks the client + // from GENERATING after errors). + broadcastToAgentClients(agentId, new WsServerStatusEvent(agent.getState())); + } + return; } + + default: + // Frames are parsed from untrusted JSON; reject unknown discriminators + // explicitly instead of silently no-op'ing, so client/server mismatches + // are easy to diagnose. + sendEventToClient(ws, new WsServerErrorEvent(`Unknown message type: ${(msg as { type?: unknown }).type}`)); } }, @@ -587,7 +562,7 @@ export function buildApp() { const agent = agentStore.get(agentId); if (agent) { - agent.removeWebsocket(ws); + agent.removeClient(ws); } }, }) @@ -605,6 +580,12 @@ export function _resetAgentStoreForTests(): void { agentCounter = 0; } +// Look up an agent instance by id. Used by tests to stub agent behavior (e.g. +// `sendMessage`) when exercising the WebSocket handlers. +export function _getAgentForTests(agentId: string): TexeraAgent | undefined { + return agentStore.get(agentId); +} + function printStartupMessage(app: ReturnType) { const LINE = "=".repeat(60); console.log(LINE); @@ -630,9 +611,11 @@ function printStartupMessage(app: ReturnType) { for (const route of wsRoutes) { console.log(` WS ${route.path}`); } - console.log(" Send: { type: 'message', content: '...' }"); - console.log(" Send: { type: 'stop' }"); - console.log(" Recv: { type: 'step' | 'state' | 'complete' | 'error' | 'init', ... }"); + console.log(" Send: { type: 'WsClientPromptCommand', content: '...' }"); + console.log(" Send: { type: 'WsClientStopCommand' }"); + console.log( + " Recv: { type: 'WsServerSnapshotEvent' | 'WsServerStepEvent' | 'WsServerStatusEvent' | 'WsServerErrorEvent' | 'WsServerHeadChangeEvent', ... }" + ); } console.log(""); @@ -665,6 +648,4 @@ export async function start() { // Run the server only when this file is the entry point, not when it is // imported by tests or other modules. -if (import.meta.main) { - start(); -} +if (import.meta.main) start(); diff --git a/agent-service/src/server.ws.spec.ts b/agent-service/src/server.ws.spec.ts new file mode 100644 index 00000000000..cbb1ffb8ebd --- /dev/null +++ b/agent-service/src/server.ws.spec.ts @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Exercises the /agents/:id/react WebSocket protocol end to end: the snapshot +// sent on connect, the status lifecycle frames, the stop command, the prompt +// request (with a stubbed run), and the error paths. These drive the real +// socket via app.listen + a WebSocket client, since app.handle() does not +// perform WS upgrades. + +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { buildApp, _resetAgentStoreForTests, _getAgentForTests } from "./server"; +import { env } from "./config/env"; + +const API = env.API_PREFIX; + +let app: ReturnType; +let port: number; +const openSockets: WebSocket[] = []; + +function mintTestToken(): string { + const header = Buffer.from(JSON.stringify({ alg: "HS256", typ: "JWT" })).toString("base64url"); + const payload = Buffer.from( + JSON.stringify({ + sub: "tester", + userId: 1, + email: "tester@example.com", + role: "REGULAR", + exp: Math.floor(Date.now() / 1000) + 3600, + }) + ).toString("base64url"); + return `${header}.${payload}.test-signature`; +} + +const TOKEN = mintTestToken(); + +async function createAgent(): Promise { + const res = await app.handle( + new Request(`http://localhost${API}/agents`, { + method: "POST", + headers: { "Content-Type": "application/json", Authorization: `Bearer ${TOKEN}` }, + body: JSON.stringify({ modelType: "test-model" }), + }) + ); + const body = (await res.json()) as { id: string }; + return body.id; +} + +interface Collector { + waitFor(predicate: (m: any) => boolean, timeoutMs?: number): Promise; +} + +// Attaches a message listener immediately (before `open`) so no frame — not even +// the snapshot the server sends on connect — is missed, then resolves waiters +// from a buffer. +function collect(ws: WebSocket): Collector { + const buffer: any[] = []; + const waiters: { predicate: (m: any) => boolean; resolve: (m: any) => void }[] = []; + ws.addEventListener("message", ev => { + let data: any; + try { + data = JSON.parse(ev.data as string); + } catch { + return; + } + buffer.push(data); + const i = waiters.findIndex(w => w.predicate(data)); + if (i >= 0) { + waiters[i].resolve(data); + waiters.splice(i, 1); + } + }); + return { + waitFor(predicate, timeoutMs = 2000) { + const found = buffer.find(predicate); + if (found) return Promise.resolve(found); + return new Promise((resolve, reject) => { + let timer: ReturnType; + const w = { + predicate, + resolve: (m: any) => { + clearTimeout(timer); + resolve(m); + }, + }; + waiters.push(w); + timer = setTimeout(() => { + const idx = waiters.indexOf(w); + if (idx >= 0) { + waiters.splice(idx, 1); + reject(new Error("timed out waiting for a matching WS frame")); + } + }, timeoutMs); + }); + }, + }; +} + +function connect(agentId: string): { ws: WebSocket; messages: Collector } { + const ws = new WebSocket(`ws://localhost:${port}${API}/agents/${agentId}/react`); + openSockets.push(ws); + return { ws, messages: collect(ws) }; +} + +function waitOpen(ws: WebSocket): Promise { + if (ws.readyState === WebSocket.OPEN) return Promise.resolve(); + return new Promise((resolve, reject) => { + ws.addEventListener("open", () => resolve(), { once: true }); + ws.addEventListener("error", () => reject(new Error("WS connection error")), { once: true }); + }); +} + +beforeAll(() => { + app = buildApp(); + app.listen(0); + port = app.server?.port ?? 0; +}); + +afterAll(() => { + app.stop(); +}); + +beforeEach(() => { + _resetAgentStoreForTests(); +}); + +afterEach(() => { + while (openSockets.length) { + try { + openSockets.pop()?.close(); + } catch { + // ignore + } + } +}); + +describe(`WS ${API}/agents/:id/react`, () => { + test("sends a results-free snapshot frame on connect", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + + const snapshot = await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + expect(snapshot.state).toBe("AVAILABLE"); + expect(Array.isArray(snapshot.steps)).toBe(true); + expect(typeof snapshot.headId).toBe("string"); + // Results are pulled on demand, never pushed on the snapshot. + expect("operatorResults" in snapshot).toBe(false); + }); + + test("errors and closes when connecting to an unknown agent", async () => { + const { messages } = connect("agent-does-not-exist"); + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("Agent not found"); + }); + + test("a stop command broadcasts a STOPPING status frame", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send(JSON.stringify({ type: "WsClientStopCommand" })); + + const status = await messages.waitFor(m => m.type === "WsServerStatusEvent"); + expect(status.state).toBe("STOPPING"); + }); + + test("a prompt with empty content yields an error frame", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send(JSON.stringify({ type: "WsClientPromptCommand", content: "" })); + + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("Message content is required"); + }); + + test("a malformed (non-JSON) frame yields an error frame", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send("this is not json"); + + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("Invalid message format"); + }); + + test("an unknown message type yields an error frame", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send(JSON.stringify({ type: "bogus" })); + + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("Unknown message type: bogus"); + }); + + test("a prompt run streams GENERATING -> step -> resting status (no result frames)", async () => { + const id = await createAgent(); + + // Stub the agent's run so no live LLM is needed: emit one ending step via + // the registered step callback, then return. + const agent = _getAgentForTests(id)!; + (agent as any).sendMessage = async function (this: any) { + this.stepCallback?.({ + id: "step-1", + parentId: "init", + messageId: "m1", + stepId: 1, + timestamp: 0, + role: "agent", + content: "done", + isBegin: true, + isEnd: true, + }); + return { + response: "done", + messages: [], + usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + stopped: false, + }; + }; + // The server re-broadcasts the final step (with isEnd) after the run. + (agent as any).getReActSteps = () => [ + { + id: "step-1", + parentId: "init", + messageId: "m1", + stepId: 1, + timestamp: 0, + role: "agent", + content: "done", + isBegin: true, + isEnd: true, + }, + ]; + + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send(JSON.stringify({ type: "WsClientPromptCommand", content: "hello" })); + + const generating = await messages.waitFor(m => m.type === "WsServerStatusEvent" && m.state === "GENERATING"); + expect(generating.state).toBe("GENERATING"); + + const step = await messages.waitFor(m => m.type === "WsServerStepEvent"); + expect(step.step.content).toBe("done"); + expect("operatorResults" in step).toBe(false); + + const resting = await messages.waitFor(m => m.type === "WsServerStatusEvent" && m.state === "AVAILABLE"); + expect(resting.state).toBe("AVAILABLE"); + }); + + test("a failed run emits an error frame and still returns to a resting status", async () => { + const id = await createAgent(); + + const agent = _getAgentForTests(id)!; + (agent as any).sendMessage = async function () { + throw new Error("boom"); + }; + + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + ws.send(JSON.stringify({ type: "WsClientPromptCommand", content: "hello" })); + + await messages.waitFor(m => m.type === "WsServerStatusEvent" && m.state === "GENERATING"); + + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("boom"); + + // The end-of-run status frame must still fire after a failure, so the client + // is not left stuck on GENERATING. + const resting = await messages.waitFor(m => m.type === "WsServerStatusEvent" && m.state === "AVAILABLE"); + expect(resting.state).toBe("AVAILABLE"); + }); + + test("a message for an agent that no longer exists yields an error frame", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + // Drop the agent while the socket stays open; the message handler re-looks it up. + _resetAgentStoreForTests(); + ws.send(JSON.stringify({ type: "WsClientPromptCommand", content: "hello" })); + + const err = await messages.waitFor(m => m.type === "WsServerErrorEvent"); + expect(err.error).toBe("Agent not found"); + }); + + test("runs the close handler when the client disconnects", async () => { + const id = await createAgent(); + const { ws, messages } = connect(id); + await waitOpen(ws); + await messages.waitFor(m => m.type === "WsServerSnapshotEvent"); + + const closed = new Promise(resolve => ws.addEventListener("close", () => resolve(), { once: true })); + ws.close(); + await closed; + // Let the server process the disconnect (its close handler runs here). + await new Promise(resolve => setTimeout(resolve, 50)); + + expect(ws.readyState).toBe(WebSocket.CLOSED); + }); +}); diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index f93be5c583e..d638a889d47 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -51,3 +51,22 @@ export interface SyncExecutionResult { compilationErrors?: Record; errors?: string[]; } + +/** + * Wire projection of one operator's execution result, summarized for the + * client: counts and a small record sample instead of full payloads. Returned + * by the REST route `GET /agents/:id/operator-results`. + */ +export interface OperatorResultSummary { + state: string; + inputTuples: number; + outputTuples: number; + inputPortShapes?: PortShape[]; + outputColumns?: number; + error?: string; + warnings?: string[]; + consoleLogCount?: number; + totalRowCount?: number; + sampleRecords?: Record[]; + resultStatistics?: Record; +} diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index c6d7291e51d..498f5a9c9af 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -20,3 +20,4 @@ export * from "./workflow"; export * from "./execution"; export * from "./agent"; +export * from "./ws"; diff --git a/agent-service/src/types/ws/client.ts b/agent-service/src/types/ws/client.ts new file mode 100644 index 00000000000..2983ed1d6c5 --- /dev/null +++ b/agent-service/src/types/ws/client.ts @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Client -> server WebSocket frames for this service's protocol +// (`/agents/:id/react`). Each frame is a class whose `type` discriminator +// equals its class name, so `new WsClientPromptCommand(...)` sets the wire tag +// for you. `WsClientCommand` is their discriminated union. + +/** Send a prompt to the agent to start (or continue) its ReAct loop. */ +export class WsClientPromptCommand { + readonly type = "WsClientPromptCommand"; + constructor( + readonly content: string, + readonly messageSource?: "chat" | "feedback" + ) {} +} + +/** Stop the agent's in-flight ReAct loop. Carries no payload. */ +export class WsClientStopCommand { + readonly type = "WsClientStopCommand"; +} + +/** Discriminated union of every client -> server frame. */ +export type WsClientCommand = WsClientPromptCommand | WsClientStopCommand; diff --git a/agent-service/src/types/ws/index.ts b/agent-service/src/types/ws/index.ts new file mode 100644 index 00000000000..38218f34460 --- /dev/null +++ b/agent-service/src/types/ws/index.ts @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// WebSocket frames for this service's own protocol (/agents/:id/react): +// inbound client commands and the outbound server events it pushes back. + +export * from "./client"; +export * from "./server"; diff --git a/agent-service/src/types/ws/server.ts b/agent-service/src/types/ws/server.ts new file mode 100644 index 00000000000..019d2a1717a --- /dev/null +++ b/agent-service/src/types/ws/server.ts @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Server -> client WebSocket frames for this service's protocol +// (`/agents/:id/react`). Each frame is a class whose `type` discriminator +// equals its class name, so `new WsServerStatusEvent(...)` sets the wire tag +// for you. `WsServerEvent` is their discriminated union. + +import type { AgentState, ReActStep } from "../agent"; +import type { WorkflowContent } from "../workflow"; + +/** + * Full state pushed once when a client connects: the agent's current lifecycle + * state, the complete step list, and the HEAD pointer. Operator results are not + * included — they are pulled on demand via `GET /agents/:id/operator-results`. + */ +export class WsServerSnapshotEvent { + readonly type = "WsServerSnapshotEvent"; + constructor( + readonly state: AgentState, + readonly steps: ReActStep[], + readonly headId: string + ) {} +} + +/** A single ReAct step, streamed live as the agent runs. */ +export class WsServerStepEvent { + readonly type = "WsServerStepEvent"; + constructor(readonly step: ReActStep) {} +} + +/** + * An agent lifecycle transition (e.g. GENERATING when a run starts, the resting + * state when it ends, STOPPING on stop). + */ +export class WsServerStatusEvent { + readonly type = "WsServerStatusEvent"; + constructor(readonly state: AgentState) {} +} + +/** An error surfaced to the client (agent not found, bad request, failed run). */ +export class WsServerErrorEvent { + readonly type = "WsServerErrorEvent"; + constructor(readonly error: string) {} +} + +/** + * Emitted after a checkout: HEAD moved, carrying the full step list and the + * workflow snapshot at the new head. + * + * @deprecated Unused by the frontend/UI — nothing invokes the client's + * `checkoutStep()`. The `/agents/:id/checkout` route (and its tests) still + * broadcast it, so it remains reachable; do not build new UI on it. + */ +export class WsServerHeadChangeEvent { + readonly type = "WsServerHeadChangeEvent"; + constructor( + readonly headId: string, + readonly steps: ReActStep[], + readonly workflowContent?: WorkflowContent + ) {} +} + +/** Discriminated union of every server -> client frame. */ +export type WsServerEvent = + | WsServerSnapshotEvent + | WsServerStepEvent + | WsServerStatusEvent + | WsServerErrorEvent + | WsServerHeadChangeEvent; diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.spec.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.spec.ts index 81a84081a5d..7250b8686c8 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.spec.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.spec.ts @@ -28,6 +28,7 @@ import { workflowEditorTestImports, workflowEditorTestProviders } from "./workfl import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service"; import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service"; import { JointUIService } from "../../service/joint-ui/joint-ui.service"; +import { AgentService } from "../../service/agent/agent.service"; import { NzModalModule } from "ng-zorro-antd/modal"; import { Overlay } from "@angular/cdk/overlay"; import * as joint from "jointjs"; @@ -291,6 +292,34 @@ describe("WorkflowEditorComponent", () => { expect(jointHighlighterElementAfterUnhighlight.length).toEqual(0); }); + it("pulls the active agent's operator results when an operator's chat popover opens", () => { + workflowActionService.addOperator(mockScanPredicate, mockPoint); + const jointCellView = component.paper.findViewByModel(mockScanPredicate.operatorID); + + const agentService = TestBed.inject(AgentService); + vi.spyOn(agentService, "getActivelyConnectedAgentIds").mockReturnValue(["agent-1"]); + const fetchSpy = vi.spyOn(agentService, "fetchOperatorResults").mockImplementation(() => {}); + + // The operator's chat button fires `element:chat` (cell view, DOM event, x, y); + // opening the popover should pull the active agent's results on demand. + (component.paper as any).trigger("element:chat", jointCellView, new Event("click"), 0, 0); + + expect(fetchSpy).toHaveBeenCalledWith("agent-1"); + }); + + it("does not pull operator results when no agent is connected", () => { + workflowActionService.addOperator(mockScanPredicate, mockPoint); + const jointCellView = component.paper.findViewByModel(mockScanPredicate.operatorID); + + const agentService = TestBed.inject(AgentService); + vi.spyOn(agentService, "getActivelyConnectedAgentIds").mockReturnValue([]); + const fetchSpy = vi.spyOn(agentService, "fetchOperatorResults").mockImplementation(() => {}); + + (component.paper as any).trigger("element:chat", jointCellView, new Event("click"), 0, 0); + + expect(fetchSpy).not.toHaveBeenCalled(); + }); + it("should react to operator validation and change the color of operator box if the operator is valid ", () => { workflowActionService.getJointGraphWrapper(); workflowActionService.addOperator(mockScanPredicate, mockPoint); diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index 1a9eb89e05f..230b867c11e 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -1616,6 +1616,12 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy displayName, position, }; + // Results are pulled on demand (not pushed over the socket); refresh + // the active agent's summaries so the popover shows current data. + const activeAgentId = this.agentService.getActivelyConnectedAgentIds()[0]; + if (activeAgentId) { + this.agentService.fetchOperatorResults(activeAgentId); + } } } this.changeDetectorRef.detectChanges(); diff --git a/frontend/src/app/workspace/service/agent/agent.service.spec.ts b/frontend/src/app/workspace/service/agent/agent.service.spec.ts index cacf82c40d4..1f9bcd82591 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.spec.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.spec.ts @@ -19,7 +19,7 @@ import { TestBed } from "@angular/core/testing"; import { HttpClientTestingModule, HttpTestingController } from "@angular/common/http/testing"; -import { AgentService, AgentInfo } from "./agent.service"; +import { AgentService, AgentInfo, OperatorResultSummary } from "./agent.service"; import { NotificationService } from "../../../common/service/notification/notification.service"; import { WorkflowPersistService } from "../../../common/service/workflow-persist/workflow-persist.service"; import { ComputingUnitStatusService } from "../../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; @@ -92,4 +92,61 @@ describe("AgentService", () => { req.flush(apiAgent); }); }); + + describe("fetchOperatorResults", () => { + it("pulls operator results over REST and pushes them to operatorResultSummaries$", () => { + let latest: Map | undefined; + service.operatorResultSummaries$.subscribe(m => (latest = m)); + + service.fetchOperatorResults("agent-1"); + + const req = httpMock.expectOne(r => r.method === "GET" && r.url === "/api/agents/agent-1/operator-results"); + req.flush({ + results: { + "op-1": { sampleRecords: [{ a: 1 }], resultStatistics: { a: "{}" } }, + }, + }); + + expect(latest?.has("op-1")).toBe(true); + expect(latest?.get("op-1")?.sampleRecords).toEqual([{ a: 1 }]); + }); + + it("falls back to empty results when the request fails", () => { + let latest: Map | undefined; + service.operatorResultSummaries$.subscribe(m => (latest = m)); + + service.fetchOperatorResults("agent-1"); + + httpMock + .expectOne(r => r.method === "GET" && r.url === "/api/agents/agent-1/operator-results") + .flush("boom", { status: 500, statusText: "Server Error" }); + + expect(latest?.size).toBe(0); + }); + }); + + describe("stopGeneration", () => { + it("sends a stop command over the websocket when one is open", () => { + const send = vi.fn(); + (service as any).agentStateTracking.set("agent-1", { + websocket: { readyState: WebSocket.OPEN, send }, + }); + + service.stopGeneration("agent-1"); + + expect(send).toHaveBeenCalledWith(JSON.stringify({ type: "WsClientStopCommand" })); + }); + + it("falls back to the REST stop endpoint when no websocket is open", () => { + (service as any).agentStateTracking.set("agent-1", { + websocket: { readyState: WebSocket.CLOSED, send: vi.fn() }, + }); + + service.stopGeneration("agent-1"); + + httpMock + .expectOne(r => r.method === "POST" && r.url === "/api/agents/agent-1/stop") + .flush({ status: "stopping" }); + }); + }); }); diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 462e7679ce5..76ccbd4b9b4 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -449,7 +449,7 @@ export class AgentService { */ private handleWebSocketMessage(agentId: string, tracking: AgentStateTracking, message: any): void { switch (message.type) { - case "init": + case "WsServerSnapshotEvent": // Initial state and steps if (message.state) { tracking.stateSubject.next(this.mapStateToAgentState(message.state)); @@ -471,13 +471,9 @@ export class AgentService { }; tracking.workflowSubject.next(workflow as Workflow); } - // Handle initial operator results - if (message.operatorResults) { - this.updateOperatorResultSummaries(message.operatorResults); - } break; - case "step": + case "WsServerStepEvent": // New step received - update existing step or append new one if (message.step) { const convertedStep = this.convertApiReActStep(message.step); @@ -516,25 +512,14 @@ export class AgentService { } break; - case "state": + case "WsServerStatusEvent": // State update if (message.state) { tracking.stateSubject.next(this.mapStateToAgentState(message.state)); } break; - case "complete": - // Message processing complete - if (message.state) { - tracking.stateSubject.next(this.mapStateToAgentState(message.state)); - } - // Update operator results on completion - if (message.operatorResults) { - this.updateOperatorResultSummaries(message.operatorResults); - } - break; - - case "headChange": + case "WsServerHeadChangeEvent": // HEAD moved (checkout) — update HEAD, visible steps, and workflow if (message.headId !== undefined) { tracking.headIdSubject.next(message.headId); @@ -552,13 +537,9 @@ export class AgentService { }; tracking.workflowSubject.next(workflow as Workflow); } - // Update operator results on HEAD change - if (message.operatorResults) { - this.updateOperatorResultSummaries(message.operatorResults); - } break; - case "error": + case "WsServerErrorEvent": // Error occurred console.error(`Agent ${agentId} error:`, message.error); @@ -909,7 +890,7 @@ export class AgentService { } const wsMessage = { - type: "message", + type: "WsClientPromptCommand", content: message, messageSource, }; @@ -967,7 +948,7 @@ export class AgentService { if (tracking?.websocket && tracking.websocket.readyState === WebSocket.OPEN) { // Send stop via WebSocket for immediate effect try { - tracking.websocket.send(JSON.stringify({ type: "stop" })); + tracking.websocket.send(JSON.stringify({ type: "WsClientStopCommand" })); } catch (error) { console.error("Failed to send stop command:", error); } @@ -1273,34 +1254,12 @@ export class AgentService { // Operator Result Annotation Methods // ============================================================================ - /** Whether operator result annotations are currently visible */ - private resultAnnotationsVisibleSubject = new BehaviorSubject(false); - public resultAnnotationsVisible$ = this.resultAnnotationsVisibleSubject.asObservable(); - /** Current operator result summaries (operatorId → summary) */ private operatorResultSummariesSubject = new BehaviorSubject>(new Map()); public operatorResultSummaries$ = this.operatorResultSummariesSubject.asObservable(); /** - * Toggle operator result annotations on/off. - * When toggling on, fetches the latest results from the active agent. - */ - public toggleResultAnnotations(agentId?: string): void { - const newState = !this.resultAnnotationsVisibleSubject.getValue(); - if (newState) { - const id = agentId ?? this.getActivelyConnectedAgentIds()[0]; - if (!id) { - // No active agent — nothing to fetch - return; - } - this.fetchOperatorResults(id); - } else { - this.resultAnnotationsVisibleSubject.next(false); - } - } - - /** - * Update operator result summaries from a WebSocket or API response. + * Update operator result summaries from an API response. */ private updateOperatorResultSummaries(results: Record): void { const summaries = new Map(); @@ -1311,7 +1270,10 @@ export class AgentService { } /** - * Fetch operator results from the backend (fallback if WebSocket data not available). + * Pull the agent's latest operator result summaries from the backend and push + * them to `operatorResultSummaries$`. Called on demand when the UI needs to + * show results (e.g. opening an operator's popover); results are no longer + * pushed over the WebSocket. */ public fetchOperatorResults(agentId: string): void { this.http @@ -1322,14 +1284,6 @@ export class AgentService { .pipe(catchError(() => of({ results: {} as Record }))) .subscribe(response => { this.updateOperatorResultSummaries(response.results); - this.resultAnnotationsVisibleSubject.next(true); }); } - - /** - * Get current result annotations visibility. - */ - public getResultAnnotationsVisible(): boolean { - return this.resultAnnotationsVisibleSubject.getValue(); - } }