diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index ccd0545919a..8a2eaec273c 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -718,7 +718,7 @@ export class TexeraAgent { const result = new Map(); const visible = this.workflowResultState.getAllVisible(); for (const [operatorId, entry] of visible) { - result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo, this.workflowState)); + result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo)); } return result; } diff --git a/agent-service/src/agent/tools/result-formatting.spec.ts b/agent-service/src/agent/tools/result-formatting.spec.ts index e6d1afdf2e3..0658692772f 100644 --- a/agent-service/src/agent/tools/result-formatting.spec.ts +++ b/agent-service/src/agent/tools/result-formatting.spec.ts @@ -19,104 +19,118 @@ import { describe, expect, test } from "bun:test"; import { formatOperatorResult } from "./result-formatting"; -import { WorkflowState } from "../workflow-state"; -import type { OperatorInfo } from "../../types/execution"; -import type { OperatorPredicate, OperatorLink, PortDescription } from "../../types/workflow"; +import { + ConsoleMessageType, + OperatorState, + OperatorResultMode, + WorkflowFatalErrorType, + type OperatorExecutionSummary, + type WorkflowFatalError, + type SampleRow, +} from "../../types/execution"; + +function toSampleRows(rows: Record[]): SampleRow[] { + return rows.map((tuple, rowIndex) => ({ rowIndex, tuple })); +} -function makeOpInfo(overrides: Partial = {}): OperatorInfo { - return { - state: "completed", - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - ...overrides, - }; +interface OpInfoOverrides { + state?: OperatorState; + error?: string; + outputTuples?: number; + tuplesCount?: number; + warnings?: string[]; + result?: Record[]; + sampleTuples?: SampleRow[]; + resultMode?: OperatorResultMode; } -function makeOperator(id: string, inputPortIDs: string[] = []): OperatorPredicate { - const inputPorts: PortDescription[] = inputPortIDs.map((portID, i) => ({ - portID, - displayName: `Input ${i}`, - })); +function makeExecutionFailure(message: string): WorkflowFatalError { return { - operatorID: id, - operatorType: "TestOp", - operatorVersion: "1.0", - operatorProperties: {}, - inputPorts, - outputPorts: [{ portID: "output-0", displayName: "Output 0" }], - showAdvanced: false, + type: { name: WorkflowFatalErrorType.EXECUTION_FAILURE }, + timestamp: { seconds: 0, nanos: 0 }, + message, + details: "", + operatorId: "", + workerId: "", }; } -function makeLink(linkID: string, source: [string, string], target: [string, string]): OperatorLink { - return { - linkID, - source: { operatorID: source[0], portID: source[1] }, - target: { operatorID: target[0], portID: target[1] }, +function makeOpInfo(overrides: OpInfoOverrides = {}): OperatorExecutionSummary { + const summary: OperatorExecutionSummary = { + state: overrides.state ?? OperatorState.COMPLETED, + errorMessages: overrides.error ? [makeExecutionFailure(overrides.error)] : [], }; + // The result summary is present only when the operator produced a result. + if (overrides.result !== undefined || overrides.sampleTuples !== undefined) { + summary.resultSummary = { + resultMode: overrides.resultMode ?? OperatorResultMode.TABLE, + // Non-arrays are passed through to exercise the "(no result data)" guard. + sampleTuples: + overrides.sampleTuples ?? + (Array.isArray(overrides.result) ? toSampleRows(overrides.result) : (overrides.result as any)), + tuplesCount: overrides.tuplesCount ?? overrides.outputTuples ?? 0, + }; + } + if (overrides.warnings) { + // Warnings are derived from console messages whose title is "WARNING: ...". + summary.consoleLogsSummary = { + messages: overrides.warnings.map(w => ({ msgType: ConsoleMessageType.PRINT, title: w, message: "" })), + }; + } + return summary; } -const EMPTY_STATE = new WorkflowState(); - describe("formatOperatorResult - early returns", () => { test("returns [ERROR] prefix when error field is set", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" })); expect(out).toBe("[ERROR] boom"); }); test("treats empty-string error as falsy and continues to result path", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "" })); expect(out).not.toContain("[ERROR]"); expect(out).toContain("(no result data)"); }); test("returns (no result data) when result is undefined", () => { - const out = formatOperatorResult("op1", makeOpInfo(), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo()); expect(out).toBe("(no result data)"); }); test("returns (no result data) when result is not an array", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ result: { rows: [] } as unknown as Record[] }), - EMPTY_STATE - ); + const out = formatOperatorResult("op1", makeOpInfo({ result: { rows: [] } as unknown as Record[] })); expect(out).toBe("(no result data)"); }); test("empty array result emits brief summary plus zero-column shape only", () => { - const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 })); expect(out.split("\n")).toEqual(["Executed operator op1", "Output table shape: (0, 0)"]); }); }); describe("formatOperatorResult - table shape and metadata", () => { - test("uses outputTuples for row count when totalRowCount missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] }), EMPTY_STATE); + test("uses outputTuples for row count when tuplesCount missing", () => { + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] })); expect(out).toContain("Output table shape: (7, 2)"); }); - test("totalRowCount overrides outputTuples in output shape", () => { + test("tuplesCount overrides outputTuples in output shape", () => { const out = formatOperatorResult( "op1", - makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] }), - EMPTY_STATE + makeOpInfo({ outputTuples: 7, tuplesCount: 999, result: [{ a: 1, b: 2 }] }) ); expect(out).toContain("Output table shape: (999, 2)"); }); - test("filters internal __is_visualization__ key from outer column count", () => { + test("counts every result tuple key as a column", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 1, - result: [{ __is_visualization__: true, "html-content": "" }], - }), - EMPTY_STATE + result: [{ "html-content": "", label: "chart" }], + }) ); - // 1 visible column ("html-content") since __is_visualization__ is filtered. - expect(out).toContain("Output table shape: (1, 1)"); + expect(out).toContain("Output table shape: (1, 2)"); }); test("appends warnings after metadata lines", () => { @@ -125,104 +139,32 @@ describe("formatOperatorResult - table shape and metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], - warnings: ["truncated to 1 row", "something else"], - }), - EMPTY_STATE + warnings: ["WARNING: truncated to 1 row", "WARNING: something else"], + }) ); const lines = out.split("\n"); expect(lines[0]).toBe("Executed operator op1"); expect(lines[1]).toBe("Output table shape: (1, 1)"); - expect(lines[2]).toBe("truncated to 1 row"); - expect(lines[3]).toBe("something else"); - }); -}); - -describe("formatOperatorResult - input port metadata", () => { - test("omits input metadata when inputPortShapes is missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, result: [{ a: 1 }] }), EMPTY_STATE); - expect(out).not.toContain("Input operator"); - }); - - test("omits input metadata when inputPortShapes is empty", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], inputPortShapes: [] }), - EMPTY_STATE - ); - expect(out).not.toContain("Input operator"); - }); - - test("falls back to inputN placeholder when no upstream link matches the port", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [{ portIndex: 0, rows: 5, columns: 3 }], - }), - EMPTY_STATE - ); - expect(out).toContain("Input operator(table shape): input0(5, 3)"); - }); - - test("uses upstream operator id when an input link matches the port", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("upstream")); - state.addOperator(makeOperator("op1", ["input-0"])); - state.addLink(makeLink("l1", ["upstream", "output-0"], ["op1", "input-0"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 4, - result: [{ a: 1, b: 2 }], - inputPortShapes: [{ portIndex: 0, rows: 10, columns: 2 }], - }), - state - ); - expect(out).toContain("Input operator(table shape): upstream(10, 2)"); - }); - - test("sorts multiple input ports by portIndex regardless of input order", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("up0")); - state.addOperator(makeOperator("up1")); - state.addOperator(makeOperator("op1", ["input-0", "input-1"])); - state.addLink(makeLink("l0", ["up0", "output-0"], ["op1", "input-0"])); - state.addLink(makeLink("l1", ["up1", "output-0"], ["op1", "input-1"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [ - { portIndex: 1, rows: 2, columns: 2 }, - { portIndex: 0, rows: 1, columns: 1 }, - ], - }), - state - ); - expect(out).toContain("Input operator(table shape): up0(1, 1), up1(2, 2)"); + expect(lines[2]).toBe("WARNING: truncated to 1 row"); + expect(lines[3]).toBe("WARNING: something else"); }); }); describe("formatOperatorResult - visualization rows", () => { - test("strips html-content and json-content payloads when row is flagged as visualization", () => { + test("strips html-content and json-content payloads when result mode is visualization", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 1, + resultMode: OperatorResultMode.VISUALIZATION, result: [ { - __is_visualization__: true, "html-content": "
hidden
", "json-content": '{"big":1}', label: "chart", }, ], - }), - EMPTY_STATE + }) ); expect(out).toContain(""); expect(out).not.toContain("
hidden
"); @@ -230,40 +172,38 @@ describe("formatOperatorResult - visualization rows", () => { expect(out).toContain("chart"); }); - test("__is_visualization__ false leaves the visualization-only fields untouched", () => { + test("table result mode leaves visualization payload fields untouched", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 1, - result: [{ __is_visualization__: false, "html-content": "" }], - }), - EMPTY_STATE + resultMode: OperatorResultMode.TABLE, + result: [{ "html-content": "" }], + }) ); expect(out).toContain(""); expect(out).not.toContain(""); }); - test("__is_visualization__ column is excluded from rendered table body and shape agrees", () => { + test("table rows render all tuple columns and shape agrees", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 1, - result: [{ __is_visualization__: false, value: 1 }], - }), - EMPTY_STATE + result: [{ value: 1 }], + }) ); const lines = out.split("\n"); expect(out).toContain("Output table shape: (1, 1)"); // Header line is the third line (after brief summary and shape line). expect(lines[2]).toBe("\tvalue"); expect(lines[3]).toBe("0\t1"); - expect(out).not.toContain("__is_visualization__"); }); }); describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { - function tableLines(opInfo: Partial): string[] { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo }), EMPTY_STATE); + function tableLines(opInfo: OpInfoOverrides): string[] { + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo })); // Skip brief summary + shape line. return out.split("\n").slice(2); } @@ -296,17 +236,16 @@ describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { }); describe("jsonToTableFormat - row index gaps", () => { - test("inserts ... separator when __row_index__ skips ahead", () => { + test("inserts ... separator when rowIndex skips ahead", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 2, - result: [ - { __row_index__: 0, v: "a" }, - { __row_index__: 5, v: "b" }, + sampleTuples: [ + { rowIndex: 0, tuple: { v: "a" } }, + { rowIndex: 5, tuple: { v: "b" } }, ], - }), - EMPTY_STATE + }) ); const lines = out.split("\n"); // header, row0, gap marker, row5 @@ -316,26 +255,24 @@ describe("jsonToTableFormat - row index gaps", () => { expect(lines[lines.length - 1]).toBe("5\tb"); }); - test("no separator is emitted between consecutive __row_index__ values", () => { + test("no separator is emitted between consecutive rowIndex values", () => { const out = formatOperatorResult( "op1", makeOpInfo({ outputTuples: 2, - result: [ - { __row_index__: 0, v: "a" }, - { __row_index__: 1, v: "b" }, + sampleTuples: [ + { rowIndex: 0, tuple: { v: "a" } }, + { rowIndex: 1, tuple: { v: "b" } }, ], - }), - EMPTY_STATE + }) ); expect(out).not.toContain("...\t..."); }); - test("non-zero starting __row_index__ does not emit a leading gap marker", () => { + test("non-zero starting rowIndex does not emit a leading gap marker", () => { const out = formatOperatorResult( "op1", - makeOpInfo({ outputTuples: 1, result: [{ __row_index__: 9, v: "z" }] }), - EMPTY_STATE + makeOpInfo({ outputTuples: 1, sampleTuples: [{ rowIndex: 9, tuple: { v: "z" } }] }) ); expect(out).not.toContain("...\t..."); expect(out.endsWith("9\tz")).toBe(true); diff --git a/agent-service/src/agent/tools/result-formatting.ts b/agent-service/src/agent/tools/result-formatting.ts index 5ed4aacc5d4..88ee29b5662 100644 --- a/agent-service/src/agent/tools/result-formatting.ts +++ b/agent-service/src/agent/tools/result-formatting.ts @@ -17,100 +17,62 @@ * under the License. */ -import type { OperatorInfo } from "../../types/execution"; -import type { WorkflowState } from "../workflow-state"; -import { formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { OperatorResultMode, type OperatorExecutionSummary, type SampleRow } from "../../types/execution"; +import { formatExecuteOperatorResult, getOperatorWarnings, getVisibleResultHeaders } from "./tools-utility"; -export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, workflowState: WorkflowState): string { - if (opInfo.error) { - return `[ERROR] ${opInfo.error}`; +export function formatOperatorResult(operatorId: string, opInfo: OperatorExecutionSummary): string { + const errorText = opInfo.errorMessages.map(e => e.message).join("; "); + if (errorText) { + return `[ERROR] ${errorText}`; } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; - const columns = headers.length; - - const isViz = jsonArray.length > 0 && jsonArray[0]["__is_visualization__"] === true; - const serializableArray = isViz - ? jsonArray.map(row => { + const isViz = opInfo.resultSummary?.resultMode === OperatorResultMode.VISUALIZATION; + const rows: SampleRow[] = isViz + ? sampleTuples.map(({ rowIndex, tuple }) => { const cleaned: Record = {}; - for (const key of Object.keys(row)) { - if (key === "__is_visualization__") continue; + for (const key of Object.keys(tuple)) { if (key === "html-content" || key === "json-content") { cleaned[key] = ""; } else { - cleaned[key] = row[key]; + cleaned[key] = tuple[key]; } } - return cleaned; + return { rowIndex, tuple: cleaned }; }) - : jsonArray; + : sampleTuples; + + const headers = rows.length > 0 ? getVisibleResultHeaders(rows[0].tuple) : []; + const columns = headers.length; - const dataString = jsonToTableFormat(serializableArray); + const dataString = jsonToTableFormat(rows); - const metadataLines = [ - formatInputOutputMetadata(workflowState, operatorId, opInfo, columns), - ...(opInfo.warnings ?? []), - ].filter(Boolean); + // Output shape only; input-port shapes are derivable by the agent from the DAG + // links plus each upstream operator's own output shape shown in context. + const outputRows = opInfo.resultSummary?.tuplesCount ?? 0; + const metadataLines = [`Output table shape: (${outputRows}, ${columns})`, ...getOperatorWarnings(opInfo)].filter( + Boolean + ); const briefSummary = formatExecuteOperatorResult(operatorId); return [briefSummary, ...metadataLines, dataString].filter(Boolean).join("\n"); } -function formatInputOutputMetadata( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const hasRowIndex = "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; const headerLine = "\t" + headers.join("\t"); const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -118,7 +80,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); diff --git a/agent-service/src/agent/tools/tools-utility.spec.ts b/agent-service/src/agent/tools/tools-utility.spec.ts index b505199709e..c1939388342 100644 --- a/agent-service/src/agent/tools/tools-utility.spec.ts +++ b/agent-service/src/agent/tools/tools-utility.spec.ts @@ -29,37 +29,17 @@ import { } from "./tools-utility"; describe("getVisibleResultHeaders", () => { - test("returns every key when no internal columns are present", () => { + test("returns every key", () => { expect(getVisibleResultHeaders({ a: 1, b: 2 })).toEqual(["a", "b"]); }); - test("strips __row_index__ from the result", () => { - expect(getVisibleResultHeaders({ __row_index__: 0, a: 1 })).toEqual(["a"]); - }); - - test("strips __is_visualization__ from the result", () => { - expect(getVisibleResultHeaders({ __is_visualization__: true, a: 1 })).toEqual(["a"]); - }); - - test("strips every known internal column at once", () => { - expect(getVisibleResultHeaders({ __row_index__: 0, __is_visualization__: true, a: 1, b: 2 })).toEqual(["a", "b"]); - }); - test("preserves visible column order", () => { - expect(getVisibleResultHeaders({ z: 1, __row_index__: 0, a: 2, __is_visualization__: true, m: 3 })).toEqual([ - "z", - "a", - "m", - ]); + expect(getVisibleResultHeaders({ z: 1, a: 2, m: 3 })).toEqual(["z", "a", "m"]); }); test("returns an empty array for an empty row", () => { expect(getVisibleResultHeaders({})).toEqual([]); }); - - test("returns an empty array when only internal columns are present", () => { - expect(getVisibleResultHeaders({ __row_index__: 0, __is_visualization__: true })).toEqual([]); - }); }); describe("createToolResult", () => { diff --git a/agent-service/src/agent/tools/tools-utility.ts b/agent-service/src/agent/tools/tools-utility.ts index 6c9ab004f6e..12d71966cb8 100644 --- a/agent-service/src/agent/tools/tools-utility.ts +++ b/agent-service/src/agent/tools/tools-utility.ts @@ -17,10 +17,16 @@ * under the License. */ -export const INTERNAL_RESULT_KEYS: ReadonlySet = new Set(["__row_index__", "__is_visualization__"]); +import type { OperatorExecutionSummary } from "../../types/execution"; export function getVisibleResultHeaders(row: Record): string[] { - return Object.keys(row).filter(k => !INTERNAL_RESULT_KEYS.has(k)); + return Object.keys(row); +} + +// Warnings are the console messages the engine tags with a "WARNING: " title +// prefix; derive them rather than carrying a separate field on the summary. +export function getOperatorWarnings(opInfo: OperatorExecutionSummary): string[] { + return (opInfo.consoleLogsSummary?.messages ?? []).filter(m => m.title.startsWith("WARNING: ")).map(m => m.title); } export function createToolResult(message: string): string { diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..84671bbd94d 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -19,12 +19,25 @@ import { z } from "zod"; import { tool } from "ai"; -import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { + createErrorResult, + formatExecuteOperatorResult, + getOperatorWarnings, + getVisibleResultHeaders, +} from "./tools-utility"; import type { WorkflowState } from "../workflow-state"; import { getBackendConfig } from "../../api/backend-api"; import { env } from "../../config/env"; import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; -import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; +import { + OperatorState, + WorkflowFatalErrorType, + WorkflowExecutionState, + type OperatorExecutionSummary, + type SampleRow, + type WorkflowFatalError, + type WorkflowExecutionSummary, +} from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; import { createLogger } from "../../logger"; @@ -255,7 +268,7 @@ async function executeWorkflowHttp( config: ExecutionConfig, logicalPlan: LogicalPlan, options: { abortSignal?: AbortSignal } = {} -): Promise { +): Promise { const backendConfig = getBackendConfig(); const workflowId = config.workflowId; @@ -312,7 +325,7 @@ async function executeWorkflowHttp( throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); } - return (await response.json()) as SyncExecutionResult; + return (await response.json()) as WorkflowExecutionSummary; } catch (error) { if (error instanceof Error && error.name === "AbortError") { throw error; @@ -320,62 +333,19 @@ async function executeWorkflowHttp( log.error({ err: error }, "execution failed"); return { success: false, - state: "Error", + state: WorkflowExecutionState.ERROR, operators: {}, errors: [error instanceof Error ? error.message : "Unknown error"], }; } } -function formatInputOutput( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - function formatExecutionError( - compilationErrors?: Record, operatorErrors?: Array<{ operatorId: string; error: string }>, generalErrors?: string[] ): string { const lines: string[] = ["Execution failed due to the following error:"]; - if (compilationErrors && Object.keys(compilationErrors).length > 0) { - lines.push("Compilation error:"); - for (const [key, value] of Object.entries(compilationErrors)) { - lines.push(` ${key}: ${value}`); - } - } - if (operatorErrors && operatorErrors.length > 0) { lines.push("Execution error:"); for (const { operatorId, error } of operatorErrors) { @@ -393,11 +363,22 @@ function formatExecutionError( return lines.join("\n"); } -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; +function makeExecutionFailure(message: string, operatorId: string): WorkflowFatalError { + const now = Date.now(); + return { + type: { name: WorkflowFatalErrorType.EXECUTION_FAILURE }, + timestamp: { seconds: Math.floor(now / 1000), nanos: (now % 1000) * 1_000_000 }, + message, + details: "", + operatorId, + workerId: "", + }; +} + +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; // Leading tab aligns headers with the index column (pandas __repr__ style). const headerLine = "\t" + headers.join("\t"); @@ -405,10 +386,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -416,7 +394,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); @@ -438,7 +416,7 @@ export async function executeOperatorAndFormat( operatorId: string, options: { abortSignal?: AbortSignal; - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void; + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void; onResultLegacy?: (operatorId: string, backendStats?: Record) => void; } = {} ): Promise { @@ -466,34 +444,27 @@ export async function executeOperatorAndFormat( } } - const result: SyncExecutionResult = await executeWorkflowHttp(config, logicalPlan, { + const result: WorkflowExecutionSummary = await executeWorkflowHttp(config, logicalPlan, { abortSignal: options.abortSignal, }); if (!result.success) { - const compilationErrors = - result.state === "CompilationFailed" || result.state === "ValidationFailed" - ? result.compilationErrors - : undefined; - const operatorErrors = - result.state === "Failed" + result.state === WorkflowExecutionState.FAILED ? Object.entries(result.operators) - .filter(([_, op]) => op.error) - .map(([opId, op]) => ({ operatorId: opId, error: op.error! })) + .filter(([_, op]) => op.errorMessages.length) + .map(([opId, op]) => ({ operatorId: opId, error: op.errorMessages.map(e => e.message).join("; ") })) : undefined; - const generalErrors = result.state === "Killed" ? ["Workflow execution was killed (timeout)."] : result.errors; + const generalErrors = + result.state === WorkflowExecutionState.KILLED ? ["Workflow execution was killed (timeout)."] : result.errors; - const errorText = formatExecutionError(compilationErrors, operatorErrors, generalErrors); + const errorText = formatExecutionError(operatorErrors, generalErrors); if (options.onResult) { - const errorInfo: OperatorInfo = { - state: result.state, - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - error: errorText, + const errorInfo: OperatorExecutionSummary = { + state: OperatorState.FAILED, + errorMessages: [makeExecutionFailure(errorText, operatorId)], }; options.onResult(operatorId, errorInfo); } @@ -503,36 +474,35 @@ export async function executeOperatorAndFormat( const opInfo = result.operators[operatorId]; if (!opInfo) { - return createErrorResult( - formatExecutionError(undefined, undefined, [`No result found for operator: ${operatorId}`]) - ); + return createErrorResult(formatExecutionError(undefined, [`No result found for operator: ${operatorId}`])); } - if (opInfo.error) { + if (opInfo.errorMessages.length) { if (options.onResult) { options.onResult(operatorId, opInfo); } - return createErrorResult(formatExecutionError(undefined, [{ operatorId, error: opInfo.error }])); + const opError = opInfo.errorMessages.map(e => e.message).join("; "); + return createErrorResult(formatExecutionError([{ operatorId, error: opError }])); } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; + const headers = sampleTuples.length > 0 ? getVisibleResultHeaders(sampleTuples[0].tuple) : []; const columns = headers.length; // Notify for every operator in the execution so upstream stats are also stored. if (options.onResult) { for (const [opId, info] of Object.entries(result.operators)) { - if (info && !info.error) { + if (info && !info.errorMessages.length) { options.onResult(opId, info); } } } - let dataString = jsonToTableFormat(jsonArray); + let dataString = jsonToTableFormat(sampleTuples); // Safety-net: TSV serialization may add padding beyond backend's raw-record budget. const charLimit = config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit; @@ -568,9 +538,11 @@ export async function executeOperatorAndFormat( dataString = [headerLine, ...keptRows].join("\n"); } - const shapeLine = formatInputOutput(workflowState, operatorId, opInfo, columns); + // Output shape only; the agent derives input-port shapes from the DAG + the + // upstream operators' own output shapes shown in context. + const shapeLine = `Output table shape: (${opInfo.resultSummary?.tuplesCount ?? 0}, ${columns})`; - const warningLines = opInfo.warnings?.map(w => w) ?? []; + const warningLines = getOperatorWarnings(opInfo); const metadataLines = [shapeLine, ...warningLines].filter(Boolean); @@ -589,7 +561,7 @@ export async function executeOperatorAndFormat( export function createExecuteOperatorTool( workflowState: WorkflowState, getConfig: () => ExecutionConfig, - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void ) { return tool({ description: diff --git a/agent-service/src/agent/workflow-result-state.spec.ts b/agent-service/src/agent/workflow-result-state.spec.ts index b2e46fd0d9e..4345b12f9b0 100644 --- a/agent-service/src/agent/workflow-result-state.spec.ts +++ b/agent-service/src/agent/workflow-result-state.spec.ts @@ -19,14 +19,13 @@ import { describe, expect, test } from "bun:test"; import { WorkflowResultState } from "./workflow-result-state"; -import type { OperatorInfo } from "../types/execution"; +import { OperatorResultMode, OperatorState, type OperatorExecutionSummary } from "../types/execution"; -function makeInfo(outputTuples: number): OperatorInfo { +function makeInfo(tuplesCount: number): OperatorExecutionSummary { return { - state: "Completed", - inputTuples: 0, - outputTuples, - resultMode: "table", + state: OperatorState.COMPLETED, + errorMessages: [], + resultSummary: { resultMode: OperatorResultMode.TABLE, sampleTuples: [], tuplesCount }, }; } @@ -40,15 +39,15 @@ describe("WorkflowResultState - ancestor walk", () => { state.set("op1", "step-C", makeInfo(3)); path = ["step-A", "step-B", "step-C"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(3); + expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(3); // Rewind to step-B; step-C is no longer an ancestor. path = ["step-A", "step-B"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(2); + expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(2); // Rewind further. path = ["step-A"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(1); + expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(1); }); test("returns undefined when no ancestor has a result", () => { @@ -74,8 +73,8 @@ describe("WorkflowResultState - ancestor walk", () => { path = ["step-A", "step-B"]; const visible = state.getAllVisible(); expect(visible.size).toBe(2); - expect(visible.get("op1")?.operatorInfo.outputTuples).toBe(1); - expect(visible.get("op2")?.operatorInfo.outputTuples).toBe(7); + expect(visible.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(1); + expect(visible.get("op2")?.operatorInfo.resultSummary?.tuplesCount).toBe(7); }); test("clear drops all stored results", () => { @@ -90,6 +89,6 @@ describe("WorkflowResultState - ancestor walk", () => { const state = new WorkflowResultState(() => ["step-A"]); state.set("op1", "step-A", makeInfo(1)); state.set("op1", "step-A", makeInfo(42)); - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(42); + expect(state.get("op1")?.operatorInfo.resultSummary?.tuplesCount).toBe(42); }); }); diff --git a/agent-service/src/agent/workflow-result-state.ts b/agent-service/src/agent/workflow-result-state.ts index e6f13c2301a..34d604657e3 100644 --- a/agent-service/src/agent/workflow-result-state.ts +++ b/agent-service/src/agent/workflow-result-state.ts @@ -17,10 +17,10 @@ * under the License. */ -import type { OperatorInfo } from "../types/execution"; +import type { OperatorExecutionSummary } from "../types/execution"; interface ResultEntry { - operatorInfo: OperatorInfo; + operatorInfo: OperatorExecutionSummary; stepId: string; } @@ -37,7 +37,7 @@ export class WorkflowResultState { constructor(private getAncestorPath: () => string[]) {} - set(operatorId: string, stepId: string, operatorInfo: OperatorInfo): void { + set(operatorId: string, stepId: string, operatorInfo: OperatorExecutionSummary): void { let versions = this.results.get(operatorId); if (!versions) { versions = new Map(); @@ -58,7 +58,7 @@ export class WorkflowResultState { return undefined; } - getOperatorInfo(operatorId: string): OperatorInfo | undefined { + getOperatorInfo(operatorId: string): OperatorExecutionSummary | undefined { return this.get(operatorId)?.operatorInfo; } diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index 8ffd27fd52c..8a361231c42 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -19,8 +19,13 @@ import { getBackendConfig } from "./backend-api"; import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import type { WorkflowFatalError } from "../types/execution"; import { createLogger } from "../logger"; +// WorkflowFatalError is defined in types/execution.ts (shared by compile and +// execution errors); re-exported here for existing importers of this module. +export type { WorkflowFatalError }; + const log = createLogger("CompileAPI"); export interface SchemaAttribute { @@ -30,12 +35,6 @@ export interface SchemaAttribute { export type PortSchema = ReadonlyArray; -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - export interface WorkflowCompilationResponse { physicalPlan?: any; operatorOutputSchemas: Record; diff --git a/agent-service/src/server.spec.ts b/agent-service/src/server.spec.ts index c1fdddc9339..d64aa855b9f 100644 --- a/agent-service/src/server.spec.ts +++ b/agent-service/src/server.spec.ts @@ -21,6 +21,7 @@ import { beforeEach, describe, expect, spyOn, test } from "bun:test"; import { buildApp, start, _resetAgentStoreForTests, _getAgentForTests } from "./server"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; +import { OperatorResultMode, OperatorState } from "./types/execution"; const API = env.API_PREFIX; const app = buildApp(); @@ -323,6 +324,7 @@ describe("agent read routes", () => { test("GET /:id/operator-results maps the visible operator results", async () => { const agent = _getAgentForTests(id)!; + // The route returns each operator's OperatorExecutionSummary verbatim. (agent as any).getWorkflowResultState = () => ({ getAllVisible: () => new Map([ @@ -330,27 +332,25 @@ describe("agent read routes", () => { "op-1", { operatorInfo: { - state: "COMPLETED", - inputTuples: 1, - outputTuples: 2, - inputPortShapes: [], - result: [{ a: 1 }], - error: undefined, - warnings: [], - consoleLogs: [], - totalRowCount: 2, - resultStatistics: {}, + state: OperatorState.COMPLETED, + errorMessages: [], + resultSummary: { + resultMode: OperatorResultMode.TABLE, + sampleTuples: [{ rowIndex: 0, tuple: { a: 1 } }], + tuplesCount: 2, + }, }, }, ], ]), }); - const body = await readJson<{ results: Record }>( - await getJson(`${API}/agents/${id}/operator-results`) - ); - expect(body.results["op-1"].outputTuples).toBe(2); - expect(body.results["op-1"].outputColumns).toBe(1); + const body = await readJson<{ + results: Record; + }>(await getJson(`${API}/agents/${id}/operator-results`)); + expect(body.results["op-1"].state).toBe("Completed"); + expect(body.results["op-1"].resultSummary.tuplesCount).toBe(2); + expect(body.results["op-1"].resultSummary.sampleTuples).toEqual([{ rowIndex: 0, tuple: { a: 1 } }]); }); }); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 030d27b95bf..6c23943be59 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -21,7 +21,6 @@ import { Elysia, t } from "elysia"; import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; -import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; import { retrieveWorkflow } from "./api/workflow-api"; @@ -42,7 +41,7 @@ import type { import { AgentState, OperatorResultSerializationMode } from "./types/agent"; import type { WsClientCommand, WsServerEvent } from "./types/ws"; import { WsServerSnapshotEvent, WsServerStepEvent, WsServerStatusEvent, WsServerErrorEvent } from "./types/ws"; -import type { OperatorResultSummary } from "./types/execution"; +import type { OperatorExecutionSummary } from "./types/execution"; const agentStore = new Map(); let agentCounter = 0; @@ -388,25 +387,12 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -function getOperatorResultSummaries(agent: TexeraAgent): Record { +function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); - const results: Record = {}; + const results: Record = {}; for (const [opId, entry] of visible) { - const info = entry.operatorInfo; - results[opId] = { - state: info.state, - inputTuples: info.inputTuples, - outputTuples: info.outputTuples, - inputPortShapes: info.inputPortShapes, - outputColumns: info.result && info.result.length > 0 ? getVisibleResultHeaders(info.result[0]).length : undefined, - error: info.error, - warnings: info.warnings, - consoleLogCount: info.consoleLogs?.length, - totalRowCount: info.totalRowCount, - sampleRecords: info.result, - resultStatistics: info.resultStatistics, - }; + results[opId] = entry.operatorInfo; } return results; } diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index d638a889d47..52672fa83e3 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -17,56 +17,122 @@ * under the License. */ -interface ConsoleMessage { - msgType: string; +export enum WorkflowFatalErrorType { + COMPILATION_ERROR = "COMPILATION_ERROR", + EXECUTION_FAILURE = "EXECUTION_FAILURE", +} + +// A fatal error reported for one operator. Reuses the engine's wire shape +// (workflowruntimestate.proto). The same type the workflow-compiling service +// returns for compilation errors, so compile and execution errors share one +// shape. Re-exported by api/compile-api.ts. +export interface WorkflowFatalError { + type: { name: WorkflowFatalErrorType }; + timestamp: { seconds: number; nanos: number }; message: string; + details: string; + operatorId: string; + workerId: string; } -interface PortShape { - portIndex: number; - rows: number; - columns: number; +// Lifecycle state of a single operator, as reported by the engine +// (mirrors the backend's WorkflowAggregatedState string mapping). +export enum OperatorState { + UNINITIALIZED = "Uninitialized", + READY = "Ready", + RUNNING = "Running", + PAUSING = "Pausing", + PAUSED = "Paused", + RESUMING = "Resuming", + COMPLETED = "Completed", + FAILED = "Failed", + KILLED = "Killed", + TERMINATED = "Terminated", + UNKNOWN = "Unknown", } -export interface OperatorInfo { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: PortShape[]; - resultMode: string; - result?: Record[]; - totalRowCount?: number; - displayedRows?: number; - truncated?: boolean; - consoleLogs?: ConsoleMessage[]; - error?: string; - warnings?: string[]; - resultStatistics?: Record; +// Aggregated state of a whole workflow execution: the OperatorState values the +// engine reports, plus the synthetic outcomes the sync-execution endpoint adds. +export enum WorkflowExecutionState { + UNINITIALIZED = "Uninitialized", + READY = "Ready", + RUNNING = "Running", + PAUSING = "Pausing", + PAUSED = "Paused", + RESUMING = "Resuming", + COMPLETED = "Completed", + FAILED = "Failed", + KILLED = "Killed", + TERMINATED = "Terminated", + UNKNOWN = "Unknown", + ERROR = "Error", + COMPILATION_FAILED = "CompilationFailed", } -export interface SyncExecutionResult { - success: boolean; - state: string; - operators: Record; - compilationErrors?: Record; - errors?: string[]; +export enum ConsoleMessageType { + PRINT = "PRINT", + ERROR = "ERROR", + COMMAND = "COMMAND", + DEBUGGER = "DEBUGGER", } -/** - * Wire projection of one operator's execution result, summarized for the - * client: counts and a small record sample instead of full payloads. Returned - * by the REST route `GET /agents/:id/operator-results`. - */ +// A single console message emitted by an operator during execution. +// `title` is the short header (Scala errors put their text here); `message` is +// the body (Python errors / stack traces). +export interface ConsoleMessage { + msgType: ConsoleMessageType; + title: string; + message: string; +} + +// One sampled output row: its original position plus the row's columns. +export interface SampleRow { + rowIndex: number; + tuple: Record; +} + +export enum OperatorResultMode { + TABLE = "table", + VISUALIZATION = "visualization", +} + +// An operator's output, summarized for the agent. `sampleTuples` are the +// symmetrically-truncated output rows (the middle is dropped, so `rowIndex` +// values may have gaps). `outputSchema` / per-column statistics are intended +// future additions — the engine does not produce them yet. export interface OperatorResultSummary { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: PortShape[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; + resultMode: OperatorResultMode; + sampleTuples: SampleRow[]; + // Total output rows before truncation (sampleTuples may hold fewer). + tuplesCount: number; +} + +// An operator's console output. Warnings are not a separate field: they are the +// messages whose `title` the engine prefixes with "WARNING: ", derived on demand. +export interface OperatorConsoleLogsSummary { + messages: ConsoleMessage[]; +} + +// Per-operator execution summary returned by the sync-execution backend. +// Orthogonal sub-summaries replace the previous flat `OperatorInfo`. +export interface OperatorExecutionSummary { + state: OperatorState; + // Empty means the operator did not fail. + errorMessages: ReadonlyArray; + // Absent when the operator produced no materialized result. + resultSummary?: OperatorResultSummary; + // Absent when the operator produced no console output. + consoleLogsSummary?: OperatorConsoleLogsSummary; +} + +// The result of one synchronous workflow execution. +export interface WorkflowExecutionSummary { + // True only on a clean run; can be false even when state is "Completed" + // (e.g. an operator logged a console error without aborting the run). + success: boolean; + state: WorkflowExecutionState; + operators: Record; + // Workflow-level errors (timeouts, init/compile failures, fatal errors); + // empty means none. + errors: string[]; } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index b70bafb4b0b..932d2eb7d96 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -44,6 +44,9 @@ import org.apache.texera.amber.engine.common.executionruntimestate.{ ExecutionMetadataStore, ExecutionStatsStore } +import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError +import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE +import com.google.protobuf.timestamp.Timestamp import io.reactivex.rxjava3.core.Observable import org.apache.texera.auth.SessionUser import org.apache.texera.dao.SqlServer @@ -55,12 +58,12 @@ import org.apache.texera.web.service.{ExecutionResultService, WorkflowService} import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState import java.net.URI +import java.time.Instant import java.util.concurrent.TimeUnit import javax.annotation.security.RolesAllowed import javax.ws.rs._ import javax.ws.rs.core.MediaType import scala.collection.mutable -import scala.jdk.CollectionConverters._ import com.fasterxml.jackson.databind.ObjectMapper case class SyncExecutionRequest( @@ -79,32 +82,41 @@ case class ConsoleMessageInfo( message: String ) -case class PortShape( - portIndex: Int, - rows: Long +// One sampled output row: the original row position plus the row's columns as a +// processed/truncated JSON object (not a raw engine Tuple, which would serialize +// as {schema, fields[]} and bypass the type-aware conversion + cell truncation). +// The index is carried explicitly rather than embedded in the tuple. +case class SampleRow( + rowIndex: Int, + tuple: ObjectNode ) -case class OperatorInfo( - state: String, - inputTuples: Long, - outputTuples: Long, - inputPortShapes: Option[List[PortShape]], +case class OperatorResultSummary( resultMode: String, // "table" or "visualization" - result: Option[Any], // JSON array (List[ObjectNode]) - totalRowCount: Option[Int], - displayedRows: Option[Int], - truncated: Option[Boolean], - consoleLogs: Option[List[ConsoleMessageInfo]], - error: Option[String], - warnings: Option[List[String]] + sampleTuples: List[SampleRow], + tuplesCount: Int +) + +case class OperatorConsoleLogsSummary( + messages: List[ConsoleMessageInfo] ) -case class SyncExecutionResult( +// Per-operator execution summary. Orthogonal sub-summaries replace the previous +// flat OperatorInfo; must stay in sync with agent-service's OperatorExecutionSummary. +// `errorMessages` reuses the engine's WorkflowFatalError, the same type the +// compiling service returns for compilation errors, for one consistent wire shape. +case class OperatorExecutionSummary( + state: String, + errorMessages: List[WorkflowFatalError], // empty means the operator did not fail + resultSummary: Option[OperatorResultSummary], + consoleLogsSummary: Option[OperatorConsoleLogsSummary] +) + +case class WorkflowExecutionSummary( success: Boolean, state: String, - operators: Map[String, OperatorInfo], - compilationErrors: Option[Map[String, String]], - errors: Option[List[String]] + operators: Map[String, OperatorExecutionSummary], + errors: List[String] // empty means none ) sealed trait TerminationReason @@ -129,7 +141,7 @@ class SyncExecutionResource extends LazyLogging { @PathParam("cuid") computingUnitId: Int, request: SyncExecutionRequest, @Auth user: SessionUser - ): SyncExecutionResult = { + ): WorkflowExecutionSummary = { val timeoutSeconds = request.timeoutSeconds val maxOperatorResultCharLimit = @@ -176,12 +188,11 @@ class SyncExecutionResource extends LazyLogging { val executionService = workflowService.executionService.getValue if (executionService == null) { - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List("Failed to initialize execution service")) + errors = List("Failed to initialize execution service") ) } @@ -254,21 +265,19 @@ class SyncExecutionResource extends LazyLogging { } catch { case _: java.util.concurrent.TimeoutException => killExecution(executionService) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Killed", operators = Map.empty, - compilationErrors = None, - errors = Some(List(s"Timeout after $timeoutSeconds seconds")) + errors = List(s"Timeout after $timeoutSeconds seconds") ) case e: Exception => logger.error(s"Error waiting for execution: ${e.getMessage}", e) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List(e.getMessage)) + errors = List(e.getMessage) ) } } @@ -318,7 +327,7 @@ class SyncExecutionResource extends LazyLogging { .map(err => s"${err.`type`}: ${err.message}") .toList - val hasOperatorConsoleError = operatorInfos.values.exists(_.error.isDefined) + val hasOperatorConsoleError = operatorInfos.values.exists(_.errorMessages.nonEmpty) val stateString = if (terminatedByConsoleError) "Failed" @@ -328,12 +337,11 @@ class SyncExecutionResource extends LazyLogging { val isSuccess = (finalState.state == COMPLETED || terminatedByTargetResults) && !hasOperatorConsoleError && !terminatedByConsoleError - SyncExecutionResult( + WorkflowExecutionSummary( success = isSuccess, state = stateString, operators = operatorInfos, - compilationErrors = None, - errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None + errors = fatalErrors ) } catch { @@ -382,8 +390,8 @@ class SyncExecutionResource extends LazyLogging { maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int, inMemoryConsoleState: Option[ExecutionConsoleStore] = None - ): Map[String, OperatorInfo] = { - val operatorInfos = mutable.Map[String, OperatorInfo]() + ): Map[String, OperatorExecutionSummary] = { + val operatorInfos = mutable.Map[String, OperatorExecutionSummary]() val statsState = executionService.executionStateStore.statsStore.getState val operatorStats = statsState.operatorInfo @@ -406,23 +414,9 @@ class SyncExecutionResource extends LazyLogging { for (opId <- targetOps) { val stats = operatorStats.get(opId) - val (state, inputTuples, outputTuples): (String, Long, Long) = stats match { - case Some(s) => - val inputCount = s.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum - val outputCount = s.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum - (stateToString(s.operatorState), inputCount, outputCount) - case None => ("Unknown", 0L, 0L) - } + val state = stats.map(s => stateToString(s.operatorState)).getOrElse("Unknown") - val inputPortShapes: Option[List[PortShape]] = stats - .map { s => - s.operatorStatistics.inputMetrics.map { pm => - PortShape(pm.portId.id, pm.tupleMetrics.count) - }.toList - } - .filter(_.nonEmpty) - - val (resultMode, result, totalRowCount, displayedRows, truncated) = + val (resultMode, result, tuplesCount, _, _) = collectOperatorResult( executionId, opId, @@ -459,31 +453,40 @@ class SyncExecutionResource extends LazyLogging { } ) - // Convention: PRINT messages prefixed with "WARNING: " surface as warnings. - val warningMsgs = consoleLogs - .map(_.filter(_.title.startsWith("WARNING: ")).map(_.title)) - .filter(_.nonEmpty) - operatorInfos(opId) = OperatorInfo( + // Absent when the operator produced no materialized result. `result` and + // `tuplesCount` are populated together, so map over the former. + val resultSummary = result.map { tuples => + OperatorResultSummary( + resultMode = resultMode, + sampleTuples = tuples, + tuplesCount = tuplesCount.getOrElse(0) + ) + } + + val consoleLogsSummary = consoleLogs.map { logs => + OperatorConsoleLogsSummary(messages = logs) + } + + // Per-operator runtime errors come from console ERROR logs; surface them as + // EXECUTION_FAILURE WorkflowFatalErrors (same type the compiler emits for + // COMPILATION_ERRORs). Empty list means the operator did not fail. + val errorMessages = errorMsg + .map(msg => List(WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), msg, "", opId))) + .getOrElse(List.empty) + + operatorInfos(opId) = OperatorExecutionSummary( state = state, - inputTuples = inputTuples, - outputTuples = outputTuples, - inputPortShapes = inputPortShapes, - resultMode = resultMode, - result = result, - totalRowCount = totalRowCount, - displayedRows = displayedRows, - truncated = truncated, - consoleLogs = consoleLogs, - error = errorMsg, - warnings = warningMsgs + errorMessages = errorMessages, + resultSummary = resultSummary, + consoleLogsSummary = consoleLogsSummary ) } operatorInfos.toMap } - private def handleExecutionError(e: Exception): SyncExecutionResult = { + private def handleExecutionError(e: Exception): WorkflowExecutionSummary = { val errorMsg = e.getMessage val isCompilationError = errorMsg != null && ( errorMsg.contains("compilation") || @@ -493,20 +496,18 @@ class SyncExecutionResource extends LazyLogging { ) if (isCompilationError) { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "CompilationFailed", operators = Map.empty, - compilationErrors = Some(Map("error" -> errorMsg)), - errors = Some(List(errorMsg)) + errors = List(errorMsg) ) } else { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List(Option(e.getMessage).getOrElse("Unknown error"))) + errors = List(Option(e.getMessage).getOrElse("Unknown error")) ) } } @@ -521,9 +522,7 @@ class SyncExecutionResource extends LazyLogging { opId: String, maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int - ): (String, Option[Any], Option[Int], Option[Int], Option[Boolean]) = { - import com.fasterxml.jackson.databind.node.ObjectNode - + ): (String, Option[List[SampleRow]], Option[Int], Option[Int], Option[Boolean]) = { try { val storageUriOption = WorkflowExecutionsResource.getResultUriByLogicalPortId( executionId, @@ -543,13 +542,7 @@ class SyncExecutionResource extends LazyLogging { val tupleIterator = document.get() if (totalCount == 0 || !tupleIterator.hasNext) { - return ( - "table", - Some(List.empty[ObjectNode].asJava), - Some(0), - Some(0), - Some(false) - ) + return ("table", Some(List.empty[SampleRow]), Some(0), Some(0), Some(false)) } // A single tuple with html-content / json-content is a visualization payload — @@ -558,71 +551,52 @@ class SyncExecutionResource extends LazyLogging { if (totalCount == 1 && isVisualizationTuple(firstTuple)) { val jsonResults = ExecutionResultService.convertTuplesToJson(List(firstTuple), isVisualization = true) - jsonResults.foreach( - _.asInstanceOf[ObjectNode].put("__is_visualization__", true) - ) - return ( - "visualization", - Some(jsonResults), - Some(totalCount), - Some(1), - Some(false) - ) + val rows = jsonResults.zipWithIndex.map { case (json, idx) => SampleRow(idx, json) } + return ("visualization", Some(rows), Some(totalCount), Some(1), Some(false)) } - // __row_index__ preserves the original position so the frontend can show - // "row N" correctly after symmetric truncation drops the middle. + // rowIndex preserves the original position so the client can show "row N" + // correctly after symmetric truncation drops the middle. var rowIndex = 0 val firstJson = ExecutionResultService.convertTuplesToJson(List(firstTuple)).head val truncatedFirst = truncateSingleTuple(firstJson, maxOperatorResultCellCharLimit) - truncatedFirst.put("__row_index__", rowIndex) val firstSize = estimateTupleSize(truncatedFirst, mapper) if (firstSize >= maxOperatorResultCharLimit) { - return ( - "table", - Some(List(truncatedFirst).asJava), - Some(totalCount), - Some(1), - Some(true) - ) + return ("table", Some(List(SampleRow(rowIndex, truncatedFirst))), Some(totalCount), Some(1), Some(true)) } val halfLimit = maxOperatorResultCharLimit / 2 val truncationNoticeSize = 50 // reserved for the "...skipped..." marker - val frontTuples = mutable.ListBuffer[ObjectNode](truncatedFirst) + val frontRows = mutable.ListBuffer[SampleRow](SampleRow(rowIndex, truncatedFirst)) var frontSize = firstSize - var processedCount = 1 while (tupleIterator.hasNext && frontSize < halfLimit) { val tuple = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jsonTuple = ExecutionResultService.convertTuplesToJson(List(tuple)).head val truncatedTuple = truncateSingleTuple(jsonTuple, maxOperatorResultCellCharLimit) - truncatedTuple.put("__row_index__", rowIndex) val tupleSize = estimateTupleSize(truncatedTuple, mapper) + val row = SampleRow(rowIndex, truncatedTuple) if (frontSize + tupleSize <= halfLimit) { - frontTuples += truncatedTuple + frontRows += row frontSize += tupleSize } else { // Front is full — switch to a sliding window for the back half. - val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() - backBuffer += ((truncatedTuple, tupleSize)) + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() + backBuffer += ((row, tupleSize)) var backSize = tupleSize while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -631,34 +605,24 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - return ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + return ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } } if (tupleIterator.hasNext) { - val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() var backSize = 0 while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -667,25 +631,11 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } else { - ( - "table", - Some(frontTuples.toList.asJava), - Some(totalCount), - Some(frontTuples.size), - Some(false) - ) + ("table", Some(frontRows.toList), Some(totalCount), Some(frontRows.size), Some(false)) } case None => diff --git a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html index cd2f1e0f52a..1ff6d9c4c6d 100644 --- a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html +++ b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html @@ -20,7 +20,7 @@