Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 96 additions & 9 deletions ui/src/components/chat/ChatInterface.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { Message, DataPart, Task, TaskState } from "@a2a-js/sdk";

// Task states where the agent is actively processing — resubscribe to live stream.
const RESUBSCRIBE_TASK_STATES: TaskState[] = ["submitted", "working"];
// Task states that mean the session is busy (used by the cross-tab send guard).
const ACTIVE_TASK_STATES: TaskState[] = ["submitted", "working", "input-required"];
Comment thread
onematchfox marked this conversation as resolved.

interface ChatInterfaceProps {
selectedAgentName: string;
Expand Down Expand Up @@ -213,6 +215,45 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se

const userMessageText = currentInputMessage;

// Cross-tab guard: fetch the latest session state before mutating anything.
// Two cases: (1) another tab is still streaming — reconnect instead of sending;
// (2) another tab completed a turn we haven't loaded — reload so the user sees
// the full context before their next message goes out.
const guardSessionId = session?.id || sessionId;
if (guardSessionId) {
const tasksCheck = await getSessionTasks(guardSessionId);
if (tasksCheck.data) {
const inFlightTask = tasksCheck.data.findLast(
task => ACTIVE_TASK_STATES.includes(task.status?.state as TaskState)
);
Comment on lines +226 to +228
if (inFlightTask) {
if ((inFlightTask.status?.state as TaskState) === "input-required") {
// Another tab surfaced a pending approval — reload to show the HITL UI.
await reloadSessionFromDB();
toast.info("Session is awaiting your input — please review before sending");
} else {
toast.info("This session is already being processed — reconnecting to live updates");
setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState));
await streamResubscribedTask(inFlightTask.id);
}
return;
}

// Compare only non-approval messages to avoid false negatives when
// storedMessages includes appended ToolApprovalRequest / AskUserRequest entries.
const dbMessages = extractMessagesFromTasks(tasksCheck.data);
const localMessageCount = storedMessages.filter(m => {
const meta = m.metadata as ADKMetadata | undefined;
return meta?.originalType !== "ToolApprovalRequest" && meta?.originalType !== "AskUserRequest";
}).length;
if (dbMessages.length > localMessageCount) {
await reloadSessionFromDB();
toast.info("New messages loaded — please review before sending");
return;
}
Comment on lines +224 to +253
}
}

setCurrentInputMessage("");
setChatStatus("thinking");
setStoredMessages(prev => [...prev, ...streamingMessages]);
Expand Down Expand Up @@ -473,7 +514,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
}
} finally {
abortControllerRef.current = null;
setChatStatus("ready");
// Don't override input_required that reloadSessionFromDB() may have set.
setChatStatus(prev => prev === "input_required" ? prev : "ready");
setIsStreaming(false);
setStreamingContent("");
}
Expand Down Expand Up @@ -518,13 +560,35 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
displayText: string,
) => {
const currentSessionId = session?.id || sessionId;
setChatStatus("thinking");
setStreamingContent("");

// Find the taskId from the pending approval message so the A2A framework
// reuses the existing task instead of creating a new one.
// Find the taskId first so the guard can verify the task is still input-required.
const { taskId: approvalTaskId } = getPendingApprovalToolIds();

// Cross-tab guard: another tab may have already submitted this approval.
if (currentSessionId && approvalTaskId) {
const tasksCheck = await getSessionTasks(currentSessionId);
if (tasksCheck.data) {
const approvalTask = tasksCheck.data.findLast(task => task.id === approvalTaskId);
if ((approvalTask?.status?.state as TaskState | undefined) !== "input-required") {
const inFlightTask = tasksCheck.data.findLast(
task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState)
);
if (inFlightTask) {
toast.info("Another tab already responded — reconnecting to live updates");
setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState));
await streamResubscribedTask(inFlightTask.id);
} else {
await reloadSessionFromDB();
toast.info("Session state changed — please review");
}
return;
}
}
}

setChatStatus("thinking");
setStreamingContent("");

// Stamp approvalDecision on the current pending approval messages so they
// are excluded from getPendingApprovalToolIds on future HITL cycles.
// approvalDecision is either a uniform ToolDecision or a per-tool map
Expand Down Expand Up @@ -657,10 +721,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
* Handle ask_user answers submitted by the user. Sends an "approve" decision
* with the answers payload attached, routed to the pending ask_user task.
*/
const handleAskUserSubmit = (answers: Array<{ answer: string[] }>) => {
const handleAskUserSubmit = async (answers: Array<{ answer: string[] }>) => {
const currentSessionId = session?.id || sessionId;
setChatStatus("thinking");
setStreamingContent("");

// Find the taskId from the pending AskUserRequest message
let askUserTaskId: string | undefined;
Expand All @@ -673,6 +735,31 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
}
}

// Cross-tab guard: another tab may have already answered this question.
if (currentSessionId && askUserTaskId) {
const tasksCheck = await getSessionTasks(currentSessionId);
if (tasksCheck.data) {
const askTask = tasksCheck.data.findLast(task => task.id === askUserTaskId);
if ((askTask?.status?.state as TaskState | undefined) !== "input-required") {
const inFlightTask = tasksCheck.data.findLast(
task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState)
);
if (inFlightTask) {
toast.info("Another tab already responded — reconnecting to live updates");
setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState));
await streamResubscribedTask(inFlightTask.id);
} else {
await reloadSessionFromDB();
toast.info("Session state changed — please review");
}
return;
}
}
Comment on lines +738 to +757
}

setChatStatus("thinking");
setStreamingContent("");

// Stamp the ask-user message as resolved so we don't show the form again
const stampAskUser = (msgs: Message[]) => msgs.map(m => {
const meta = m.metadata as Record<string, unknown> | undefined;
Expand Down Expand Up @@ -702,7 +789,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
metadata: { timestamp: Date.now() },
};

streamA2AMessage(a2aMessage, {
await streamA2AMessage(a2aMessage, {
errorLabel: "Ask user response failed",
sessionIdForWait: currentSessionId,
onFinally: () => {
Expand Down
Loading