[codex] Fix agentflow resume through converging condition paths#6489
[codex] Fix agentflow resume through converging condition paths#6489cat0825 wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the agentflow execution logic to track and propagate skipped edges and inputs rather than just ignoring nodes, ensuring downstream merge nodes execute correctly when converging conditional paths are used (e.g., after resuming from a human-input node). It also adds corresponding tests and mock fixtures. The review feedback identifies a critical issue in the newly introduced propagateSkippedInput function, where workflows with cyclic paths (loops) could trigger infinite recursion and cause a stack overflow crash, and provides a solution using a visited set to track the recursion path.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| function propagateSkippedInput({ | ||
| sourceId, | ||
| targetId, | ||
| graph, | ||
| nodes, | ||
| edges, | ||
| nodeExecutionQueue, | ||
| waitingNodes | ||
| }: { | ||
| sourceId: string | ||
| targetId: string | ||
| graph: Record<string, string[]> | ||
| nodes: IReactFlowNode[] | ||
| edges: IReactFlowEdge[] | ||
| nodeExecutionQueue: INodeQueue[] | ||
| waitingNodes: Map<string, IWaitingNode> | ||
| }) { | ||
| let waitingNode = waitingNodes.get(targetId) | ||
| if (!waitingNode) { | ||
| waitingNode = setupNodeDependencies(targetId, edges, nodes) | ||
| waitingNodes.set(targetId, waitingNode) | ||
| } | ||
|
|
||
| if (!waitingNode.receivedInputs.has(sourceId)) { | ||
| waitingNode.receivedInputs.set(sourceId, null) | ||
| } | ||
|
|
||
| if (!hasReceivedRequiredInputs(waitingNode)) return | ||
|
|
||
| waitingNodes.delete(targetId) | ||
| const combinedInputs = combineNodeInputs(waitingNode.receivedInputs) | ||
|
|
||
| if (combinedInputs === null) { | ||
| for (const downstreamId of graph[targetId] || []) { | ||
| propagateSkippedInput({ | ||
| sourceId: targetId, | ||
| targetId: downstreamId, | ||
| graph, | ||
| nodes, | ||
| edges, | ||
| nodeExecutionQueue, | ||
| waitingNodes | ||
| }) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if (!nodeExecutionQueue.some((queuedNode) => queuedNode.nodeId === targetId)) { | ||
| nodeExecutionQueue.push({ | ||
| nodeId: targetId, | ||
| data: combinedInputs, | ||
| inputs: Object.fromEntries(waitingNode.receivedInputs) | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
In workflows with cyclic paths (loops), recursive propagation of skipped inputs can lead to infinite recursion and a Maximum call stack size exceeded crash. To prevent this, track the active recursion path using a visited set (acting as a recursion stack) and return early if a cycle is detected.
function propagateSkippedInput({
sourceId,
targetId,
graph,
nodes,
edges,
nodeExecutionQueue,
waitingNodes,
visited = new Set<string>()
}: {
sourceId: string
targetId: string
graph: Record<string, string[]>
nodes: IReactFlowNode[]
edges: IReactFlowEdge[]
nodeExecutionQueue: INodeQueue[]
waitingNodes: Map<string, IWaitingNode>
visited?: Set<string>
}) {
if (visited.has(targetId)) return
visited.add(targetId)
let waitingNode = waitingNodes.get(targetId)
if (!waitingNode) {
waitingNode = setupNodeDependencies(targetId, edges, nodes)
waitingNodes.set(targetId, waitingNode)
}
if (!waitingNode.receivedInputs.has(sourceId)) {
waitingNode.receivedInputs.set(sourceId, null)
}
if (!hasReceivedRequiredInputs(waitingNode)) {
visited.delete(targetId)
return
}
waitingNodes.delete(targetId)
const combinedInputs = combineNodeInputs(waitingNode.receivedInputs)
if (combinedInputs === null) {
for (const downstreamId of graph[targetId] || []) {
propagateSkippedInput({
sourceId: targetId,
targetId: downstreamId,
graph,
nodes,
edges,
nodeExecutionQueue,
waitingNodes,
visited
})
}
visited.delete(targetId)
return
}
if (!nodeExecutionQueue.some((queuedNode) => queuedNode.nodeId === targetId)) {
nodeExecutionQueue.push({
nodeId: targetId,
data: combinedInputs,
inputs: Object.fromEntries(waitingNode.receivedInputs)
})
}
visited.delete(targetId)
}
Summary
nullso merge nodes do not wait forever for branches that were not selected.Equalused by workspace search options.Closes #5501
Root cause
Previously, unfulfilled condition outputs skipped target nodes by node id. In flows where an unselected branch and a selected human-input branch converge into the same downstream node, the downstream node kept waiting for the unselected branch input after resume, so execution stopped before the merge node could run.
Validation
PATH=/Users/qianyuhe/.cache/codex-runtimes/codex-primary-runtime/dependencies/node/bin:$PATH corepack pnpm@10.26.0 --dir packages/server exec jest src/utils/buildAgentflow.test.ts --runInBandPATH=/Users/qianyuhe/.cache/codex-runtimes/codex-primary-runtime/dependencies/node/bin:$PATH corepack pnpm@10.26.0 --dir packages/server exec jest src/utils/sanitizeFlowData.test.ts --runInBandPATH=/Users/qianyuhe/.cache/codex-runtimes/codex-primary-runtime/dependencies/node/bin:$PATH corepack pnpm@10.26.0 exec prettier --check packages/server/src/utils/buildAgentflow.ts packages/server/src/utils/buildAgentflow.test.ts packages/server/src/utils/__fixtures__/agentflowTestNode.js packages/server/__mocks__/typeorm.tsPATH=/Users/qianyuhe/.cache/codex-runtimes/codex-primary-runtime/dependencies/node/bin:$PATH corepack pnpm@10.26.0 --dir packages/server build