diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 0943d3e04b4..e29c3849038 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -16,8 +16,12 @@ import { } from '@/lib/workflows/triggers/triggers' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { + addHttpErrorConsoleEntry, type BlockEventHandlerConfig, createBlockEventHandlers, + addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry, + handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole, + handleExecutionErrorConsole as sharedHandleExecutionErrorConsole, } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' @@ -159,22 +163,7 @@ export function useWorkflowExecution() { setActiveBlocks, ]) - /** - * Builds timing fields for execution-level console entries. - */ - const buildExecutionTiming = useCallback((durationMs?: number) => { - const normalizedDuration = durationMs || 0 - return { - durationMs: normalizedDuration, - startedAt: new Date(Date.now() - normalizedDuration).toISOString(), - endedAt: new Date().toISOString(), - } - }, []) - - /** - * Adds an execution-level error entry to the console when appropriate. - */ - const addExecutionErrorConsoleEntry = useCallback( + const handleExecutionErrorConsole = useCallback( (params: { workflowId?: string executionId?: string @@ -184,102 +173,23 @@ export function useWorkflowExecution() { isPreExecutionError?: boolean }) => { if (!params.workflowId) return - - const hasBlockError = params.blockLogs.some((log) => log.error) - const isPreExecutionError = params.isPreExecutionError ?? false - if (!isPreExecutionError && hasBlockError) { - return - } - - const errorMessage = params.error || 'Execution failed' - const isTimeout = errorMessage.toLowerCase().includes('timed out') - const timing = buildExecutionTiming(params.durationMs) - - addConsole({ - input: {}, - output: {}, - success: false, - error: errorMessage, - durationMs: timing.durationMs, - startedAt: timing.startedAt, - executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, - endedAt: timing.endedAt, + sharedHandleExecutionErrorConsole(addConsole, cancelRunningEntries, { + ...params, workflowId: params.workflowId, - blockId: isPreExecutionError - ? 'validation' - : isTimeout - ? 'timeout-error' - : 'execution-error', - executionId: params.executionId, - blockName: isPreExecutionError - ? 'Workflow Validation' - : isTimeout - ? 'Timeout Error' - : 'Execution Error', - blockType: isPreExecutionError ? 'validation' : 'error', }) }, - [addConsole, buildExecutionTiming] + [addConsole, cancelRunningEntries] ) - /** - * Adds an execution-level cancellation entry to the console. - */ - const addExecutionCancelledConsoleEntry = useCallback( + const handleExecutionCancelledConsole = useCallback( (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { if (!params.workflowId) return - - const timing = buildExecutionTiming(params.durationMs) - addConsole({ - input: {}, - output: {}, - success: false, - error: 'Execution was cancelled', - durationMs: timing.durationMs, - startedAt: timing.startedAt, - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: timing.endedAt, + sharedHandleExecutionCancelledConsole(addConsole, cancelRunningEntries, { + ...params, workflowId: params.workflowId, - blockId: 'cancelled', - executionId: params.executionId, - blockName: 'Execution Cancelled', - blockType: 'cancelled', }) }, - [addConsole, buildExecutionTiming] - ) - - /** - * Handles workflow-level execution errors for console output. - */ - const handleExecutionErrorConsole = useCallback( - (params: { - workflowId?: string - executionId?: string - error?: string - durationMs?: number - blockLogs: BlockLog[] - isPreExecutionError?: boolean - }) => { - if (params.workflowId) { - cancelRunningEntries(params.workflowId) - } - addExecutionErrorConsoleEntry(params) - }, - [addExecutionErrorConsoleEntry, cancelRunningEntries] - ) - - /** - * Handles workflow-level execution cancellations for console output. - */ - const handleExecutionCancelledConsole = useCallback( - (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { - if (params.workflowId) { - cancelRunningEntries(params.workflowId) - } - addExecutionCancelledConsoleEntry(params) - }, - [addExecutionCancelledConsoleEntry, cancelRunningEntries] + [addConsole, cancelRunningEntries] ) const buildBlockEventHandlers = useCallback( @@ -1319,31 +1229,42 @@ export function useWorkflowExecution() { } else { if (!executor) { try { - let blockId = 'serialization' - let blockName = 'Workflow' - let blockType = 'serializer' - if (error instanceof WorkflowValidationError) { - blockId = error.blockId || blockId - blockName = error.blockName || blockName - blockType = error.blockType || blockType + const httpStatus = + isRecord(error) && typeof error.httpStatus === 'number' ? error.httpStatus : undefined + const storeAddConsole = useTerminalConsoleStore.getState().addConsole + + if (httpStatus && activeWorkflowId) { + addHttpErrorConsoleEntry(storeAddConsole, { + workflowId: activeWorkflowId, + executionId: options?.executionId, + error: normalizedMessage, + httpStatus, + }) + } else if (error instanceof WorkflowValidationError) { + storeAddConsole({ + input: {}, + output: {}, + success: false, + error: normalizedMessage, + durationMs: 0, + startedAt: new Date().toISOString(), + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: new Date().toISOString(), + workflowId: activeWorkflowId || '', + blockId: error.blockId || 'serialization', + executionId: options?.executionId, + blockName: error.blockName || 'Workflow', + blockType: error.blockType || 'serializer', + }) + } else { + sharedAddExecutionErrorConsoleEntry(storeAddConsole, { + workflowId: activeWorkflowId || '', + executionId: options?.executionId, + error: normalizedMessage, + blockLogs: [], + isPreExecutionError: true, + }) } - - // Use MAX_SAFE_INTEGER so execution errors appear at the end of the log - useTerminalConsoleStore.getState().addConsole({ - input: {}, - output: {}, - success: false, - error: normalizedMessage, - durationMs: 0, - startedAt: new Date().toISOString(), - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId || '', - blockId, - executionId: options?.executionId, - blockName, - blockType, - }) } catch {} } @@ -1681,8 +1602,8 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode: 'add', - includeStartConsoleEntry: false, + consoleMode: 'update', + includeStartConsoleEntry: true, }) await executionStream.executeFromBlock({ diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index c0ca16cc9af..32d286ddf56 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -13,6 +13,7 @@ import type { StreamingExecution, } from '@/executor/types' import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' +import { processSSEStream } from '@/hooks/use-execution-stream' const logger = createLogger('workflow-execution-utils') @@ -406,6 +407,161 @@ export function createBlockEventHandlers( return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted } } +type AddConsoleFn = (entry: Omit) => ConsoleEntry +type CancelRunningEntriesFn = (workflowId: string) => void + +export interface ExecutionTimingFields { + durationMs: number + startedAt: string + endedAt: string +} + +/** + * Builds timing fields for an execution-level console entry. + */ +export function buildExecutionTiming(durationMs?: number): ExecutionTimingFields { + const normalizedDuration = durationMs || 0 + return { + durationMs: normalizedDuration, + startedAt: new Date(Date.now() - normalizedDuration).toISOString(), + endedAt: new Date().toISOString(), + } +} + +export interface ExecutionErrorConsoleParams { + workflowId: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean +} + +/** + * Adds an execution-level error entry to the console when no block-level error already covers it. + * Shared between direct user execution and mothership-initiated execution. + */ +export function addExecutionErrorConsoleEntry( + addConsole: AddConsoleFn, + params: ExecutionErrorConsoleParams +): void { + const hasBlockError = params.blockLogs.some((log) => log.error) + const isPreExecutionError = params.isPreExecutionError ?? false + if (!isPreExecutionError && hasBlockError) return + + const errorMessage = params.error || 'Execution failed' + const isTimeout = errorMessage.toLowerCase().includes('timed out') + const timing = buildExecutionTiming(params.durationMs) + + addConsole({ + input: {}, + output: {}, + success: false, + error: errorMessage, + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: isPreExecutionError ? 'validation' : isTimeout ? 'timeout-error' : 'execution-error', + executionId: params.executionId, + blockName: isPreExecutionError + ? 'Workflow Validation' + : isTimeout + ? 'Timeout Error' + : 'Execution Error', + blockType: isPreExecutionError ? 'validation' : 'error', + }) +} + +/** + * Cancels running entries and adds an execution-level error console entry. + */ +export function handleExecutionErrorConsole( + addConsole: AddConsoleFn, + cancelRunningEntries: CancelRunningEntriesFn, + params: ExecutionErrorConsoleParams +): void { + cancelRunningEntries(params.workflowId) + addExecutionErrorConsoleEntry(addConsole, params) +} + +export interface HttpErrorConsoleParams { + workflowId: string + executionId?: string + error: string + httpStatus: number +} + +/** + * Adds a console entry for HTTP-level execution errors (non-OK response before SSE streaming). + */ +export function addHttpErrorConsoleEntry( + addConsole: AddConsoleFn, + params: HttpErrorConsoleParams +): void { + const isValidationError = params.httpStatus >= 400 && params.httpStatus < 500 + const now = new Date().toISOString() + addConsole({ + input: {}, + output: {}, + success: false, + error: params.error, + durationMs: 0, + startedAt: now, + executionOrder: 0, + endedAt: now, + workflowId: params.workflowId, + blockId: isValidationError ? 'validation' : 'execution-error', + executionId: params.executionId, + blockName: isValidationError ? 'Workflow Validation' : 'Execution Error', + blockType: isValidationError ? 'validation' : 'error', + }) +} + +export interface CancelledConsoleParams { + workflowId: string + executionId?: string + durationMs?: number +} + +/** + * Adds a console entry for execution cancellation. + */ +export function addCancelledConsoleEntry( + addConsole: AddConsoleFn, + params: CancelledConsoleParams +): void { + const timing = buildExecutionTiming(params.durationMs) + addConsole({ + input: {}, + output: {}, + success: false, + error: 'Execution was cancelled', + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: 'cancelled', + executionId: params.executionId, + blockName: 'Execution Cancelled', + blockType: 'cancelled', + }) +} + +/** + * Cancels running entries and adds a cancelled console entry. + */ +export function handleExecutionCancelledConsole( + addConsole: AddConsoleFn, + cancelRunningEntries: CancelRunningEntriesFn, + params: CancelledConsoleParams +): void { + cancelRunningEntries(params.workflowId) + addCancelledConsoleEntry(addConsole, params) +} + export interface WorkflowExecutionOptions { workflowId?: string workflowInput?: any @@ -436,7 +592,7 @@ export async function executeWorkflowWithFullLogging( } const executionId = options.executionId || uuidv4() - const { addConsole, updateConsole } = useTerminalConsoleStore.getState() + const { addConsole, updateConsole, cancelRunningEntries } = useTerminalConsoleStore.getState() const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } = useExecutionStore.getState() const wfId = targetWorkflowId @@ -445,6 +601,7 @@ export async function executeWorkflowWithFullLogging( const activeBlocksSet = new Set() const activeBlockRefCounts = new Map() const executionIdRef = { current: executionId } + const accumulatedBlockLogs: BlockLog[] = [] const blockHandlers = createBlockEventHandlers( { @@ -453,7 +610,7 @@ export async function executeWorkflowWithFullLogging( workflowEdges, activeBlocksSet, activeBlockRefCounts, - accumulatedBlockLogs: [], + accumulatedBlockLogs, accumulatedBlockStates: new Map(), executedBlockIds: new Set(), consoleMode: 'update', @@ -490,16 +647,26 @@ export async function executeWorkflowWithFullLogging( if (!response.ok) { const error = await response.json() - throw new Error(error.error || 'Workflow execution failed') + const errorMessage = error.error || 'Workflow execution failed' + addHttpErrorConsoleEntry(addConsole, { + workflowId: wfId, + executionId, + error: errorMessage, + httpStatus: response.status, + }) + throw new Error(errorMessage) } if (!response.body) { throw new Error('No response body') } - const reader = response.body.getReader() - const decoder = new TextDecoder() - let buffer = '' + const serverExecutionId = response.headers.get('X-Execution-Id') + if (serverExecutionId) { + executionIdRef.current = serverExecutionId + setCurrentExecutionId(wfId, serverExecutionId) + } + let executionResult: ExecutionResult = { success: false, output: {}, @@ -507,89 +674,67 @@ export async function executeWorkflowWithFullLogging( } try { - while (true) { - const { done, value } = await reader.read() - if (done) break - - buffer += decoder.decode(value, { stream: true }) - const lines = buffer.split('\n\n') - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.trim() || !line.startsWith('data: ')) continue - - const data = line.substring(6).trim() - if (data === '[DONE]') continue - - let event: any - try { - event = JSON.parse(data) - } catch { - continue - } - - switch (event.type) { - case 'execution:started': { - setCurrentExecutionId(wfId, event.executionId) - executionIdRef.current = event.executionId || executionId - break + await processSSEStream( + response.body.getReader(), + { + onExecutionStarted: (data) => { + logger.info('Execution started', { startTime: data.startTime }) + }, + + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, + onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted, + + onExecutionCompleted: (data) => { + setCurrentExecutionId(wfId, null) + executionResult = { + success: data.success, + output: data.output, + logs: accumulatedBlockLogs, + metadata: { + duration: data.duration, + startTime: data.startTime, + endTime: data.endTime, + }, + } + }, + + onExecutionCancelled: () => { + setCurrentExecutionId(wfId, null) + executionResult = { + success: false, + output: {}, + error: 'Execution was cancelled', + logs: accumulatedBlockLogs, + } + }, + + onExecutionError: (data) => { + setCurrentExecutionId(wfId, null) + const errorMessage = data.error || 'Execution failed' + executionResult = { + success: false, + output: {}, + error: errorMessage, + logs: accumulatedBlockLogs, + metadata: { duration: data.duration }, } - case 'block:started': - blockHandlers.onBlockStarted(event.data) - break - - case 'block:completed': - blockHandlers.onBlockCompleted(event.data) - break - - case 'block:error': - blockHandlers.onBlockError(event.data) - break - - case 'block:childWorkflowStarted': - blockHandlers.onBlockChildWorkflowStarted(event.data) - break - - case 'execution:completed': - setCurrentExecutionId(wfId, null) - executionResult = { - success: event.data.success, - output: event.data.output, - logs: [], - metadata: { - duration: event.data.duration, - startTime: event.data.startTime, - endTime: event.data.endTime, - }, - } - break - - case 'execution:cancelled': - setCurrentExecutionId(wfId, null) - executionResult = { - success: false, - output: {}, - error: 'Execution was cancelled', - logs: [], - } - break - - case 'execution:error': - setCurrentExecutionId(wfId, null) - executionResult = { - success: false, - output: {}, - error: event.data.error || 'Execution failed', - logs: [], - } - break - } - } - } + handleExecutionErrorConsole(addConsole, cancelRunningEntries, { + workflowId: wfId, + executionId: executionIdRef.current, + error: errorMessage, + durationMs: data.duration || 0, + blockLogs: accumulatedBlockLogs, + isPreExecutionError: accumulatedBlockLogs.length === 0, + }) + }, + }, + 'CopilotExecution' + ) } finally { setCurrentExecutionId(wfId, null) - reader.releaseLock() setActiveBlocks(wfId, new Set()) } diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index 12a7dc8cabf..36fd801db63 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -31,8 +31,9 @@ function isClientDisconnectError(error: any): boolean { /** * Processes SSE events from a response body and invokes appropriate callbacks. + * Exported for use by standalone (non-hook) execution paths like executeWorkflowWithFullLogging. */ -async function processSSEStream( +export async function processSSEStream( reader: ReadableStreamDefaultReader, callbacks: ExecutionStreamCallbacks, logPrefix: string @@ -198,6 +199,7 @@ export function useExecutionStream() { if (errorResponse && typeof errorResponse === 'object') { Object.assign(error, { executionResult: errorResponse }) } + Object.assign(error, { httpStatus: response.status }) throw error } @@ -267,12 +269,15 @@ export function useExecutionStream() { try { errorResponse = await response.json() } catch { - throw new Error(`Server error (${response.status}): ${response.statusText}`) + const error = new Error(`Server error (${response.status}): ${response.statusText}`) + Object.assign(error, { httpStatus: response.status }) + throw error } const error = new Error(errorResponse.error || 'Failed to start execution') if (errorResponse && typeof errorResponse === 'object') { Object.assign(error, { executionResult: errorResponse }) } + Object.assign(error, { httpStatus: response.status }) throw error }