From 464381c0052e02621cd24d1e4690b4fba1e9d4d3 Mon Sep 17 00:00:00 2001 From: Jiadong Bai <43344272+bobbai00@users.noreply.github.com> Date: Sun, 28 Jun 2026 17:29:00 -0700 Subject: [PATCH 1/5] refactor(agent-service): redesign sync-execution result and error model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this PR? Restructures the per-operator summary that the sync-execution backend returns and that the agent-service and frontend consume, giving a leaner, more consistent wire contract. This is a focused re-do of #5927 cut directly from `main` (no foundation stack): it changes only the execution result/error model and its consumers. - Replace the flat `OperatorInfo` with `OperatorExecutionSummary` (orthogonal sub-summaries: `state`, `errorMessages`, `resultSummary?`, `consoleLogsSummary?`); rename `SyncExecutionResult` → `WorkflowExecutionSummary`. - `resultSummary.sampleTuples` is now `SampleRow[]` (`{ rowIndex, tuple }`) instead of JSON rows with an embedded `__row_index__`; drop the table-shape types (the agent derives input-port shapes from the DAG). - Move `WorkflowFatalError` into `types/execution.ts` and reuse it for per-operator errors — the same type the workflow-compiling service returns for compilation errors, so compile and execution errors share one wire shape; `api/compile-api.ts` re-exports it so its existing importers are unchanged. - `errorMessages` / `errors` are non-optional (empty = none); drop `compilationErrors`; collapse the console-message types and derive warnings from console messages whose title starts with `WARNING: `. - Operator results are still pulled on demand via `GET /agents/:id/operator-results` (transport unchanged); that REST payload now carries the canonical `OperatorExecutionSummary`, and the frontend maps it to its flat display type (re-flattening `sampleTuples` so the display components are unchanged). 
Touches the Scala producer (`SyncExecutionResource`), the agent-service consumers (`result-formatting`, `workflow-execution-tools`, `workflow-result-state`, `server`), and the frontend mapping. Representation/type-level; behavior preserved (input-port shape lines are now derived rather than explicitly rendered). ### Any related issues, documentation, discussions? Closes #5750 Part of #5747. Supersedes #5927. ### How was this PR tested? - agent-service: `tsc --noEmit` clean, `bun test` 110/110 pass, `prettier --check` clean. - The Scala producer (`SyncExecutionResource`) is unchanged from #5927, which verified it via `sbt WorkflowExecutionService/compile` and a full-stack end-to-end run (a Claude Haiku 4.5 agent built and executed a CSV workflow; `/operator-results` returned the new shape — `resultSummary.sampleTuples: [{ rowIndex, tuple }]`, `errorMessages: []`). ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.8 (1M context) --- agent-service/src/agent/texera-agent.ts | 2 +- .../src/agent/tools/result-formatting.spec.ts | 197 ++++---------- .../src/agent/tools/result-formatting.ts | 93 ++----- .../src/agent/tools/tools-utility.ts | 8 + .../agent/tools/workflow-execution-tools.ts | 121 +++------ .../src/agent/workflow-result-state.spec.ts | 21 +- .../src/agent/workflow-result-state.ts | 8 +- agent-service/src/api/compile-api.ts | 11 +- agent-service/src/server.spec.ts | 29 +- agent-service/src/server.ts | 22 +- agent-service/src/types/execution.ts | 119 ++++++--- .../web/resource/SyncExecutionResource.scala | 247 +++++++----------- .../workspace/service/agent/agent.service.ts | 42 ++- 13 files changed, 382 insertions(+), 538 deletions(-) diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index ccd0545919a..8a2eaec273c 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -718,7 +718,7 @@ export class TexeraAgent { const result 
= new Map(); const visible = this.workflowResultState.getAllVisible(); for (const [operatorId, entry] of visible) { - result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo, this.workflowState)); + result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo)); } return result; } diff --git a/agent-service/src/agent/tools/result-formatting.spec.ts b/agent-service/src/agent/tools/result-formatting.spec.ts index e6d1afdf2e3..1d518b43e9f 100644 --- a/agent-service/src/agent/tools/result-formatting.spec.ts +++ b/agent-service/src/agent/tools/result-formatting.spec.ts @@ -19,89 +19,89 @@ import { describe, expect, test } from "bun:test"; import { formatOperatorResult } from "./result-formatting"; -import { WorkflowState } from "../workflow-state"; -import type { OperatorInfo } from "../../types/execution"; -import type { OperatorPredicate, OperatorLink, PortDescription } from "../../types/workflow"; - -function makeOpInfo(overrides: Partial = {}): OperatorInfo { - return { - state: "completed", - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - ...overrides, - }; +import type { OperatorExecutionSummary, OperatorState, SampleRow } from "../../types/execution"; + +// Convert flat test rows (with an optional embedded __row_index__) into the +// structured SampleRow[] the summary now carries. +function toSampleRows(rows: Record[]): SampleRow[] { + return rows.map((row, i) => { + const { __row_index__, ...tuple } = row; + return { rowIndex: typeof __row_index__ === "number" ? 
__row_index__ : i, tuple }; + }); } -function makeOperator(id: string, inputPortIDs: string[] = []): OperatorPredicate { - const inputPorts: PortDescription[] = inputPortIDs.map((portID, i) => ({ - portID, - displayName: `Input ${i}`, - })); - return { - operatorID: id, - operatorType: "TestOp", - operatorVersion: "1.0", - operatorProperties: {}, - inputPorts, - outputPorts: [{ portID: "output-0", displayName: "Output 0" }], - showAdvanced: false, - }; +// Test convenience: accept the (old) flat fields and assemble the structured +// OperatorExecutionSummary, so the cases below stay terse. +interface OpInfoOverrides { + state?: OperatorState; + error?: string; + outputTuples?: number; + totalRowCount?: number; + warnings?: string[]; + result?: Record[]; } -function makeLink(linkID: string, source: [string, string], target: [string, string]): OperatorLink { - return { - linkID, - source: { operatorID: source[0], portID: source[1] }, - target: { operatorID: target[0], portID: target[1] }, +function makeOpInfo(overrides: OpInfoOverrides = {}): OperatorExecutionSummary { + const summary: OperatorExecutionSummary = { + state: overrides.state ?? "Completed", + errorMessages: overrides.error ? [{ type: "EXECUTION_FAILURE", message: overrides.error }] : [], }; + // The result summary is present only when the operator produced a result. + if (overrides.result !== undefined) { + summary.resultSummary = { + resultMode: "table", + // Non-arrays are passed through to exercise the "(no result data)" guard. + sampleTuples: Array.isArray(overrides.result) ? toSampleRows(overrides.result) : (overrides.result as any), + totalRowCount: overrides.totalRowCount ?? overrides.outputTuples ?? 0, + }; + } + if (overrides.warnings) { + // Warnings are derived from console messages whose title is "WARNING: ...". 
+ summary.consoleLogsSummary = { + messages: overrides.warnings.map(w => ({ msgType: "PRINT", title: w, message: "" })), + }; + } + return summary; } -const EMPTY_STATE = new WorkflowState(); - describe("formatOperatorResult - early returns", () => { test("returns [ERROR] prefix when error field is set", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" })); expect(out).toBe("[ERROR] boom"); }); test("treats empty-string error as falsy and continues to result path", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "" })); expect(out).not.toContain("[ERROR]"); expect(out).toContain("(no result data)"); }); test("returns (no result data) when result is undefined", () => { - const out = formatOperatorResult("op1", makeOpInfo(), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo()); expect(out).toBe("(no result data)"); }); test("returns (no result data) when result is not an array", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ result: { rows: [] } as unknown as Record[] }), - EMPTY_STATE - ); + const out = formatOperatorResult("op1", makeOpInfo({ result: { rows: [] } as unknown as Record[] })); expect(out).toBe("(no result data)"); }); test("empty array result emits brief summary plus zero-column shape only", () => { - const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 })); expect(out.split("\n")).toEqual(["Executed operator op1", "Output table shape: (0, 0)"]); }); }); describe("formatOperatorResult - table shape and metadata", () => { test("uses outputTuples for row count when totalRowCount missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, 
result: [{ a: 1, b: 2 }] }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] })); expect(out).toContain("Output table shape: (7, 2)"); }); test("totalRowCount overrides outputTuples in output shape", () => { const out = formatOperatorResult( "op1", - makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] }), - EMPTY_STATE + makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] }) ); expect(out).toContain("Output table shape: (999, 2)"); }); @@ -112,8 +112,7 @@ describe("formatOperatorResult - table shape and metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: true, "html-content": "" }], - }), - EMPTY_STATE + }) ); // 1 visible column ("html-content") since __is_visualization__ is filtered. expect(out).toContain("Output table shape: (1, 1)"); @@ -125,85 +124,14 @@ describe("formatOperatorResult - table shape and metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], - warnings: ["truncated to 1 row", "something else"], - }), - EMPTY_STATE + warnings: ["WARNING: truncated to 1 row", "WARNING: something else"], + }) ); const lines = out.split("\n"); expect(lines[0]).toBe("Executed operator op1"); expect(lines[1]).toBe("Output table shape: (1, 1)"); - expect(lines[2]).toBe("truncated to 1 row"); - expect(lines[3]).toBe("something else"); - }); -}); - -describe("formatOperatorResult - input port metadata", () => { - test("omits input metadata when inputPortShapes is missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, result: [{ a: 1 }] }), EMPTY_STATE); - expect(out).not.toContain("Input operator"); - }); - - test("omits input metadata when inputPortShapes is empty", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], inputPortShapes: [] }), - EMPTY_STATE - ); - expect(out).not.toContain("Input operator"); - }); - - test("falls back 
to inputN placeholder when no upstream link matches the port", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [{ portIndex: 0, rows: 5, columns: 3 }], - }), - EMPTY_STATE - ); - expect(out).toContain("Input operator(table shape): input0(5, 3)"); - }); - - test("uses upstream operator id when an input link matches the port", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("upstream")); - state.addOperator(makeOperator("op1", ["input-0"])); - state.addLink(makeLink("l1", ["upstream", "output-0"], ["op1", "input-0"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 4, - result: [{ a: 1, b: 2 }], - inputPortShapes: [{ portIndex: 0, rows: 10, columns: 2 }], - }), - state - ); - expect(out).toContain("Input operator(table shape): upstream(10, 2)"); - }); - - test("sorts multiple input ports by portIndex regardless of input order", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("up0")); - state.addOperator(makeOperator("up1")); - state.addOperator(makeOperator("op1", ["input-0", "input-1"])); - state.addLink(makeLink("l0", ["up0", "output-0"], ["op1", "input-0"])); - state.addLink(makeLink("l1", ["up1", "output-0"], ["op1", "input-1"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [ - { portIndex: 1, rows: 2, columns: 2 }, - { portIndex: 0, rows: 1, columns: 1 }, - ], - }), - state - ); - expect(out).toContain("Input operator(table shape): up0(1, 1), up1(2, 2)"); + expect(lines[2]).toBe("WARNING: truncated to 1 row"); + expect(lines[3]).toBe("WARNING: something else"); }); }); @@ -221,8 +149,7 @@ describe("formatOperatorResult - visualization rows", () => { label: "chart", }, ], - }), - EMPTY_STATE + }) ); expect(out).toContain(""); expect(out).not.toContain("
hidden
"); @@ -236,8 +163,7 @@ describe("formatOperatorResult - visualization rows", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: false, "html-content": "" }], - }), - EMPTY_STATE + }) ); expect(out).toContain(""); expect(out).not.toContain(""); @@ -249,8 +175,7 @@ describe("formatOperatorResult - visualization rows", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: false, value: 1 }], - }), - EMPTY_STATE + }) ); const lines = out.split("\n"); expect(out).toContain("Output table shape: (1, 1)"); @@ -262,8 +187,8 @@ describe("formatOperatorResult - visualization rows", () => { }); describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { - function tableLines(opInfo: Partial): string[] { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo }), EMPTY_STATE); + function tableLines(opInfo: OpInfoOverrides): string[] { + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo })); // Skip brief summary + shape line. 
return out.split("\n").slice(2); } @@ -305,8 +230,7 @@ describe("jsonToTableFormat - row index gaps", () => { { __row_index__: 0, v: "a" }, { __row_index__: 5, v: "b" }, ], - }), - EMPTY_STATE + }) ); const lines = out.split("\n"); // header, row0, gap marker, row5 @@ -325,18 +249,13 @@ describe("jsonToTableFormat - row index gaps", () => { { __row_index__: 0, v: "a" }, { __row_index__: 1, v: "b" }, ], - }), - EMPTY_STATE + }) ); expect(out).not.toContain("...\t..."); }); test("non-zero starting __row_index__ does not emit a leading gap marker", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ outputTuples: 1, result: [{ __row_index__: 9, v: "z" }] }), - EMPTY_STATE - ); + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, result: [{ __row_index__: 9, v: "z" }] })); expect(out).not.toContain("...\t..."); expect(out.endsWith("9\tz")).toBe(true); }); diff --git a/agent-service/src/agent/tools/result-formatting.ts b/agent-service/src/agent/tools/result-formatting.ts index 5ed4aacc5d4..58757c4b098 100644 --- a/agent-service/src/agent/tools/result-formatting.ts +++ b/agent-service/src/agent/tools/result-formatting.ts @@ -17,100 +17,63 @@ * under the License. 
*/ -import type { OperatorInfo } from "../../types/execution"; -import type { WorkflowState } from "../workflow-state"; -import { formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import type { OperatorExecutionSummary, SampleRow } from "../../types/execution"; +import { formatExecuteOperatorResult, getOperatorWarnings, getVisibleResultHeaders } from "./tools-utility"; -export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, workflowState: WorkflowState): string { - if (opInfo.error) { - return `[ERROR] ${opInfo.error}`; +export function formatOperatorResult(operatorId: string, opInfo: OperatorExecutionSummary): string { + const errorText = opInfo.errorMessages.map(e => e.message).join("; "); + if (errorText) { + return `[ERROR] ${errorText}`; } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; + const headers = sampleTuples.length > 0 ? getVisibleResultHeaders(sampleTuples[0].tuple) : []; const columns = headers.length; - const isViz = jsonArray.length > 0 && jsonArray[0]["__is_visualization__"] === true; - const serializableArray = isViz - ? jsonArray.map(row => { + const isViz = sampleTuples.length > 0 && sampleTuples[0].tuple["__is_visualization__"] === true; + const rows: SampleRow[] = isViz + ? 
sampleTuples.map(({ rowIndex, tuple }) => { const cleaned: Record = {}; - for (const key of Object.keys(row)) { + for (const key of Object.keys(tuple)) { if (key === "__is_visualization__") continue; if (key === "html-content" || key === "json-content") { cleaned[key] = ""; } else { - cleaned[key] = row[key]; + cleaned[key] = tuple[key]; } } - return cleaned; + return { rowIndex, tuple: cleaned }; }) - : jsonArray; + : sampleTuples; - const dataString = jsonToTableFormat(serializableArray); + const dataString = jsonToTableFormat(rows); - const metadataLines = [ - formatInputOutputMetadata(workflowState, operatorId, opInfo, columns), - ...(opInfo.warnings ?? []), - ].filter(Boolean); + // Output shape only; input-port shapes are derivable by the agent from the DAG + // links plus each upstream operator's own output shape shown in context. + const outputRows = opInfo.resultSummary?.totalRowCount ?? 0; + const metadataLines = [`Output table shape: (${outputRows}, ${columns})`, ...getOperatorWarnings(opInfo)].filter( + Boolean + ); const briefSummary = formatExecuteOperatorResult(operatorId); return [briefSummary, ...metadataLines, dataString].filter(Boolean).join("\n"); } -function formatInputOutputMetadata( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? 
opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; - - const hasRowIndex = "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; const headerLine = "\t" + headers.join("\t"); const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? 
(row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -118,7 +81,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); diff --git a/agent-service/src/agent/tools/tools-utility.ts b/agent-service/src/agent/tools/tools-utility.ts index 6c9ab004f6e..421d1e0dbde 100644 --- a/agent-service/src/agent/tools/tools-utility.ts +++ b/agent-service/src/agent/tools/tools-utility.ts @@ -17,12 +17,20 @@ * under the License. */ +import type { OperatorExecutionSummary } from "../../types/execution"; + export const INTERNAL_RESULT_KEYS: ReadonlySet = new Set(["__row_index__", "__is_visualization__"]); export function getVisibleResultHeaders(row: Record): string[] { return Object.keys(row).filter(k => !INTERNAL_RESULT_KEYS.has(k)); } +// Warnings are the console messages the engine tags with a "WARNING: " title +// prefix; derive them rather than carrying a separate field on the summary. +export function getOperatorWarnings(opInfo: OperatorExecutionSummary): string[] { + return (opInfo.consoleLogsSummary?.messages ?? 
[]).filter(m => m.title.startsWith("WARNING: ")).map(m => m.title); +} + export function createToolResult(message: string): string { return message; } diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..c52e92cbb21 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -19,12 +19,17 @@ import { z } from "zod"; import { tool } from "ai"; -import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { + createErrorResult, + formatExecuteOperatorResult, + getOperatorWarnings, + getVisibleResultHeaders, +} from "./tools-utility"; import type { WorkflowState } from "../workflow-state"; import { getBackendConfig } from "../../api/backend-api"; import { env } from "../../config/env"; import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; -import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; +import type { OperatorExecutionSummary, SampleRow, WorkflowExecutionSummary } from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; import { createLogger } from "../../logger"; @@ -255,7 +260,7 @@ async function executeWorkflowHttp( config: ExecutionConfig, logicalPlan: LogicalPlan, options: { abortSignal?: AbortSignal } = {} -): Promise { +): Promise { const backendConfig = getBackendConfig(); const workflowId = config.workflowId; @@ -312,7 +317,7 @@ async function executeWorkflowHttp( throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); } - return (await response.json()) as SyncExecutionResult; + return (await response.json()) as WorkflowExecutionSummary; } catch (error) { if (error instanceof Error && error.name === "AbortError") { throw error; @@ 
-327,55 +332,12 @@ async function executeWorkflowHttp( } } -function formatInputOutput( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? 
`input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - function formatExecutionError( - compilationErrors?: Record, operatorErrors?: Array<{ operatorId: string; error: string }>, generalErrors?: string[] ): string { const lines: string[] = ["Execution failed due to the following error:"]; - if (compilationErrors && Object.keys(compilationErrors).length > 0) { - lines.push("Compilation error:"); - for (const [key, value] of Object.entries(compilationErrors)) { - lines.push(` ${key}: ${value}`); - } - } - if (operatorErrors && operatorErrors.length > 0) { lines.push("Execution error:"); for (const { operatorId, error } of operatorErrors) { @@ -393,11 +355,10 @@ function formatExecutionError( return lines.join("\n"); } -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; // Leading tab aligns headers with the index column (pandas __repr__ style). const headerLine = "\t" + headers.join("\t"); @@ -405,10 +366,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? 
(row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -416,7 +374,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); @@ -438,7 +396,7 @@ export async function executeOperatorAndFormat( operatorId: string, options: { abortSignal?: AbortSignal; - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void; + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void; onResultLegacy?: (operatorId: string, backendStats?: Record) => void; } = {} ): Promise { @@ -466,34 +424,26 @@ export async function executeOperatorAndFormat( } } - const result: SyncExecutionResult = await executeWorkflowHttp(config, logicalPlan, { + const result: WorkflowExecutionSummary = await executeWorkflowHttp(config, logicalPlan, { abortSignal: options.abortSignal, }); if (!result.success) { - const compilationErrors = - result.state === "CompilationFailed" || result.state === "ValidationFailed" - ? result.compilationErrors - : undefined; - const operatorErrors = result.state === "Failed" ? Object.entries(result.operators) - .filter(([_, op]) => op.error) - .map(([opId, op]) => ({ operatorId: opId, error: op.error! })) + .filter(([_, op]) => op.errorMessages.length) + .map(([opId, op]) => ({ operatorId: opId, error: op.errorMessages.map(e => e.message).join("; ") })) : undefined; const generalErrors = result.state === "Killed" ? 
["Workflow execution was killed (timeout)."] : result.errors; - const errorText = formatExecutionError(compilationErrors, operatorErrors, generalErrors); + const errorText = formatExecutionError(operatorErrors, generalErrors); if (options.onResult) { - const errorInfo: OperatorInfo = { - state: result.state, - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - error: errorText, + const errorInfo: OperatorExecutionSummary = { + state: "Failed", + errorMessages: [{ type: "EXECUTION_FAILURE", message: errorText, operatorId }], }; options.onResult(operatorId, errorInfo); } @@ -503,36 +453,35 @@ export async function executeOperatorAndFormat( const opInfo = result.operators[operatorId]; if (!opInfo) { - return createErrorResult( - formatExecutionError(undefined, undefined, [`No result found for operator: ${operatorId}`]) - ); + return createErrorResult(formatExecutionError(undefined, [`No result found for operator: ${operatorId}`])); } - if (opInfo.error) { + if (opInfo.errorMessages.length) { if (options.onResult) { options.onResult(operatorId, opInfo); } - return createErrorResult(formatExecutionError(undefined, [{ operatorId, error: opInfo.error }])); + const opError = opInfo.errorMessages.map(e => e.message).join("; "); + return createErrorResult(formatExecutionError([{ operatorId, error: opError }])); } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; + const headers = sampleTuples.length > 0 ? getVisibleResultHeaders(sampleTuples[0].tuple) : []; const columns = headers.length; // Notify for every operator in the execution so upstream stats are also stored. 
if (options.onResult) { for (const [opId, info] of Object.entries(result.operators)) { - if (info && !info.error) { + if (info && !info.errorMessages.length) { options.onResult(opId, info); } } } - let dataString = jsonToTableFormat(jsonArray); + let dataString = jsonToTableFormat(sampleTuples); // Safety-net: TSV serialization may add padding beyond backend's raw-record budget. const charLimit = config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit; @@ -568,9 +517,11 @@ export async function executeOperatorAndFormat( dataString = [headerLine, ...keptRows].join("\n"); } - const shapeLine = formatInputOutput(workflowState, operatorId, opInfo, columns); + // Output shape only; the agent derives input-port shapes from the DAG + the + // upstream operators' own output shapes shown in context. + const shapeLine = `Output table shape: (${opInfo.resultSummary?.totalRowCount ?? 0}, ${columns})`; - const warningLines = opInfo.warnings?.map(w => w) ?? []; + const warningLines = getOperatorWarnings(opInfo); const metadataLines = [shapeLine, ...warningLines].filter(Boolean); @@ -589,7 +540,7 @@ export async function executeOperatorAndFormat( export function createExecuteOperatorTool( workflowState: WorkflowState, getConfig: () => ExecutionConfig, - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void ) { return tool({ description: diff --git a/agent-service/src/agent/workflow-result-state.spec.ts b/agent-service/src/agent/workflow-result-state.spec.ts index b2e46fd0d9e..00f501087ba 100644 --- a/agent-service/src/agent/workflow-result-state.spec.ts +++ b/agent-service/src/agent/workflow-result-state.spec.ts @@ -19,14 +19,13 @@ import { describe, expect, test } from "bun:test"; import { WorkflowResultState } from "./workflow-result-state"; -import type { OperatorInfo } from "../types/execution"; +import type { OperatorExecutionSummary } from 
"../types/execution"; -function makeInfo(outputTuples: number): OperatorInfo { +function makeInfo(totalRowCount: number): OperatorExecutionSummary { return { state: "Completed", - inputTuples: 0, - outputTuples, - resultMode: "table", + errorMessages: [], + resultSummary: { resultMode: "table", sampleTuples: [], totalRowCount }, }; } @@ -40,15 +39,15 @@ describe("WorkflowResultState - ancestor walk", () => { state.set("op1", "step-C", makeInfo(3)); path = ["step-A", "step-B", "step-C"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(3); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(3); // Rewind to step-B; step-C is no longer an ancestor. path = ["step-A", "step-B"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(2); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(2); // Rewind further. path = ["step-A"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(1); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1); }); test("returns undefined when no ancestor has a result", () => { @@ -74,8 +73,8 @@ describe("WorkflowResultState - ancestor walk", () => { path = ["step-A", "step-B"]; const visible = state.getAllVisible(); expect(visible.size).toBe(2); - expect(visible.get("op1")?.operatorInfo.outputTuples).toBe(1); - expect(visible.get("op2")?.operatorInfo.outputTuples).toBe(7); + expect(visible.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1); + expect(visible.get("op2")?.operatorInfo.resultSummary?.totalRowCount).toBe(7); }); test("clear drops all stored results", () => { @@ -90,6 +89,6 @@ describe("WorkflowResultState - ancestor walk", () => { const state = new WorkflowResultState(() => ["step-A"]); state.set("op1", "step-A", makeInfo(1)); state.set("op1", "step-A", makeInfo(42)); - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(42); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(42); }); }); diff --git 
a/agent-service/src/agent/workflow-result-state.ts b/agent-service/src/agent/workflow-result-state.ts index e6f13c2301a..34d604657e3 100644 --- a/agent-service/src/agent/workflow-result-state.ts +++ b/agent-service/src/agent/workflow-result-state.ts @@ -17,10 +17,10 @@ * under the License. */ -import type { OperatorInfo } from "../types/execution"; +import type { OperatorExecutionSummary } from "../types/execution"; interface ResultEntry { - operatorInfo: OperatorInfo; + operatorInfo: OperatorExecutionSummary; stepId: string; } @@ -37,7 +37,7 @@ export class WorkflowResultState { constructor(private getAncestorPath: () => string[]) {} - set(operatorId: string, stepId: string, operatorInfo: OperatorInfo): void { + set(operatorId: string, stepId: string, operatorInfo: OperatorExecutionSummary): void { let versions = this.results.get(operatorId); if (!versions) { versions = new Map(); @@ -58,7 +58,7 @@ export class WorkflowResultState { return undefined; } - getOperatorInfo(operatorId: string): OperatorInfo | undefined { + getOperatorInfo(operatorId: string): OperatorExecutionSummary | undefined { return this.get(operatorId)?.operatorInfo; } diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index 8ffd27fd52c..8a361231c42 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -19,8 +19,13 @@ import { getBackendConfig } from "./backend-api"; import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import type { WorkflowFatalError } from "../types/execution"; import { createLogger } from "../logger"; +// WorkflowFatalError is defined in types/execution.ts (shared by compile and +// execution errors); re-exported here for existing importers of this module. 
+export type { WorkflowFatalError }; + const log = createLogger("CompileAPI"); export interface SchemaAttribute { @@ -30,12 +35,6 @@ export interface SchemaAttribute { export type PortSchema = ReadonlyArray; -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - export interface WorkflowCompilationResponse { physicalPlan?: any; operatorOutputSchemas: Record; diff --git a/agent-service/src/server.spec.ts b/agent-service/src/server.spec.ts index c1fdddc9339..8f7e1c7f857 100644 --- a/agent-service/src/server.spec.ts +++ b/agent-service/src/server.spec.ts @@ -323,6 +323,7 @@ describe("agent read routes", () => { test("GET /:id/operator-results maps the visible operator results", async () => { const agent = _getAgentForTests(id)!; + // The route returns each operator's OperatorExecutionSummary verbatim. (agent as any).getWorkflowResultState = () => ({ getAllVisible: () => new Map([ @@ -330,27 +331,25 @@ describe("agent read routes", () => { "op-1", { operatorInfo: { - state: "COMPLETED", - inputTuples: 1, - outputTuples: 2, - inputPortShapes: [], - result: [{ a: 1 }], - error: undefined, - warnings: [], - consoleLogs: [], - totalRowCount: 2, - resultStatistics: {}, + state: "Completed", + errorMessages: [], + resultSummary: { + resultMode: "table", + sampleTuples: [{ a: 1 }], + totalRowCount: 2, + }, }, }, ], ]), }); - const body = await readJson<{ results: Record }>( - await getJson(`${API}/agents/${id}/operator-results`) - ); - expect(body.results["op-1"].outputTuples).toBe(2); - expect(body.results["op-1"].outputColumns).toBe(1); + const body = await readJson<{ + results: Record; + }>(await getJson(`${API}/agents/${id}/operator-results`)); + expect(body.results["op-1"].state).toBe("Completed"); + expect(body.results["op-1"].resultSummary.totalRowCount).toBe(2); + expect(body.results["op-1"].resultSummary.sampleTuples).toEqual([{ a: 1 }]); }); }); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts 
index 030d27b95bf..6c23943be59 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -21,7 +21,6 @@ import { Elysia, t } from "elysia"; import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; -import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; import { retrieveWorkflow } from "./api/workflow-api"; @@ -42,7 +41,7 @@ import type { import { AgentState, OperatorResultSerializationMode } from "./types/agent"; import type { WsClientCommand, WsServerEvent } from "./types/ws"; import { WsServerSnapshotEvent, WsServerStepEvent, WsServerStatusEvent, WsServerErrorEvent } from "./types/ws"; -import type { OperatorResultSummary } from "./types/execution"; +import type { OperatorExecutionSummary } from "./types/execution"; const agentStore = new Map(); let agentCounter = 0; @@ -388,25 +387,12 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -function getOperatorResultSummaries(agent: TexeraAgent): Record { +function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); - const results: Record = {}; + const results: Record = {}; for (const [opId, entry] of visible) { - const info = entry.operatorInfo; - results[opId] = { - state: info.state, - inputTuples: info.inputTuples, - outputTuples: info.outputTuples, - inputPortShapes: info.inputPortShapes, - outputColumns: info.result && info.result.length > 0 ? 
getVisibleResultHeaders(info.result[0]).length : undefined, - error: info.error, - warnings: info.warnings, - consoleLogCount: info.consoleLogs?.length, - totalRowCount: info.totalRowCount, - sampleRecords: info.result, - resultStatistics: info.resultStatistics, - }; + results[opId] = entry.operatorInfo; } return results; } diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index d638a889d47..eae9e0f0daa 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -17,56 +17,93 @@ * under the License. */ -interface ConsoleMessage { +// A fatal error reported for one operator. Reuses the engine's wire shape +// (workflowruntimestate.proto): `type` is the FatalErrorType enum name. It is the same +// type the workflow-compiling service returns for compilation errors, so compile +// and execution errors share one shape. Re-exported by api/compile-api.ts. +export interface WorkflowFatalError { + // FatalErrorType enum name, e.g. "COMPILATION_ERROR" | "EXECUTION_FAILURE". + type: string; + message: string; + details?: string; + operatorId?: string; + workerId?: string; + timestamp?: { seconds: number; nanos: number }; +} + +// Lifecycle state of a single operator, as reported by the engine +// (mirrors the backend's WorkflowAggregatedState string mapping). +export type OperatorState = + | "Uninitialized" + | "Ready" + | "Running" + | "Pausing" + | "Paused" + | "Resuming" + | "Completed" + | "Failed" + | "Killed" + | "Terminated" + | "Unknown"; + +// Aggregated state of a whole workflow execution: the OperatorState values the +// engine reports, plus the synthetic outcomes the sync-execution endpoint adds. +export type WorkflowExecutionState = OperatorState | "Error" | "CompilationFailed"; + +// A single console message emitted by an operator during execution. +// `title` is the short header (Scala errors put their text here); `message` is +// the body (Python errors / stack traces).
+export interface ConsoleMessage { msgType: string; + title: string; message: string; } -interface PortShape { - portIndex: number; - rows: number; - columns: number; +// One sampled output row: its original position plus the row's columns. (A viz +// payload's tuple still carries an `__is_visualization__` marker.) +export interface SampleRow { + rowIndex: number; + tuple: Record; } -export interface OperatorInfo { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: PortShape[]; +// An operator's output, summarized for the agent. `sampleTuples` are the +// symmetrically-truncated output rows (the middle is dropped, so `rowIndex` +// values may have gaps). `outputSchema` / per-column statistics are intended +// future additions — the engine does not produce them yet. +export interface OperatorResultSummary { + // "table" or "visualization". resultMode: string; - result?: Record[]; - totalRowCount?: number; - displayedRows?: number; - truncated?: boolean; - consoleLogs?: ConsoleMessage[]; - error?: string; - warnings?: string[]; - resultStatistics?: Record; + sampleTuples: SampleRow[]; + // Total output rows before truncation (sampleTuples may hold fewer). + totalRowCount: number; } -export interface SyncExecutionResult { - success: boolean; - state: string; - operators: Record; - compilationErrors?: Record; - errors?: string[]; +// An operator's console output. Warnings are not a separate field: they are the +// messages whose `title` the engine prefixes with "WARNING: ", derived on demand. +export interface OperatorConsoleLogsSummary { + messages: ConsoleMessage[]; } -/** - * Wire projection of one operator's execution result, summarized for the - * client: counts and a small record sample instead of full payloads. Returned - * by the REST route `GET /agents/:id/operator-results`. 
- */ -export interface OperatorResultSummary { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: PortShape[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; +// Per-operator execution summary returned by the sync-execution backend. +// Orthogonal sub-summaries replace the previous flat `OperatorInfo`. +export interface OperatorExecutionSummary { + state: OperatorState; + // Empty means the operator did not fail. + errorMessages: ReadonlyArray; + // Absent when the operator produced no materialized result. + resultSummary?: OperatorResultSummary; + // Absent when the operator produced no console output. + consoleLogsSummary?: OperatorConsoleLogsSummary; +} + +// The result of one synchronous workflow execution. +export interface WorkflowExecutionSummary { + // True only on a clean run; can be false even when state is "Completed" + // (e.g. an operator logged a console error without aborting the run). + success: boolean; + state: WorkflowExecutionState; + operators: Record; + // Workflow-level errors (timeouts, init/compile failures, fatal errors); + // empty means none. 
+ errors: string[]; } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index b70bafb4b0b..e7ff87222f5 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -44,6 +44,9 @@ import org.apache.texera.amber.engine.common.executionruntimestate.{ ExecutionMetadataStore, ExecutionStatsStore } +import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError +import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE +import com.google.protobuf.timestamp.Timestamp import io.reactivex.rxjava3.core.Observable import org.apache.texera.auth.SessionUser import org.apache.texera.dao.SqlServer @@ -55,12 +58,12 @@ import org.apache.texera.web.service.{ExecutionResultService, WorkflowService} import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState import java.net.URI +import java.time.Instant import java.util.concurrent.TimeUnit import javax.annotation.security.RolesAllowed import javax.ws.rs._ import javax.ws.rs.core.MediaType import scala.collection.mutable -import scala.jdk.CollectionConverters._ import com.fasterxml.jackson.databind.ObjectMapper case class SyncExecutionRequest( @@ -79,32 +82,41 @@ case class ConsoleMessageInfo( message: String ) -case class PortShape( - portIndex: Int, - rows: Long +// One sampled output row: the original row position plus the row's columns as a +// processed/truncated JSON object (not a raw engine Tuple, which would serialize +// as {schema, fields[]} and bypass the type-aware conversion + cell truncation). +// The index is carried explicitly rather than embedded in the tuple. 
+case class SampleRow( + rowIndex: Int, + tuple: ObjectNode ) -case class OperatorInfo( - state: String, - inputTuples: Long, - outputTuples: Long, - inputPortShapes: Option[List[PortShape]], +case class OperatorResultSummary( resultMode: String, // "table" or "visualization" - result: Option[Any], // JSON array (List[ObjectNode]) - totalRowCount: Option[Int], - displayedRows: Option[Int], - truncated: Option[Boolean], - consoleLogs: Option[List[ConsoleMessageInfo]], - error: Option[String], - warnings: Option[List[String]] + sampleTuples: List[SampleRow], + totalRowCount: Int +) + +case class OperatorConsoleLogsSummary( + messages: List[ConsoleMessageInfo] ) -case class SyncExecutionResult( +// Per-operator execution summary. Orthogonal sub-summaries replace the previous +// flat OperatorInfo; must stay in sync with agent-service's OperatorExecutionSummary. +// `errorMessages` reuses the engine's WorkflowFatalError, the same type the +// compiling service returns for compilation errors, for one consistent wire shape. 
+case class OperatorExecutionSummary( + state: String, + errorMessages: List[WorkflowFatalError], // empty means the operator did not fail + resultSummary: Option[OperatorResultSummary], + consoleLogsSummary: Option[OperatorConsoleLogsSummary] +) + +case class WorkflowExecutionSummary( success: Boolean, state: String, - operators: Map[String, OperatorInfo], - compilationErrors: Option[Map[String, String]], - errors: Option[List[String]] + operators: Map[String, OperatorExecutionSummary], + errors: List[String] // empty means none ) sealed trait TerminationReason @@ -129,7 +141,7 @@ class SyncExecutionResource extends LazyLogging { @PathParam("cuid") computingUnitId: Int, request: SyncExecutionRequest, @Auth user: SessionUser - ): SyncExecutionResult = { + ): WorkflowExecutionSummary = { val timeoutSeconds = request.timeoutSeconds val maxOperatorResultCharLimit = @@ -176,12 +188,11 @@ class SyncExecutionResource extends LazyLogging { val executionService = workflowService.executionService.getValue if (executionService == null) { - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List("Failed to initialize execution service")) + errors = List("Failed to initialize execution service") ) } @@ -254,21 +265,19 @@ class SyncExecutionResource extends LazyLogging { } catch { case _: java.util.concurrent.TimeoutException => killExecution(executionService) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Killed", operators = Map.empty, - compilationErrors = None, - errors = Some(List(s"Timeout after $timeoutSeconds seconds")) + errors = List(s"Timeout after $timeoutSeconds seconds") ) case e: Exception => logger.error(s"Error waiting for execution: ${e.getMessage}", e) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - 
errors = Some(List(e.getMessage)) + errors = List(e.getMessage) ) } } @@ -318,7 +327,7 @@ class SyncExecutionResource extends LazyLogging { .map(err => s"${err.`type`}: ${err.message}") .toList - val hasOperatorConsoleError = operatorInfos.values.exists(_.error.isDefined) + val hasOperatorConsoleError = operatorInfos.values.exists(_.errorMessages.nonEmpty) val stateString = if (terminatedByConsoleError) "Failed" @@ -328,12 +337,11 @@ class SyncExecutionResource extends LazyLogging { val isSuccess = (finalState.state == COMPLETED || terminatedByTargetResults) && !hasOperatorConsoleError && !terminatedByConsoleError - SyncExecutionResult( + WorkflowExecutionSummary( success = isSuccess, state = stateString, operators = operatorInfos, - compilationErrors = None, - errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None + errors = fatalErrors ) } catch { @@ -382,8 +390,8 @@ class SyncExecutionResource extends LazyLogging { maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int, inMemoryConsoleState: Option[ExecutionConsoleStore] = None - ): Map[String, OperatorInfo] = { - val operatorInfos = mutable.Map[String, OperatorInfo]() + ): Map[String, OperatorExecutionSummary] = { + val operatorInfos = mutable.Map[String, OperatorExecutionSummary]() val statsState = executionService.executionStateStore.statsStore.getState val operatorStats = statsState.operatorInfo @@ -406,23 +414,9 @@ class SyncExecutionResource extends LazyLogging { for (opId <- targetOps) { val stats = operatorStats.get(opId) - val (state, inputTuples, outputTuples): (String, Long, Long) = stats match { - case Some(s) => - val inputCount = s.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum - val outputCount = s.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum - (stateToString(s.operatorState), inputCount, outputCount) - case None => ("Unknown", 0L, 0L) - } + val state = stats.map(s => stateToString(s.operatorState)).getOrElse("Unknown") - val inputPortShapes: 
Option[List[PortShape]] = stats - .map { s => - s.operatorStatistics.inputMetrics.map { pm => - PortShape(pm.portId.id, pm.tupleMetrics.count) - }.toList - } - .filter(_.nonEmpty) - - val (resultMode, result, totalRowCount, displayedRows, truncated) = + val (resultMode, result, totalRowCount, _, _) = collectOperatorResult( executionId, opId, @@ -459,31 +453,40 @@ class SyncExecutionResource extends LazyLogging { } ) - // Convention: PRINT messages prefixed with "WARNING: " surface as warnings. - val warningMsgs = consoleLogs - .map(_.filter(_.title.startsWith("WARNING: ")).map(_.title)) - .filter(_.nonEmpty) - operatorInfos(opId) = OperatorInfo( + // Absent when the operator produced no materialized result. `result` and + // `totalRowCount` are populated together, so map over the former. + val resultSummary = result.map { tuples => + OperatorResultSummary( + resultMode = resultMode, + sampleTuples = tuples, + totalRowCount = totalRowCount.getOrElse(0) + ) + } + + val consoleLogsSummary = consoleLogs.map { logs => + OperatorConsoleLogsSummary(messages = logs) + } + + // Per-operator runtime errors come from console ERROR logs; surface them as + // EXECUTION_FAILURE WorkflowFatalErrors (same type the compiler emits for + // COMPILATION_ERRORs). Empty list means the operator did not fail. 
+ val errorMessages = errorMsg + .map(msg => List(WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), msg, "", opId))) + .getOrElse(List.empty) + + operatorInfos(opId) = OperatorExecutionSummary( state = state, - inputTuples = inputTuples, - outputTuples = outputTuples, - inputPortShapes = inputPortShapes, - resultMode = resultMode, - result = result, - totalRowCount = totalRowCount, - displayedRows = displayedRows, - truncated = truncated, - consoleLogs = consoleLogs, - error = errorMsg, - warnings = warningMsgs + errorMessages = errorMessages, + resultSummary = resultSummary, + consoleLogsSummary = consoleLogsSummary ) } operatorInfos.toMap } - private def handleExecutionError(e: Exception): SyncExecutionResult = { + private def handleExecutionError(e: Exception): WorkflowExecutionSummary = { val errorMsg = e.getMessage val isCompilationError = errorMsg != null && ( errorMsg.contains("compilation") || @@ -493,20 +496,18 @@ class SyncExecutionResource extends LazyLogging { ) if (isCompilationError) { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "CompilationFailed", operators = Map.empty, - compilationErrors = Some(Map("error" -> errorMsg)), - errors = Some(List(errorMsg)) + errors = List(errorMsg) ) } else { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List(Option(e.getMessage).getOrElse("Unknown error"))) + errors = List(Option(e.getMessage).getOrElse("Unknown error")) ) } } @@ -521,9 +522,7 @@ class SyncExecutionResource extends LazyLogging { opId: String, maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int - ): (String, Option[Any], Option[Int], Option[Int], Option[Boolean]) = { - import com.fasterxml.jackson.databind.node.ObjectNode - + ): (String, Option[List[SampleRow]], Option[Int], Option[Int], Option[Boolean]) = { try { val storageUriOption = 
WorkflowExecutionsResource.getResultUriByLogicalPortId( executionId, @@ -543,13 +542,7 @@ class SyncExecutionResource extends LazyLogging { val tupleIterator = document.get() if (totalCount == 0 || !tupleIterator.hasNext) { - return ( - "table", - Some(List.empty[ObjectNode].asJava), - Some(0), - Some(0), - Some(false) - ) + return ("table", Some(List.empty[SampleRow]), Some(0), Some(0), Some(false)) } // A single tuple with html-content / json-content is a visualization payload — @@ -558,71 +551,53 @@ class SyncExecutionResource extends LazyLogging { if (totalCount == 1 && isVisualizationTuple(firstTuple)) { val jsonResults = ExecutionResultService.convertTuplesToJson(List(firstTuple), isVisualization = true) - jsonResults.foreach( - _.asInstanceOf[ObjectNode].put("__is_visualization__", true) - ) - return ( - "visualization", - Some(jsonResults), - Some(totalCount), - Some(1), - Some(false) - ) + jsonResults.foreach(_.put("__is_visualization__", true)) + val rows = jsonResults.zipWithIndex.map { case (json, idx) => SampleRow(idx, json) } + return ("visualization", Some(rows), Some(totalCount), Some(1), Some(false)) } - // __row_index__ preserves the original position so the frontend can show - // "row N" correctly after symmetric truncation drops the middle. + // rowIndex preserves the original position so the client can show "row N" + // correctly after symmetric truncation drops the middle. 
var rowIndex = 0 val firstJson = ExecutionResultService.convertTuplesToJson(List(firstTuple)).head val truncatedFirst = truncateSingleTuple(firstJson, maxOperatorResultCellCharLimit) - truncatedFirst.put("__row_index__", rowIndex) val firstSize = estimateTupleSize(truncatedFirst, mapper) if (firstSize >= maxOperatorResultCharLimit) { - return ( - "table", - Some(List(truncatedFirst).asJava), - Some(totalCount), - Some(1), - Some(true) - ) + return ("table", Some(List(SampleRow(rowIndex, truncatedFirst))), Some(totalCount), Some(1), Some(true)) } val halfLimit = maxOperatorResultCharLimit / 2 val truncationNoticeSize = 50 // reserved for the "...skipped..." marker - val frontTuples = mutable.ListBuffer[ObjectNode](truncatedFirst) + val frontRows = mutable.ListBuffer[SampleRow](SampleRow(rowIndex, truncatedFirst)) var frontSize = firstSize - var processedCount = 1 while (tupleIterator.hasNext && frontSize < halfLimit) { val tuple = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jsonTuple = ExecutionResultService.convertTuplesToJson(List(tuple)).head val truncatedTuple = truncateSingleTuple(jsonTuple, maxOperatorResultCellCharLimit) - truncatedTuple.put("__row_index__", rowIndex) val tupleSize = estimateTupleSize(truncatedTuple, mapper) + val row = SampleRow(rowIndex, truncatedTuple) if (frontSize + tupleSize <= halfLimit) { - frontTuples += truncatedTuple + frontRows += row frontSize += tupleSize } else { // Front is full — switch to a sliding window for the back half. 
- val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() - backBuffer += ((truncatedTuple, tupleSize)) + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() + backBuffer += ((row, tupleSize)) var backSize = tupleSize while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -631,34 +606,24 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - return ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + return ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } } if (tupleIterator.hasNext) { - val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() var backSize = 0 while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -667,25 +632,11 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = 
backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } else { - ( - "table", - Some(frontTuples.toList.asJava), - Some(totalCount), - Some(frontTuples.size), - Some(false) - ) + ("table", Some(frontRows.toList), Some(totalCount), Some(frontRows.size), Some(false)) } case None => diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 5e7c254f22c..8c567d532d2 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -103,7 +103,7 @@ export interface OperatorResultSummary { state: string; inputTuples: number; outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + inputPortShapes?: { portIndex: number; rows: number }[]; outputColumns?: number; error?: string; warnings?: string[]; @@ -113,6 +113,24 @@ export interface OperatorResultSummary { resultStatistics?: Record; } +/** + * Per-operator execution summary as sent by the agent-service over the + * operator-results endpoint (mirror of its OperatorExecutionSummary). The flat + * OperatorResultSummary above is derived from this for display. 
+ */ +interface WireOperatorExecutionSummary { + state: string; + errorMessages?: { type: string; message: string }[]; + resultSummary?: { + resultMode: string; + sampleTuples: { rowIndex: number; tuple: Record }[]; + totalRowCount: number; + }; + consoleLogsSummary?: { + messages: { msgType: string; title: string; message: string }[]; + }; +} + interface ApiAgentInfo { id: string; name: string; @@ -1233,10 +1251,24 @@ export class AgentService { /** * Update operator result summaries from an API response. */ - private updateOperatorResultSummaries(results: Record): void { + private updateOperatorResultSummaries(results: Record): void { const summaries = new Map(); for (const [opId, data] of Object.entries(results)) { - summaries.set(opId, data); + summaries.set(opId, { + state: data.state, + // Tuple counts are no longer carried per-port; output rows come from the + // result summary, input shapes are derivable from the DAG when needed. + inputTuples: 0, + outputTuples: data.resultSummary?.totalRowCount ?? 0, + error: data.errorMessages?.map(e => e.message).join("; ") || undefined, + warnings: (data.consoleLogsSummary?.messages ?? []) + .filter(m => m.title.startsWith("WARNING: ")) + .map(m => m.title), + consoleLogCount: data.consoleLogsSummary?.messages.length, + totalRowCount: data.resultSummary?.totalRowCount, + // Flatten back to embedded __row_index__ so the display components are unchanged. 
+ sampleRecords: data.resultSummary?.sampleTuples?.map(r => ({ __row_index__: r.rowIndex, ...r.tuple })), + }); } this.operatorResultSummariesSubject.next(summaries); } @@ -1249,11 +1281,11 @@ export class AgentService { */ public fetchOperatorResults(agentId: string): void { this.http - .get<{ results: Record }>( + .get<{ results: Record }>( `${this.AGENT_API_BASE}/agents/${agentId}/operator-results`, this.agentHeaders(agentId) ) - .pipe(catchError(() => of({ results: {} as Record }))) + .pipe(catchError(() => of({ results: {} as Record }))) .subscribe(response => { this.updateOperatorResultSummaries(response.results); }); From e2163bc571aa11ca040e3b0794670be6fc933771 Mon Sep 17 00:00:00 2001 From: Jiadong Bai <43344272+bobbai00@users.noreply.github.com> Date: Sun, 28 Jun 2026 18:31:10 -0700 Subject: [PATCH 2/5] refactor(frontend): consume execution summaries directly --- agent-service/src/server.spec.ts | 4 +- agent-service/src/types/execution.ts | 4 +- .../web/resource/SyncExecutionResource.scala | 6 +- .../agent-interaction.component.html | 41 ++----- .../agent-interaction.component.scss | 89 --------------- .../agent-interaction.component.ts | 101 ++++-------------- .../workflow-editor.component.html | 3 +- .../workflow-editor.component.ts | 18 ++-- .../service/agent/agent.service.spec.ts | 24 +++-- .../workspace/service/agent/agent.service.ts | 100 ++++++++--------- 10 files changed, 102 insertions(+), 288 deletions(-) diff --git a/agent-service/src/server.spec.ts b/agent-service/src/server.spec.ts index 8f7e1c7f857..1897d0dfb45 100644 --- a/agent-service/src/server.spec.ts +++ b/agent-service/src/server.spec.ts @@ -335,7 +335,7 @@ describe("agent read routes", () => { errorMessages: [], resultSummary: { resultMode: "table", - sampleTuples: [{ a: 1 }], + sampleTuples: [{ rowIndex: 0, tuple: { a: 1 } }], totalRowCount: 2, }, }, @@ -349,7 +349,7 @@ describe("agent read routes", () => { }>(await getJson(`${API}/agents/${id}/operator-results`)); 
expect(body.results["op-1"].state).toBe("Completed"); expect(body.results["op-1"].resultSummary.totalRowCount).toBe(2); - expect(body.results["op-1"].resultSummary.sampleTuples).toEqual([{ a: 1 }]); + expect(body.results["op-1"].resultSummary.sampleTuples).toEqual([{ rowIndex: 0, tuple: { a: 1 } }]); }); }); diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index eae9e0f0daa..db9b164d3b9 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -70,7 +70,7 @@ export interface SampleRow { // symmetrically-truncated output rows (the middle is dropped, so `rowIndex` // values may have gaps). `outputSchema` / per-column statistics are intended // future additions — the engine does not produce them yet. -export interface OperatorResultSummary { +export interface OperatorOutputSummary { // "table" or "visualization". resultMode: string; sampleTuples: SampleRow[]; @@ -91,7 +91,7 @@ export interface OperatorExecutionSummary { // Empty means the operator did not fail. errorMessages: ReadonlyArray; // Absent when the operator produced no materialized result. - resultSummary?: OperatorResultSummary; + resultSummary?: OperatorOutputSummary; // Absent when the operator produced no console output. 
consoleLogsSummary?: OperatorConsoleLogsSummary; } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index e7ff87222f5..47ce6b9515b 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -91,7 +91,7 @@ case class SampleRow( tuple: ObjectNode ) -case class OperatorResultSummary( +case class OperatorOutputSummary( resultMode: String, // "table" or "visualization" sampleTuples: List[SampleRow], totalRowCount: Int @@ -108,7 +108,7 @@ case class OperatorConsoleLogsSummary( case class OperatorExecutionSummary( state: String, errorMessages: List[WorkflowFatalError], // empty means the operator did not fail - resultSummary: Option[OperatorResultSummary], + resultSummary: Option[OperatorOutputSummary], consoleLogsSummary: Option[OperatorConsoleLogsSummary] ) @@ -457,7 +457,7 @@ class SyncExecutionResource extends LazyLogging { // Absent when the operator produced no materialized result. `result` and // `totalRowCount` are populated together, so map over the former. val resultSummary = result.map { tuples => - OperatorResultSummary( + OperatorOutputSummary( resultMode = resultMode, sampleTuples = tuples, totalRowCount = totalRowCount.getOrElse(0) diff --git a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html index cd2f1e0f52a..1ff6d9c4c6d 100644 --- a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html +++ b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html @@ -20,7 +20,7 @@