diff --git a/apps/desktop/src/settings/DesktopClientSettings.test.ts b/apps/desktop/src/settings/DesktopClientSettings.test.ts index f666e692860..701e88c1909 100644 --- a/apps/desktop/src/settings/DesktopClientSettings.test.ts +++ b/apps/desktop/src/settings/DesktopClientSettings.test.ts @@ -12,6 +12,7 @@ import * as DesktopEnvironment from "../app/DesktopEnvironment.ts"; import * as DesktopClientSettings from "./DesktopClientSettings.ts"; const clientSettings: ClientSettings = { + autoReconnectSshConnections: true, autoOpenPlanSidebar: false, confirmThreadArchive: true, confirmThreadDelete: false, diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 7fc7669fe60..afb565522c8 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -185,7 +185,10 @@ import { useServerKeybindings, } from "~/rpc/serverState"; import { sanitizeThreadErrorMessage } from "~/rpc/transportError"; -import { retainThreadDetailSubscription } from "../environments/runtime/service"; +import { + refreshRetainedThreadDetailSubscription, + retainThreadDetailSubscription, +} from "../environments/runtime/service"; import { RightPanelSheet } from "./RightPanelSheet"; import { Button } from "./ui/button"; import { @@ -3095,6 +3098,7 @@ export default function ChatView(props: ChatViewProps) { ...(bootstrap ? { bootstrap } : {}), createdAt: messageCreatedAt, }); + refreshRetainedThreadDetailSubscription(activeThread.environmentId, threadIdForSend); turnStartSucceeded = true; })().catch(async (err: unknown) => { if ( diff --git a/apps/web/src/components/settings/ConnectionsSettings.tsx b/apps/web/src/components/settings/ConnectionsSettings.tsx index 9dab7b61fac..d03796d81f7 100644 --- a/apps/web/src/components/settings/ConnectionsSettings.tsx +++ b/apps/web/src/components/settings/ConnectionsSettings.tsx @@ -117,6 +117,7 @@ import { import { useUiStateStore } from "~/uiStateStore"; import { resolveServerConfigVersionMismatch } from "~/versionSkew"; import { useServerConfig } from "~/rpc/serverState"; +import { useSettings, useUpdateSettings } from "~/hooks/useSettings"; import { connectManagedCloudEnvironment, linkPrimaryEnvironmentToCloud, @@ -1945,6 +1946,10 @@ export function ConnectionsSettings() { DesktopServerExposureState["mode"] | null >(null); const primaryServerConfig = useServerConfig(); + const autoReconnectSshConnections = useSettings( + (settings) => settings.autoReconnectSshConnections, + ); + const { updateSettings } = useUpdateSettings(); const primaryVersionMismatch = resolveServerConfigVersionMismatch(primaryServerConfig); const [isAdvertisedEndpointListExpanded, setIsAdvertisedEndpointListExpanded] = useState(false); const defaultAdvertisedEndpointKey = useUiStateStore( @@ -2856,6 +2861,21 @@ export function ConnectionsSettings() { } /> ); + const renderAutoReconnectSshRow = () => ( + + updateSettings({ autoReconnectSshConnections: Boolean(checked) }) + } + aria-label="Enable Auto-reconnect" + /> + } + /> + ); return ( @@ -2880,6 +2900,7 @@ export function ConnectionsSettings() { {renderNetworkAccessRow()} {renderEndpointRows("endpoint-rail")} {renderTailscaleRow()} + {renderAutoReconnectSshRow()} ) : ( diff --git a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts index 675a4868032..c483e96590c 100644 --- a/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts +++ b/apps/web/src/environments/runtime/service.threadSubscriptions.test.ts @@ -1,6 +1,7 @@ import { QueryClient } from "@tanstack/react-query"; import type { WsRpcClient } from "@t3tools/client-runtime"; import { + DEFAULT_CLIENT_SETTINGS, EnvironmentId, ProjectId, ProviderInstanceId, @@ -21,14 +22,32 @@ const mockReadSavedEnvironmentBearerToken = vi.fn(); const mockReadSavedEnvironmentCredential = vi.fn(); const mockSavedEnvironmentRegistrySubscribe = vi.fn(); const mockGetPrimaryKnownEnvironment = vi.hoisted(() => vi.fn()); +const mockSavedEnvironmentRuntimeById: Record> = {}; const mockFetchRemoteSessionState = vi.fn(); const mockResolveRemoteWebSocketConnectionUrl = vi.fn(async () => "ws://remote.example.test/ws"); const mockRemoteHttpRunPromise = vi.fn((effect: Promise) => effect); const mockConnectionReconnects: Array> = []; +const mockGetClientSettings = vi.hoisted(() => vi.fn()); +const mockTransportLifecycleHandlers: Array<{ + readonly onOpen?: () => void; + readonly onClose?: ( + details: { readonly code: number; readonly reason: string }, + context: { readonly intentional: boolean }, + ) => void; +}> = []; let savedEnvironmentRegistryListener: (() => void) | null = null; -function MockWsTransport() { - return undefined; +function MockWsTransport( + _url: unknown, + lifecycleHandlers?: { + readonly onOpen?: () => void; + readonly onClose?: ( + details: { readonly code: number; readonly reason: string }, + context: { readonly intentional: boolean }, + ) => void; + }, +) { + mockTransportLifecycleHandlers.push(lifecycleHandlers ?? {}); } vi.mock("../primary", () => ({ @@ -41,6 +60,11 @@ vi.mock("../../lib/runtime", () => ({ }, })); +vi.mock("~/hooks/useSettings", async (importOriginal) => ({ + ...(await importOriginal()), + getClientSettings: mockGetClientSettings, +})); + vi.mock("./catalog", () => ({ getSavedEnvironmentRecord: mockGetSavedEnvironmentRecord, hasSavedEnvironmentRegistryHydrated: vi.fn(() => true), @@ -60,9 +84,19 @@ vi.mock("./catalog", () => ({ }, useSavedEnvironmentRuntimeStore: { getState: () => ({ - ensure: vi.fn(), - patch: vi.fn(), - clear: vi.fn(), + byId: mockSavedEnvironmentRuntimeById, + ensure: (environmentId: EnvironmentId) => { + mockSavedEnvironmentRuntimeById[environmentId] ??= {}; + }, + patch: (environmentId: EnvironmentId, patch: Record) => { + mockSavedEnvironmentRuntimeById[environmentId] = { + ...mockSavedEnvironmentRuntimeById[environmentId], + ...patch, + }; + }, + clear: (environmentId: EnvironmentId) => { + delete mockSavedEnvironmentRuntimeById[environmentId]; + }, }), }, waitForSavedEnvironmentRegistryHydration: mockWaitForSavedEnvironmentRegistryHydration, @@ -312,11 +346,16 @@ describe("retainThreadDetailSubscription", () => { const token = await mockReadSavedEnvironmentBearerToken(); return token ? { version: 1, method: "bearer", token } : null; }); + mockGetClientSettings.mockReturnValue(DEFAULT_CLIENT_SETTINGS); + for (const key of Object.keys(mockSavedEnvironmentRuntimeById)) { + delete mockSavedEnvironmentRuntimeById[key]; + } mockFetchRemoteSessionState.mockResolvedValue({ authenticated: true, scopes: ["orchestration:read"], }); mockConnectionReconnects.length = 0; + mockTransportLifecycleHandlers.length = 0; }); afterEach(async () => { @@ -357,6 +396,46 @@ describe("retainThreadDetailSubscription", () => { await resetEnvironmentServiceForTests(); }); + it("refreshes a retained thread detail subscription on demand", async () => { + const { + refreshRetainedThreadDetailSubscription, + retainThreadDetailSubscription, + startEnvironmentConnectionService, + resetEnvironmentServiceForTests, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + const environmentId = EnvironmentId.make("env-1"); + const threadId = ThreadId.make("thread-refresh"); + + const release = retainThreadDetailSubscription(environmentId, threadId); + expect(mockSubscribeThread).toHaveBeenCalledTimes(1); + + expect(refreshRetainedThreadDetailSubscription(environmentId, threadId)).toBe(true); + expect(mockThreadUnsubscribe).toHaveBeenCalledTimes(1); + expect(mockSubscribeThread).toHaveBeenCalledTimes(2); + + release(); + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("does not refresh an unretained thread detail subscription", async () => { + const { refreshRetainedThreadDetailSubscription, resetEnvironmentServiceForTests } = + await import("./service"); + + expect( + refreshRetainedThreadDetailSubscription( + EnvironmentId.make("env-1"), + ThreadId.make("thread-missing"), + ), + ).toBe(false); + expect(mockThreadUnsubscribe).not.toHaveBeenCalled(); + expect(mockSubscribeThread).not.toHaveBeenCalled(); + + await resetEnvironmentServiceForTests(); + }); + it("does not start the primary connection until the known environment has an id", async () => { mockGetPrimaryKnownEnvironment.mockReturnValue({ id: "env-1", @@ -490,6 +569,97 @@ describe("retainThreadDetailSubscription", () => { await resetEnvironmentServiceForTests(); }); + it("refreshes retained thread detail subscriptions after a saved environment reconnect", async () => { + const environmentId = EnvironmentId.make("env-remote"); + const threadId = ThreadId.make("thread-reconnect-refresh"); + const record = { + environmentId, + label: "Remote env", + httpBaseUrl: "http://remote.example.test", + wsBaseUrl: "ws://remote.example.test", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + }; + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("bearer-token"); + + const { + listEnvironmentConnections, + reconnectSavedEnvironment, + retainThreadDetailSubscription, + startEnvironmentConnectionService, + resetEnvironmentServiceForTests, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const release = retainThreadDetailSubscription(environmentId, threadId); + expect(mockSubscribeThread).toHaveBeenCalledTimes(1); + + await reconnectSavedEnvironment(environmentId); + + expect(mockThreadUnsubscribe).toHaveBeenCalledTimes(1); + expect(mockSubscribeThread).toHaveBeenCalledTimes(2); + + release(); + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("marks saved environment runtime connected after a successful reconnect", async () => { + const environmentId = EnvironmentId.make("env-remote"); + const record = { + environmentId, + label: "Remote env", + httpBaseUrl: "http://remote.example.test", + wsBaseUrl: "ws://remote.example.test", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + }; + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("bearer-token"); + + const { + listEnvironmentConnections, + reconnectSavedEnvironment, + startEnvironmentConnectionService, + resetEnvironmentServiceForTests, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + mockSavedEnvironmentRuntimeById[environmentId] = { + ...mockSavedEnvironmentRuntimeById[environmentId], + connectionState: "disconnected", + disconnectedAt: "2026-05-01T00:00:00.000Z", + }; + + await reconnectSavedEnvironment(environmentId); + + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connected"); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.disconnectedAt).toBeNull(); + stop(); + await resetEnvironmentServiceForTests(); + }); + it("keeps healthy environment streams connected when the browser resumes from the background", async () => { let visibilityState: DocumentVisibilityState = "visible"; const documentTarget = new EventTarget(); @@ -602,6 +772,461 @@ describe("retainThreadDetailSubscription", () => { await resetEnvironmentServiceForTests(); }); + it("auto reconnects saved SSH environments after an unexpected close when enabled", async () => { + const environmentId = EnvironmentId.make("env-ssh"); + const target = { + alias: "devbox", + hostname: "devbox.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env", + httpBaseUrl: "http://127.0.0.1:43001", + wsBaseUrl: "ws://127.0.0.1:43001", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockGetClientSettings.mockReturnValue({ + ...DEFAULT_CLIENT_SETTINGS, + autoReconnectSshConnections: true, + }); + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const savedConnectionCalls = mockCreateEnvironmentConnection.mock.calls.filter( + ([input]) => input.kind === "saved", + ); + expect(savedConnectionCalls).toHaveLength(1); + const savedReconnect = mockConnectionReconnects.at(-1); + expect(savedReconnect).toBeDefined(); + + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onClose); + expect(savedLifecycle?.onClose).toBeDefined(); + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connecting"); + await vi.advanceTimersByTimeAsync(5_000); + + expect(savedReconnect).toHaveBeenCalledTimes(1); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connected"); + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("cancels a pending SSH auto reconnect timer when the user reconnects manually", async () => { + const environmentId = EnvironmentId.make("env-ssh-manual-reconnect"); + const target = { + alias: "devbox-manual", + hostname: "devbox-manual.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env manual reconnect", + httpBaseUrl: "http://127.0.0.1:43004", + wsBaseUrl: "ws://127.0.0.1:43004", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockGetClientSettings.mockReturnValue({ + ...DEFAULT_CLIENT_SETTINGS, + autoReconnectSshConnections: true, + }); + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + reconnectSavedEnvironment, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const savedReconnect = mockConnectionReconnects.at(-1); + expect(savedReconnect).toBeDefined(); + + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onClose); + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + await reconnectSavedEnvironment(environmentId); + expect(savedReconnect).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("dedupes manual reconnect while SSH auto reconnect is already in progress", async () => { + const environmentId = EnvironmentId.make("env-ssh-overlap-reconnect"); + const target = { + alias: "devbox-overlap", + hostname: "devbox-overlap.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env overlap reconnect", + httpBaseUrl: "http://127.0.0.1:43006", + wsBaseUrl: "ws://127.0.0.1:43006", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockGetClientSettings.mockReturnValue({ + ...DEFAULT_CLIENT_SETTINGS, + autoReconnectSshConnections: true, + }); + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + reconnectSavedEnvironment, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + let finishReconnect = () => {}; + const reconnectGate = new Promise((resolve) => { + finishReconnect = resolve; + }); + const savedReconnect = mockConnectionReconnects.at(-1); + expect(savedReconnect).toBeDefined(); + savedReconnect?.mockImplementation(async () => { + await reconnectGate; + }); + + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onClose); + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(1); + + const manualReconnect = reconnectSavedEnvironment(environmentId); + await Promise.resolve(); + expect(savedReconnect).toHaveBeenCalledTimes(1); + + finishReconnect(); + await manualReconnect; + expect(savedReconnect).toHaveBeenCalledTimes(1); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("does not auto reconnect saved SSH environments when disabled", async () => { + const environmentId = EnvironmentId.make("env-ssh-disabled"); + const target = { + alias: "devbox-disabled", + hostname: "devbox-disabled.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env disabled", + httpBaseUrl: "http://127.0.0.1:43002", + wsBaseUrl: "ws://127.0.0.1:43002", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const savedReconnect = mockConnectionReconnects.at(-1); + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onClose); + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("disconnected"); + await vi.advanceTimersByTimeAsync(5_000); + + expect(savedReconnect).not.toHaveBeenCalled(); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("continues auto reconnecting saved SSH environments every five seconds while failures continue", async () => { + const environmentId = EnvironmentId.make("env-ssh-loop"); + const target = { + alias: "devbox-loop", + hostname: "devbox-loop.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env loop", + httpBaseUrl: "http://127.0.0.1:43003", + wsBaseUrl: "ws://127.0.0.1:43003", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockGetClientSettings.mockReturnValue({ + ...DEFAULT_CLIENT_SETTINGS, + autoReconnectSshConnections: true, + }); + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const savedReconnect = mockConnectionReconnects.at(-1); + expect(savedReconnect).toBeDefined(); + savedReconnect?.mockRejectedValue(new Error("still down")); + + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onClose); + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connecting"); + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(1); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connecting"); + + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(2); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connecting"); + + stop(); + await resetEnvironmentServiceForTests(); + }); + + it("continues auto reconnecting saved SSH environments when the socket opens before reconnect fails", async () => { + const environmentId = EnvironmentId.make("env-ssh-open-then-fail"); + const target = { + alias: "devbox-open-fail", + hostname: "devbox-open-fail.example.test", + username: null, + port: null, + source: "manual" as const, + }; + const record = { + environmentId, + label: "SSH env open then fail", + httpBaseUrl: "http://127.0.0.1:43005", + wsBaseUrl: "ws://127.0.0.1:43005", + createdAt: "2026-05-01T00:00:00.000Z", + lastConnectedAt: "2026-05-01T00:00:00.000Z", + desktopSsh: target, + }; + mockGetClientSettings.mockReturnValue({ + ...DEFAULT_CLIENT_SETTINGS, + autoReconnectSshConnections: true, + }); + mockListSavedEnvironmentRecords.mockReturnValue([record]); + mockGetSavedEnvironmentRecord.mockReturnValue(record); + mockReadSavedEnvironmentBearerToken.mockResolvedValue("ssh-bearer-token"); + vi.stubGlobal("window", { + desktopBridge: { + ensureSshEnvironment: vi.fn(async () => ({ + target, + httpBaseUrl: record.httpBaseUrl, + wsBaseUrl: record.wsBaseUrl, + pairingToken: null, + })), + fetchSshSessionState: vi.fn(async () => ({ + authenticated: true, + scopes: ["orchestration:read"], + })), + issueSshWebSocketTicket: vi.fn(async () => ({ ticket: "ssh-ws-ticket" })), + }, + }); + + const { + listEnvironmentConnections, + resetEnvironmentServiceForTests, + startEnvironmentConnectionService, + } = await import("./service"); + + const stop = startEnvironmentConnectionService(new QueryClient()); + savedEnvironmentRegistryListener?.(); + await vi.waitFor(() => { + expect( + listEnvironmentConnections().some( + (connection) => connection.environmentId === environmentId, + ), + ).toBe(true); + }); + + const savedReconnect = mockConnectionReconnects.at(-1); + expect(savedReconnect).toBeDefined(); + const savedLifecycle = mockTransportLifecycleHandlers.find((handlers) => handlers.onOpen); + savedReconnect?.mockImplementation(async () => { + savedLifecycle?.onOpen?.(); + throw new Error("metadata refresh failed"); + }); + + savedLifecycle?.onClose?.({ code: 1006, reason: "transport lost" }, { intentional: false }); + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(1); + expect(mockSavedEnvironmentRuntimeById[environmentId]?.connectionState).toBe("connecting"); + + await vi.advanceTimersByTimeAsync(5_000); + expect(savedReconnect).toHaveBeenCalledTimes(2); + + stop(); + await resetEnvironmentServiceForTests(); + }); + it("allows a larger idle cache before capacity eviction starts", async () => { const { retainThreadDetailSubscription, diff --git a/apps/web/src/environments/runtime/service.ts b/apps/web/src/environments/runtime/service.ts index 38c62f55aca..a30ceb86dcd 100644 --- a/apps/web/src/environments/runtime/service.ts +++ b/apps/web/src/environments/runtime/service.ts @@ -129,6 +129,9 @@ const pendingSavedEnvironmentConnections = new Map< EnvironmentId, PendingSavedEnvironmentConnection >(); +const pendingSavedEnvironmentReconnects = new Map>(); +const pendingSshAutoReconnectTimeouts = new Map>(); +const sshAutoReconnectEpochs = new Map(); const environmentConnectionListeners = new Set<() => void>(); const providerInvalidationListeners = new Set<() => void>(); const threadDetailSubscriptions = new Map(); @@ -163,6 +166,7 @@ let lastBrowserResumeReconnectAt = Number.NEGATIVE_INFINITY; const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 15 * 60 * 1000; const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 32; const BROWSER_RESUME_RECONNECT_COOLDOWN_MS = 2_000; +const SSH_AUTO_RECONNECT_DELAY_MS = 5_000; const INITIAL_SERVER_CONFIG_SNAPSHOT_WAIT_MS = 150; const NOOP = () => undefined; const SSH_HTTP_STATUS_RE = /^\[ssh_http:(\d+)\]\s/u; @@ -469,6 +473,38 @@ function attachThreadDetailSubscriptionsForEnvironment(environmentId: Environmen } } +function reattachThreadDetailSubscriptionsForEnvironment(environmentId: EnvironmentId): void { + for (const entry of threadDetailSubscriptions.values()) { + if (entry.environmentId !== environmentId) { + continue; + } + entry.unsubscribe(); + entry.unsubscribe = NOOP; + attachThreadDetailSubscription(entry); + } +} + +export function refreshRetainedThreadDetailSubscription( + environmentId: EnvironmentId, + threadId: ThreadId, +): boolean { + const entry = threadDetailSubscriptions.get( + getThreadDetailSubscriptionKey(environmentId, threadId), + ); + if (!entry) { + return false; + } + + entry.unsubscribe(); + entry.unsubscribe = NOOP; + if (!attachThreadDetailSubscription(entry)) { + watchThreadDetailSubscriptionConnection(entry); + return false; + } + entry.lastAccessedAt = Date.now(); + return true; +} + function reconcileThreadDetailSubscriptionsForEnvironment( environmentId: EnvironmentId, threadIds: ReadonlyArray, @@ -905,6 +941,69 @@ function setRuntimeError(environmentId: EnvironmentId, error: unknown) { }); } +function clearPendingSshAutoReconnect(environmentId: EnvironmentId): void { + sshAutoReconnectEpochs.set(environmentId, (sshAutoReconnectEpochs.get(environmentId) ?? 0) + 1); + cancelPendingSshAutoReconnectTimer(environmentId); +} + +function cancelPendingSshAutoReconnectTimer(environmentId: EnvironmentId): void { + const timeoutId = pendingSshAutoReconnectTimeouts.get(environmentId); + if (!timeoutId) { + return; + } + clearTimeout(timeoutId); + pendingSshAutoReconnectTimeouts.delete(environmentId); +} + +function shouldAutoReconnectSsh(environmentId: EnvironmentId, epoch: number): boolean { + if ((sshAutoReconnectEpochs.get(environmentId) ?? 0) !== epoch) { + return false; + } + + const record = getSavedEnvironmentRecord(environmentId); + if (!record?.desktopSsh) { + return false; + } + if (!getClientSettings().autoReconnectSshConnections) { + return false; + } + + const runtimeState = useSavedEnvironmentRuntimeStore.getState().byId[environmentId]; + return ( + runtimeState?.connectionState === "connecting" || + runtimeState?.connectionState === "disconnected" || + runtimeState?.connectionState === "error" + ); +} + +function scheduleSshAutoReconnect(environmentId: EnvironmentId): void { + if (pendingSshAutoReconnectTimeouts.has(environmentId)) { + return; + } + + const epoch = sshAutoReconnectEpochs.get(environmentId) ?? 0; + const timeoutId = setTimeout(() => { + pendingSshAutoReconnectTimeouts.delete(environmentId); + if (!shouldAutoReconnectSsh(environmentId, epoch)) { + return; + } + + void reconnectSavedEnvironment(environmentId, { + preserveSshAutoReconnectEpoch: true, + }).catch((error) => { + console.warn("SSH auto reconnect failed", { + environmentId, + error: error instanceof Error ? error.message : String(error), + }); + if (shouldAutoReconnectSsh(environmentId, epoch)) { + setRuntimeConnecting(environmentId); + scheduleSshAutoReconnect(environmentId); + } + }); + }, SSH_AUTO_RECONNECT_DELAY_MS); + pendingSshAutoReconnectTimeouts.set(environmentId, timeoutId); +} + function coalesceOrchestrationUiEvents( events: ReadonlyArray, ): OrchestrationEvent[] { @@ -1142,7 +1241,14 @@ function createEnvironmentConnectionHandlers() { }; } -function createWsRpcClient(transport: WsTransport): WsRpcClient { +function createWsRpcClient( + transport: WsTransport, + options?: { readonly resetGlobalReconnectBackoff?: boolean }, +): WsRpcClient { + const resetGlobalReconnectBackoff = options?.resetGlobalReconnectBackoff !== false; + if (!resetGlobalReconnectBackoff) { + return createBaseWsRpcClient(transport); + } return createBaseWsRpcClient(transport, { beforeReconnect: () => resetWsReconnectBackoff(), }); @@ -1256,6 +1362,7 @@ function createSavedEnvironmentClient( setRuntimeConnecting(environmentId); }, onOpen: () => { + cancelPendingSshAutoReconnectTimer(environmentId); setRuntimeConnected(environmentId); }, onError: (message: string) => { @@ -1268,7 +1375,14 @@ function createSavedEnvironmentClient( lastErrorAt: isoNow(), }); }, - onClose: (details: { readonly code: number; readonly reason: string }) => { + onClose: ( + details: { readonly code: number; readonly reason: string }, + context: { readonly intentional: boolean }, + ) => { + const shouldScheduleAutoReconnect = + !context.intentional && + getSavedEnvironmentRecord(environmentId)?.desktopSsh && + getClientSettings().autoReconnectSshConnections; setRuntimeDisconnected( environmentId, appendVersionMismatchHint( @@ -1278,9 +1392,18 @@ function createSavedEnvironmentClient( ), ), ); + if (shouldScheduleAutoReconnect) { + setRuntimeConnecting(environmentId); + scheduleSshAutoReconnect(environmentId); + } }, }, + { + trackGlobalConnectionState: false, + trackGlobalRequestLatency: false, + }, ), + { resetGlobalReconnectBackoff: false }, ); } @@ -1758,6 +1881,7 @@ export async function disconnectSavedEnvironment(environmentId: EnvironmentId): if (connection?.kind === "saved") { await removeConnection(environmentId).catch(() => false); } + clearPendingSshAutoReconnect(environmentId); setRuntimeDisconnected(environmentId); if (record?.desktopSsh && typeof window !== "undefined") { @@ -1766,55 +1890,82 @@ export async function disconnectSavedEnvironment(environmentId: EnvironmentId): } } -export async function reconnectSavedEnvironment(environmentId: EnvironmentId): Promise { +export async function reconnectSavedEnvironment( + environmentId: EnvironmentId, + options?: { readonly preserveSshAutoReconnectEpoch?: boolean }, +): Promise { const record = getSavedEnvironmentRecord(environmentId); if (!record) { throw new Error("Saved environment not found."); } + if (record.desktopSsh) { + if (options?.preserveSshAutoReconnectEpoch) { + cancelPendingSshAutoReconnectTimer(environmentId); + } else { + clearPendingSshAutoReconnect(environmentId); + } + } + + const pendingReconnect = pendingSavedEnvironmentReconnects.get(environmentId); + if (pendingReconnect) { + return await pendingReconnect; + } + + const reconnectPromise = Promise.resolve().then(async () => { + const connection = environmentConnections.get(environmentId); + if (!connection) { + setRuntimeConnecting(environmentId); + try { + await ensureSavedEnvironmentConnection(record); + return; + } catch (error) { + if (isSavedEnvironmentConnectionCancelledError(error)) { + return; + } + setRuntimeError(environmentId, error); + throw error; + } + } - const connection = environmentConnections.get(environmentId); - if (!connection) { setRuntimeConnecting(environmentId); try { - await ensureSavedEnvironmentConnection(record); - return; + if (record.desktopSsh) { + await prepareSavedEnvironmentRecordForConnection(record); + } + await connection.reconnect(); + reattachThreadDetailSubscriptionsForEnvironment(environmentId); + setRuntimeConnected(environmentId); } catch (error) { - if (isSavedEnvironmentConnectionCancelledError(error)) { - return; + if (record.desktopSsh) { + try { + const issued = await issueDesktopSshBearerSession( + getSavedEnvironmentRecord(environmentId) ?? record, + ); + await removeConnection(environmentId).catch(() => false); + await ensureSavedEnvironmentConnection(issued.record, { + bearerToken: issued.bearerToken, + scopes: issued.scopes, + }); + return; + } catch (recoveryError) { + if (isSavedEnvironmentConnectionCancelledError(recoveryError)) { + return; + } + setRuntimeError(environmentId, recoveryError); + throw recoveryError; + } } setRuntimeError(environmentId, error); throw error; } - } - - setRuntimeConnecting(environmentId); + }); + pendingSavedEnvironmentReconnects.set(environmentId, reconnectPromise); try { - if (record.desktopSsh) { - await prepareSavedEnvironmentRecordForConnection(record); + await reconnectPromise; + } finally { + if (pendingSavedEnvironmentReconnects.get(environmentId) === reconnectPromise) { + pendingSavedEnvironmentReconnects.delete(environmentId); } - await connection.reconnect(); - } catch (error) { - if (record.desktopSsh) { - try { - const issued = await issueDesktopSshBearerSession( - getSavedEnvironmentRecord(environmentId) ?? record, - ); - await removeConnection(environmentId).catch(() => false); - await ensureSavedEnvironmentConnection(issued.record, { - bearerToken: issued.bearerToken, - scopes: issued.scopes, - }); - return; - } catch (recoveryError) { - if (isSavedEnvironmentConnectionCancelledError(recoveryError)) { - return; - } - setRuntimeError(environmentId, recoveryError); - throw recoveryError; - } - } - setRuntimeError(environmentId, error); - throw error; } } @@ -1824,6 +1975,7 @@ export async function removeSavedEnvironment(environmentId: EnvironmentId): Prom useSavedEnvironmentRegistryStore.getState().remove(environmentId); useSavedEnvironmentRuntimeStore.getState().clear(environmentId); useStore.getState().removeEnvironmentState(environmentId); + clearPendingSshAutoReconnect(environmentId); await removeSavedEnvironmentBearerToken(environmentId); } @@ -2019,6 +2171,12 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): () stop: () => { unsubscribeSavedEnvironments(); unsubscribeBrowserResumeReconnects(); + for (const timeoutId of pendingSshAutoReconnectTimeouts.values()) { + clearTimeout(timeoutId); + } + pendingSshAutoReconnectTimeouts.clear(); + sshAutoReconnectEpochs.clear(); + pendingSavedEnvironmentReconnects.clear(); queryInvalidationThrottler.cancel(); }, }; @@ -2040,6 +2198,12 @@ export async function resetEnvironmentServiceForTests(): Promise { lastBrowserResumeReconnectAt = Number.NEGATIVE_INFINITY; lastAppliedProjectionVersionByEnvironment.clear(); pendingSavedEnvironmentConnections.clear(); + pendingSavedEnvironmentReconnects.clear(); + for (const timeoutId of pendingSshAutoReconnectTimeouts.values()) { + clearTimeout(timeoutId); + } + pendingSshAutoReconnectTimeouts.clear(); + sshAutoReconnectEpochs.clear(); savedEnvironmentConnectionAttempts.clear(); for (const key of Array.from(threadDetailSubscriptions.keys())) { disposeThreadDetailSubscriptionByKey(key); diff --git a/apps/web/src/localApi.test.ts b/apps/web/src/localApi.test.ts index e50dbd9f5f8..cfe47e7c4b1 100644 --- a/apps/web/src/localApi.test.ts +++ b/apps/web/src/localApi.test.ts @@ -641,6 +641,7 @@ describe("wsApi", () => { it("reads and writes persistence through the desktop bridge when available", async () => { const clientSettings = { + autoReconnectSshConnections: false, autoOpenPlanSidebar: false, confirmThreadArchive: true, confirmThreadDelete: false, @@ -704,6 +705,7 @@ describe("wsApi", () => { const { createLocalApi } = await import("./localApi"); const api = createLocalApi(rpcClientMock as never); const clientSettings = { + autoReconnectSshConnections: false, autoOpenPlanSidebar: false, confirmThreadArchive: true, confirmThreadDelete: false, diff --git a/apps/web/src/rpc/wsTransport.test.ts b/apps/web/src/rpc/wsTransport.test.ts index eb6fb494da2..558ce8ccf42 100644 --- a/apps/web/src/rpc/wsTransport.test.ts +++ b/apps/web/src/rpc/wsTransport.test.ts @@ -261,6 +261,52 @@ describe("WsTransport (web instrumentation)", () => { await transport.dispose(); }); + it("can disable global websocket state tracking for saved environment transports", async () => { + const onOpen = vi.fn(); + const onClose = vi.fn(); + const transport = createTransport( + "ws://remote.example.test", + { + onOpen, + onClose, + }, + { + trackGlobalConnectionState: false, + trackGlobalRequestLatency: false, + }, + ); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + socket.close(1006, "remote tunnel lost"); + + await waitFor(() => { + expect(onOpen).toHaveBeenCalledOnce(); + expect(onClose).toHaveBeenCalledWith( + { + code: 1006, + reason: "remote tunnel lost", + }, + { + intentional: false, + }, + ); + }); + expect(getWsConnectionStatus()).toMatchObject({ + attemptCount: 0, + hasConnected: false, + phase: "idle", + reconnectAttemptCount: 0, + reconnectPhase: "idle", + }); + + await transport.dispose(); + }); + it("marks unary requests as slow until the first server ack arrives", async () => { const slowAckThresholdMs = 25; setSlowRpcAckThresholdMsForTests(slowAckThresholdMs); diff --git a/apps/web/src/rpc/wsTransport.ts b/apps/web/src/rpc/wsTransport.ts index 7c3b4303f3a..2f2ed6643db 100644 --- a/apps/web/src/rpc/wsTransport.ts +++ b/apps/web/src/rpc/wsTransport.ts @@ -22,42 +22,68 @@ import { function createWsRpcProtocolLayer( url: WsRpcProtocolSocketUrlProvider, handlers?: WsProtocolLifecycleHandlers, + options?: { + readonly trackGlobalConnectionState?: boolean; + readonly trackGlobalRequestLatency?: boolean; + }, ) { + const trackGlobalConnectionState = options?.trackGlobalConnectionState !== false; + const trackGlobalRequestLatency = options?.trackGlobalRequestLatency !== false; return createSharedWsRpcProtocolLayer(url, handlers, { - telemetryLifecycle: { - onAttempt: recordWsConnectionAttempt, - onOpen: recordWsConnectionOpened, - onError: (message) => { - clearAllTrackedRpcRequests(); - recordWsConnectionErrored(message); - }, - onClose: (details, context) => { - clearAllTrackedRpcRequests(); - if (context.intentional) { - return; + ...(trackGlobalConnectionState + ? { + telemetryLifecycle: { + onAttempt: recordWsConnectionAttempt, + onOpen: recordWsConnectionOpened, + onError: (message: string) => { + clearAllTrackedRpcRequests(); + recordWsConnectionErrored(message); + }, + onClose: ( + details: { readonly code: number; readonly reason: string }, + context: { readonly intentional: boolean }, + ) => { + clearAllTrackedRpcRequests(); + if (context.intentional) { + return; + } + recordWsConnectionClosed(details); + }, + }, } - recordWsConnectionClosed(details); - }, - }, - requestTelemetry: { - onRequestSent: trackRpcRequestSent, - onRequestAcknowledged: acknowledgeRpcRequest, - onClearTrackedRequests: clearAllTrackedRpcRequests, - }, + : {}), + ...(trackGlobalRequestLatency + ? { + requestTelemetry: { + onRequestSent: trackRpcRequestSent, + onRequestAcknowledged: acknowledgeRpcRequest, + onClearTrackedRequests: clearAllTrackedRpcRequests, + }, + } + : {}), }); } -const webWsTransportOptions = { - tracingLayer: ClientTracingLive, - createProtocolLayer: createWsRpcProtocolLayer, - onBeforeReconnect: () => clearAllTrackedRpcRequests(), -} satisfies WsTransportOptions; +export interface WebWsTransportOptions { + readonly trackGlobalConnectionState?: boolean; + readonly trackGlobalRequestLatency?: boolean; +} + +function makeWebWsTransportOptions(options?: WebWsTransportOptions): WsTransportOptions { + const trackGlobalRequestLatency = options?.trackGlobalRequestLatency !== false; + return { + tracingLayer: ClientTracingLive, + createProtocolLayer: (url, handlers) => createWsRpcProtocolLayer(url, handlers, options), + ...(trackGlobalRequestLatency ? { onBeforeReconnect: () => clearAllTrackedRpcRequests() } : {}), + } satisfies WsTransportOptions; +} export class WsTransport extends BaseWsTransport { constructor( url: WsRpcProtocolSocketUrlProvider, lifecycleHandlers?: WsProtocolLifecycleHandlers, + options?: WebWsTransportOptions, ) { - super(url, lifecycleHandlers, webWsTransportOptions); + super(url, lifecycleHandlers, makeWebWsTransportOptions(options)); } } diff --git a/packages/contracts/src/settings.ts b/packages/contracts/src/settings.ts index 66bb7631311..6c2e12bc285 100644 --- a/packages/contracts/src/settings.ts +++ b/packages/contracts/src/settings.ts @@ -40,6 +40,9 @@ export type SidebarThreadPreviewCount = typeof SidebarThreadPreviewCount.Type; export const DEFAULT_SIDEBAR_THREAD_PREVIEW_COUNT: SidebarThreadPreviewCount = 6; export const ClientSettingsSchema = Schema.Struct({ + autoReconnectSshConnections: Schema.Boolean.pipe( + Schema.withDecodingDefault(Effect.succeed(false)), + ), autoOpenPlanSidebar: Schema.Boolean.pipe(Schema.withDecodingDefault(Effect.succeed(true))), confirmThreadArchive: Schema.Boolean.pipe(Schema.withDecodingDefault(Effect.succeed(false))), confirmThreadDelete: Schema.Boolean.pipe(Schema.withDecodingDefault(Effect.succeed(true))), @@ -475,6 +478,7 @@ export const ServerSettingsPatch = Schema.Struct({ export type ServerSettingsPatch = typeof ServerSettingsPatch.Type; export const ClientSettingsPatch = Schema.Struct({ + autoReconnectSshConnections: Schema.optionalKey(Schema.Boolean), autoOpenPlanSidebar: Schema.optionalKey(Schema.Boolean), confirmThreadArchive: Schema.optionalKey(Schema.Boolean), confirmThreadDelete: Schema.optionalKey(Schema.Boolean),