From 1666e026df8a064d9598d47c2aeaccf25a0292dc Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Thu, 21 May 2026 14:21:25 +0200 Subject: [PATCH 1/2] feat(ui): add guard to prevent out-of-sync sends when working cross-tabs Before sending a new message, check the latest session state from the DB. If another tab is still streaming, reconnect to that stream instead of sending. If another tab completed a turn that this tab hasn't loaded, reload the messages and prompt the user to review before sending. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- ui/src/components/chat/ChatInterface.tsx | 30 ++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/ui/src/components/chat/ChatInterface.tsx b/ui/src/components/chat/ChatInterface.tsx index 778c50163..206b76ff5 100644 --- a/ui/src/components/chat/ChatInterface.tsx +++ b/ui/src/components/chat/ChatInterface.tsx @@ -31,6 +31,8 @@ import { Message, DataPart, Task, TaskState } from "@a2a-js/sdk"; // Task states where the agent is actively processing — resubscribe to live stream. const RESUBSCRIBE_TASK_STATES: TaskState[] = ["submitted", "working"]; +// Task states that mean the session is busy (used by the cross-tab send guard). +const ACTIVE_TASK_STATES: TaskState[] = ["submitted", "working", "input-required"]; interface ChatInterfaceProps { selectedAgentName: string; @@ -213,6 +215,34 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se const userMessageText = currentInputMessage; + // Cross-tab guard: fetch the latest session state before mutating anything. + // Two cases: (1) another tab is still streaming — reconnect instead of sending; + // (2) another tab completed a turn we haven't loaded — reload so the user sees + // the full context before their next message goes out. + const guardSessionId = session?.id || sessionId; + if (guardSessionId) { + const tasksCheck = await getSessionTasks(guardSessionId); + if (tasksCheck.data) { + const inFlightTask = tasksCheck.data.findLast( + task => ACTIVE_TASK_STATES.includes(task.status?.state as TaskState) + ); + if (inFlightTask) { + toast.info("This session is already being processed — reconnecting to live updates"); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + return; + } + + const dbMessages = extractMessagesFromTasks(tasksCheck.data); + if (dbMessages.length > storedMessages.length) { + setStoredMessages(dbMessages); + setSessionStats(extractTokenStatsFromTasks(tasksCheck.data)); + toast.info("New messages loaded — please review before sending"); + return; + } + } + } + setCurrentInputMessage(""); setChatStatus("thinking"); setStoredMessages(prev => [...prev, ...streamingMessages]); From b0f05308e6419339c2cfa2f4ad52dc6caae5b6c5 Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Fri, 22 May 2026 15:39:26 +0200 Subject: [PATCH 2/2] fix(ui): address PR review comments on cross-tab guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - input-required tasks in the send guard now call reloadSessionFromDB() instead of streamResubscribedTask() — there is no live stream to reconnect to when the session is awaiting HITL input - newer-messages path now uses reloadSessionFromDB() (handles approval messages correctly) and compares only non-approval storedMessages to avoid false negatives when ToolApprovalRequest / AskUserRequest entries are appended to storedMessages - streamResubscribedTask finally block uses functional setChatStatus so it cannot override input_required set by reloadSessionFromDB() - apply cross-tab guard to sendApprovalDecision and handleAskUserSubmit so tool confirmation buttons and ask_user responses are also protected against double-submission from another tab Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- ui/src/components/chat/ChatInterface.tsx | 87 ++++++++++++++++++++---- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/ui/src/components/chat/ChatInterface.tsx b/ui/src/components/chat/ChatInterface.tsx index 206b76ff5..c2c6a71f1 100644 --- a/ui/src/components/chat/ChatInterface.tsx +++ b/ui/src/components/chat/ChatInterface.tsx @@ -227,16 +227,27 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se task => ACTIVE_TASK_STATES.includes(task.status?.state as TaskState) ); if (inFlightTask) { - toast.info("This session is already being processed — reconnecting to live updates"); - setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); - await streamResubscribedTask(inFlightTask.id); + if ((inFlightTask.status?.state as TaskState) === "input-required") { + // Another tab surfaced a pending approval — reload to show the HITL UI. + await reloadSessionFromDB(); + toast.info("Session is awaiting your input — please review before sending"); + } else { + toast.info("This session is already being processed — reconnecting to live updates"); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + } return; } + // Compare only non-approval messages to avoid false negatives when + // storedMessages includes appended ToolApprovalRequest / AskUserRequest entries. const dbMessages = extractMessagesFromTasks(tasksCheck.data); - if (dbMessages.length > storedMessages.length) { - setStoredMessages(dbMessages); - setSessionStats(extractTokenStatsFromTasks(tasksCheck.data)); + const localMessageCount = storedMessages.filter(m => { + const meta = m.metadata as ADKMetadata | undefined; + return meta?.originalType !== "ToolApprovalRequest" && meta?.originalType !== "AskUserRequest"; + }).length; + if (dbMessages.length > localMessageCount) { + await reloadSessionFromDB(); toast.info("New messages loaded — please review before sending"); return; } @@ -503,7 +514,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } } finally { abortControllerRef.current = null; - setChatStatus("ready"); + // Don't override input_required that reloadSessionFromDB() may have set. + setChatStatus(prev => prev === "input_required" ? prev : "ready"); setIsStreaming(false); setStreamingContent(""); } @@ -548,13 +560,35 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se displayText: string, ) => { const currentSessionId = session?.id || sessionId; - setChatStatus("thinking"); - setStreamingContent(""); - // Find the taskId from the pending approval message so the A2A framework - // reuses the existing task instead of creating a new one. + // Find the taskId first so the guard can verify the task is still input-required. const { taskId: approvalTaskId } = getPendingApprovalToolIds(); + // Cross-tab guard: another tab may have already submitted this approval. + if (currentSessionId && approvalTaskId) { + const tasksCheck = await getSessionTasks(currentSessionId); + if (tasksCheck.data) { + const approvalTask = tasksCheck.data.findLast(task => task.id === approvalTaskId); + if ((approvalTask?.status?.state as TaskState | undefined) !== "input-required") { + const inFlightTask = tasksCheck.data.findLast( + task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState) + ); + if (inFlightTask) { + toast.info("Another tab already responded — reconnecting to live updates"); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + } else { + await reloadSessionFromDB(); + toast.info("Session state changed — please review"); + } + return; + } + } + } + + setChatStatus("thinking"); + setStreamingContent(""); + // Stamp approvalDecision on the current pending approval messages so they // are excluded from getPendingApprovalToolIds on future HITL cycles. // approvalDecision is either a uniform ToolDecision or a per-tool map @@ -687,10 +721,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se * Handle ask_user answers submitted by the user. Sends an "approve" decision * with the answers payload attached, routed to the pending ask_user task. */ - const handleAskUserSubmit = (answers: Array<{ answer: string[] }>) => { + const handleAskUserSubmit = async (answers: Array<{ answer: string[] }>) => { const currentSessionId = session?.id || sessionId; - setChatStatus("thinking"); - setStreamingContent(""); // Find the taskId from the pending AskUserRequest message let askUserTaskId: string | undefined; @@ -703,6 +735,31 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } } + // Cross-tab guard: another tab may have already answered this question. + if (currentSessionId && askUserTaskId) { + const tasksCheck = await getSessionTasks(currentSessionId); + if (tasksCheck.data) { + const askTask = tasksCheck.data.findLast(task => task.id === askUserTaskId); + if ((askTask?.status?.state as TaskState | undefined) !== "input-required") { + const inFlightTask = tasksCheck.data.findLast( + task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState) + ); + if (inFlightTask) { + toast.info("Another tab already responded — reconnecting to live updates"); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + } else { + await reloadSessionFromDB(); + toast.info("Session state changed — please review"); + } + return; + } + } + } + + setChatStatus("thinking"); + setStreamingContent(""); + // Stamp the ask-user message as resolved so we don't show the form again const stampAskUser = (msgs: Message[]) => msgs.map(m => { const meta = m.metadata as Record | undefined; @@ -732,7 +789,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se metadata: { timestamp: Date.now() }, }; - streamA2AMessage(a2aMessage, { + await streamA2AMessage(a2aMessage, { errorLabel: "Ask user response failed", sessionIdForWait: currentSessionId, onFinally: () => {