diff --git a/packages/server/__mocks__/typeorm.ts b/packages/server/__mocks__/typeorm.ts index ffcdfa36558..0548b3a44eb 100644 --- a/packages/server/__mocks__/typeorm.ts +++ b/packages/server/__mocks__/typeorm.ts @@ -29,6 +29,7 @@ module.exports = { JoinColumn: decorator, Unique: decorator, DataSource: jest.fn(), + Equal: findOperator('equal'), In: findOperator('in'), Between: findOperator('between'), MoreThanOrEqual: findOperator('moreThanOrEqual'), diff --git a/packages/server/src/utils/__fixtures__/agentflowTestNode.js b/packages/server/src/utils/__fixtures__/agentflowTestNode.js new file mode 100644 index 00000000000..1496be785f4 --- /dev/null +++ b/packages/server/src/utils/__fixtures__/agentflowTestNode.js @@ -0,0 +1,16 @@ +class TestAgentflowNode { + async run(nodeData, finalInput) { + const output = nodeData.inputs?.testOutput ?? { content: nodeData.label } + return { + id: nodeData.id, + name: nodeData.name, + input: { + content: finalInput ?? '' + }, + output, + state: nodeData.inputs?.testState ?? {} + } + } +} + +module.exports = { nodeClass: TestAgentflowNode } diff --git a/packages/server/src/utils/buildAgentflow.test.ts b/packages/server/src/utils/buildAgentflow.test.ts new file mode 100644 index 00000000000..37c0b8da985 --- /dev/null +++ b/packages/server/src/utils/buildAgentflow.test.ts @@ -0,0 +1,205 @@ +import path from 'path' +import { executeAgentFlow } from './buildAgentflow' + +type StoredEntity = Record + +const WORKSPACE_ID = 'workspace-1' +const SESSION_ID = 'session-1' +const CHAT_ID = 'chat-1' + +class MemoryRepository { + private rows: StoredEntity[] = [] + private nextId = 1 + + constructor(initialRows: StoredEntity[] = []) { + this.rows = initialRows + } + + create(entity: StoredEntity) { + return { ...entity } + } + + merge(target: StoredEntity, source: StoredEntity) { + Object.assign(target, source) + return target + } + + async save(entity: StoredEntity) { + if (!entity.id) entity.id = `entity-${this.nextId++}` + const existingIndex = this.rows.findIndex((row) => row.id === entity.id) + if (existingIndex >= 0) { + this.rows[existingIndex] = { ...this.rows[existingIndex], ...entity } + return this.rows[existingIndex] + } + const row = { ...entity, createdDate: entity.createdDate ?? new Date(this.nextId) } + this.rows.push(row) + return row + } + + async find(options?: { where?: Record; order?: Record }) { + let result = [...this.rows] + if (options?.where) { + result = result.filter((row) => + Object.entries(options.where ?? {}).every(([key, value]) => value === undefined || row[key] === value) + ) + } + if (options?.order?.createdDate === 'DESC') { + result.sort((a, b) => Number(b.createdDate ?? 0) - Number(a.createdDate ?? 0)) + } + return result + } + + async findBy() { + return [...this.rows] + } + + async findOneBy(where: Record) { + return this.rows.find((row) => Object.entries(where).every(([key, value]) => row[key] === value)) + } + + async findOne(options: { where?: Record }) { + if (!options.where) return this.rows[0] + return this.findOneBy(options.where) + } + + createQueryBuilder() { + return { + where: jest.fn().mockReturnThis(), + orWhere: jest.fn().mockReturnThis(), + orderBy: jest.fn().mockReturnThis(), + getMany: jest.fn().mockResolvedValue([]) + } + } +} + +const makeDataSource = () => { + const executionRepository = new MemoryRepository() + const chatMessageRepository = new MemoryRepository() + const variableRepository = new MemoryRepository() + + return { + executionRepository, + getRepository: (entity: { name?: string }) => { + if (entity.name === 'Execution') return executionRepository + if (entity.name === 'ChatMessage') return chatMessageRepository + if (entity.name === 'Variable') return variableRepository + throw new Error(`Unexpected repository: ${entity.name}`) + } + } +} + +const makeNode = (id: string, name: string, label: string, testOutput?: Record) => ({ + id, + type: 'agentflowNode', + position: { x: 0, y: 0 }, + data: { + id, + name, + label, + inputs: { + startInputType: name === 'startAgentflow' ? 'chatInput' : undefined, + testOutput + }, + inputParams: [] + } +}) + +const makeEdge = (source: string, target: string, outputIndex = 0) => ({ + id: `${source}-${target}`, + source, + target, + sourceHandle: `${source}-output-${outputIndex}`, + targetHandle: `${target}-input-0` +}) + +describe('executeAgentFlow converging conditional paths', () => { + const fixturePath = path.join(__dirname, '__fixtures__', 'agentflowTestNode.js') + const componentNodes = { + startAgentflow: { filePath: fixturePath }, + conditionAgentflow: { filePath: fixturePath }, + humanInputAgentflow: { filePath: fixturePath }, + llmAgentflow: { filePath: fixturePath } + } as any + + const baseRuntimeParams = { + componentNodes, + chatId: CHAT_ID, + evaluationRunId: undefined, + telemetry: { sendTelemetry: jest.fn().mockResolvedValue(undefined) } as any, + usageCacheManager: {} as any, + cachePool: {} as any, + sseStreamer: undefined, + baseURL: '', + isInternal: false, + orgId: 'org-1', + workspaceId: WORKSPACE_ID, + subscriptionId: 'sub-1', + productId: 'product-1' + } + + it('continues to a downstream merge node after resuming a human-input branch', async () => { + const dataSource = makeDataSource() + const conditionId = 'conditionAgentflow_0' + const humanId = 'humanInputAgentflow_0' + const mergeId = 'llmAgentflow_merge' + + const nodes = [ + makeNode('startAgentflow_0', 'startAgentflow', 'Start', { content: 'start' }), + makeNode(conditionId, 'conditionAgentflow', 'Condition', { + conditions: [ + { type: 'string', value1: 'x', operation: 'notEmpty', isFulfilled: true }, + { type: 'string', value1: '', operation: 'isEmpty', isFulfilled: false } + ], + content: 'condition selected human branch' + }), + makeNode(humanId, 'humanInputAgentflow', 'Human Input', { + conditions: [{ type: 'approve', isFulfilled: true }], + content: 'human approved' + }), + makeNode(mergeId, 'llmAgentflow', 'Merge Node', { content: 'merge executed' }) + ] + const edges = [ + makeEdge('startAgentflow_0', conditionId), + makeEdge(conditionId, humanId, 0), + makeEdge(conditionId, mergeId, 1), + makeEdge(humanId, mergeId, 0) + ] + const chatflow = { + id: 'flow-1', + name: 'Converging human flow', + flowData: JSON.stringify({ nodes, edges, viewport: { x: 0, y: 0, zoom: 1 } }), + workspaceId: WORKSPACE_ID + } as any + + const firstRun = await executeAgentFlow({ + ...baseRuntimeParams, + appDataSource: dataSource as any, + chatflow, + incomingInput: { + question: 'start', + overrideConfig: { sessionId: SESSION_ID } + } + } as any) + + expect(firstRun.agentFlowExecutedData.map((data: any) => data.nodeId)).toEqual(['startAgentflow_0', conditionId, humanId]) + expect(firstRun.agentFlowExecutedData.at(-1).status).toBe('STOPPED') + + const resumedRun = await executeAgentFlow({ + ...baseRuntimeParams, + appDataSource: dataSource as any, + chatflow, + incomingInput: { + question: 'resume', + overrideConfig: { sessionId: SESSION_ID }, + humanInput: { + startNodeId: humanId, + type: 'approve', + feedback: 'approved' + } + } + } as any) + + expect(resumedRun.agentFlowExecutedData.map((data: any) => data.nodeId)).toContain(mergeId) + expect(resumedRun.text).toBe('merge executed') + }) +}) diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index e38ccbec092..9cdc7b68dd7 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -789,20 +789,15 @@ function hasReceivedRequiredInputs(waitingNode: IWaitingNode): boolean { } /** - * Determines which nodes should be ignored based on condition results - * @param currentNode - The node being processed - * @param result - The execution result from the node - * @param edges - All edges in the workflow - * @param nodeId - Current node ID - * @returns Array of node IDs that should be ignored + * Determines which decision output edges were not selected. */ -async function determineNodesToIgnore( +async function determineEdgesToIgnore( currentNode: IReactFlowNode, result: any, edges: IReactFlowEdge[], nodeId: string -): Promise { - const ignoreNodeIds: string[] = [] +): Promise { + const ignoredEdges: IReactFlowEdge[] = [] // Check if this is a decision node const isDecisionNode = @@ -833,12 +828,96 @@ async function determineNodesToIgnore( const ignoreEdge = edges.find((edge) => edge.source === nodeId && edge.sourceHandle === `${nodeId}-output-${index}`) if (ignoreEdge) { - ignoreNodeIds.push(ignoreEdge.target) + ignoredEdges.push(ignoreEdge) } } } - return ignoreNodeIds + return ignoredEdges +} + +function propagateSkippedInput({ + sourceId, + targetId, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes +}: { + sourceId: string + targetId: string + graph: Record + nodes: IReactFlowNode[] + edges: IReactFlowEdge[] + nodeExecutionQueue: INodeQueue[] + waitingNodes: Map +}) { + let waitingNode = waitingNodes.get(targetId) + if (!waitingNode) { + waitingNode = setupNodeDependencies(targetId, edges, nodes) + waitingNodes.set(targetId, waitingNode) + } + + if (!waitingNode.receivedInputs.has(sourceId)) { + waitingNode.receivedInputs.set(sourceId, null) + } + + if (!hasReceivedRequiredInputs(waitingNode)) return + + waitingNodes.delete(targetId) + const combinedInputs = combineNodeInputs(waitingNode.receivedInputs) + + if (combinedInputs === null) { + for (const downstreamId of graph[targetId] || []) { + propagateSkippedInput({ + sourceId: targetId, + targetId: downstreamId, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes + }) + } + return + } + + if (!nodeExecutionQueue.some((queuedNode) => queuedNode.nodeId === targetId)) { + nodeExecutionQueue.push({ + nodeId: targetId, + data: combinedInputs, + inputs: Object.fromEntries(waitingNode.receivedInputs) + }) + } +} + +function propagateSkippedEdges({ + ignoredEdges, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes +}: { + ignoredEdges: IReactFlowEdge[] + graph: Record + nodes: IReactFlowNode[] + edges: IReactFlowEdge[] + nodeExecutionQueue: INodeQueue[] + waitingNodes: Map +}) { + for (const ignoredEdge of ignoredEdges) { + propagateSkippedInput({ + sourceId: ignoredEdge.source, + targetId: ignoredEdge.target, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes + }) + } } /** @@ -868,14 +947,23 @@ async function processNodeOutputs({ const currentNode = nodes.find((n) => n.id === nodeId) if (!currentNode) return { humanInput: updatedHumanInput } - // Get nodes to ignore based on conditions - const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, edges, nodeId) - if (ignoreNodeIds.length) { - logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`) + // Get edges to ignore based on conditions + const ignoredEdges = await determineEdgesToIgnore(currentNode, result, edges, nodeId) + const ignoredChildIds = new Set(ignoredEdges.map((edge) => edge.target)) + if (ignoredEdges.length) { + logger.debug(` ⏭️ Skipping edges: [${ignoredEdges.map((edge) => `${edge.source}->${edge.target}`).join(', ')}]`) + propagateSkippedEdges({ + ignoredEdges, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes + }) } for (const childId of childNodeIds) { - if (ignoreNodeIds.includes(childId)) continue + if (ignoredChildIds.has(childId)) continue const childNode = nodes.find((n) => n.id === childId) if (!childNode) continue @@ -1871,6 +1959,25 @@ export const executeAgentFlow = async ({ // Update humanInput with the resolved startNodeId humanInput.startNodeId = startNodeId + + for (const execData of executionData) { + if (execData.status !== 'FINISHED') continue + + const executedNode = nodes.find((node) => node.id === execData.nodeId) + if (!executedNode) continue + + const ignoredEdges = await determineEdgesToIgnore(executedNode, execData.data, edges, execData.nodeId) + if (!ignoredEdges.length) continue + + propagateSkippedEdges({ + ignoredEdges, + graph, + nodes, + edges, + nodeExecutionQueue, + waitingNodes + }) + } } else if (isRecursive && parentExecutionId) { const { startingNodeIds: startingNodeIdsFromFlow } = getStartingNode(nodeDependencies) startingNodeIds.push(...startingNodeIdsFromFlow)