diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dcc92f1..04dc7fa3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ - Every `coder.*` command now records a `command.invoked` telemetry event with its duration and outcome, so command latency and failures are captured alongside other local telemetry. +- Connection lifecycle now records local telemetry: SSH process + discovery/loss/recovery with sampled network info, and reconnecting + WebSocket open, drop, reconnect, and state transitions, so connection + stability is captured alongside other local telemetry. ### Fixed diff --git a/src/api/coderApi.ts b/src/api/coderApi.ts index 57e27117..e350127f 100644 --- a/src/api/coderApi.ts +++ b/src/api/coderApi.ts @@ -36,6 +36,10 @@ import { } from "../logging/types"; import { sizeOf } from "../logging/utils"; import { getHeaderCommand } from "../settings/headers"; +import { + NOOP_TELEMETRY_REPORTER, + type TelemetryReporter, +} from "../telemetry/reporter"; import { HttpStatusCode, WebSocketCloseCode } from "../websocket/codes"; import { type UnidirectionalStream, @@ -49,6 +53,7 @@ import { import { ConnectionState, ReconnectingWebSocket, + type ReconnectingWebSocketOptions, type SocketFactory, } from "../websocket/reconnectingWebSocket"; import { SseConnection } from "../websocket/sseConnection"; @@ -86,7 +91,10 @@ export class CoderApi extends Api implements vscode.Disposable { >(); private readonly configWatcher: vscode.Disposable; - private constructor(private readonly output: Logger) { + private constructor( + private readonly output: Logger, + private readonly telemetry: TelemetryReporter, + ) { super(); this.configWatcher = this.watchConfigChanges(); } @@ -99,8 +107,9 @@ export class CoderApi extends Api implements vscode.Disposable { baseUrl: string, token: string | undefined, output: Logger, + telemetry: TelemetryReporter = NOOP_TELEMETRY_REPORTER, ): CoderApi { - const client = new CoderApi(output); + const client = new CoderApi(output, telemetry); client.setCredentials(baseUrl, token); setupInterceptors(client, output); @@ -449,18 +458,21 @@ export class CoderApi extends Api implements vscode.Disposable { private async createReconnectingSocket( socketFactory: SocketFactory, ): Promise> { + const options: ReconnectingWebSocketOptions = { + telemetry: this.telemetry, + onCertificateRefreshNeeded: async () => { + const refreshCommand = getRefreshCommand(); + if (!refreshCommand) { + return false; + } + return refreshCertificates(refreshCommand, this.output); + }, + }; + const reconnectingSocket = await ReconnectingWebSocket.create( socketFactory, this.output, - { - onCertificateRefreshNeeded: async () => { - const refreshCommand = getRefreshCommand(); - if (!refreshCommand) { - return false; - } - return refreshCertificates(refreshCommand, this.output); - }, - }, + options, () => this.reconnectingSockets.delete(reconnectingSocket), ); diff --git a/src/extension.ts b/src/extension.ts index 0ba96aff..23d5258b 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -114,6 +114,7 @@ export async function activate(ctx: vscode.ExtensionContext): Promise { (await secretsManager.getSessionAuth(deployment?.safeHostname ?? "")) ?.token, output, + serviceContainer.getTelemetryService(), ); ctx.subscriptions.push(client); diff --git a/src/instrumentation/ssh.ts b/src/instrumentation/ssh.ts new file mode 100644 index 00000000..690d2ff2 --- /dev/null +++ b/src/instrumentation/ssh.ts @@ -0,0 +1,175 @@ +import type { NetworkInfo } from "../remote/sshProcess"; +import type { TelemetryReporter } from "../telemetry/reporter"; + +const NETWORK_SAMPLE_INTERVAL_MS = 60_000; +const NETWORK_LATENCY_CHANGE_RATIO = 0.1; + +export type ProcessLossCause = "stale_network_info" | "missing_network_info"; + +interface NetworkSample { + readonly emittedAtMs: number; + readonly p2p: boolean; + readonly preferredDerp: string; + readonly latencyMs: number; +} + +interface ProcessDiscoveryResult { + readonly pid: number | undefined; + readonly attempts: number; +} + +export class SshTelemetry { + readonly #telemetry: TelemetryReporter; + #processStartedAtMs: number | undefined; + #processLostAtMs: number | undefined; + #lastNetworkSample: NetworkSample | undefined; + + public constructor(telemetry: TelemetryReporter) { + this.#telemetry = telemetry; + } + + public traceProcessDiscovery( + fn: () => Promise, + ): Promise { + return this.#telemetry.trace("ssh.process.discovered", async (span) => { + const { pid, attempts } = await fn(); + span.setProperty("found", String(pid !== undefined)); + span.setMeasurement("attempts", attempts); + return pid; + }); + } + + public processStarted(): void { + this.#processStartedAtMs = performance.now(); + this.#processLostAtMs = undefined; + } + + public processLost(cause: ProcessLossCause): void { + if ( + this.#processStartedAtMs === undefined || + this.#processLostAtMs !== undefined + ) { + return; + } + const now = performance.now(); + this.#processLostAtMs = now; + this.#telemetry.log( + "ssh.process.lost", + { cause }, + { uptimeMs: now - this.#processStartedAtMs }, + ); + } + + public processRecovered(): void { + if (this.#processLostAtMs === undefined) { + return; + } + this.#telemetry.log( + "ssh.process.recovered", + {}, + { recoveryDurationMs: performance.now() - this.#processLostAtMs }, + ); + this.#processLostAtMs = undefined; + } + + /** Handover to a different SSH process. Always emits `ssh.process.replaced`, + * even when the prior process was already lost (replacement is operationally + * distinct from recovery). */ + public processReplaced(): void { + const now = performance.now(); + if (this.#processStartedAtMs !== undefined) { + const wasLost = this.#processLostAtMs !== undefined; + const measurements: Record = { + previousUptimeMs: now - this.#processStartedAtMs, + }; + if (this.#processLostAtMs !== undefined) { + measurements.lostDurationMs = now - this.#processLostAtMs; + } + this.#telemetry.log( + "ssh.process.replaced", + { wasLost: String(wasLost) }, + measurements, + ); + } + this.#processStartedAtMs = now; + this.#processLostAtMs = undefined; + this.#lastNetworkSample = undefined; + } + + /** Terminal teardown signal. Emits regardless of prior lost state so + * consumers always see a session-ending event. */ + public disposed(): void { + if (this.#processStartedAtMs === undefined) { + return; + } + const now = performance.now(); + const wasLost = this.#processLostAtMs !== undefined; + this.#telemetry.log( + "ssh.process.disposed", + { wasLost: String(wasLost) }, + { uptimeMs: now - this.#processStartedAtMs }, + ); + this.#processStartedAtMs = undefined; + this.#processLostAtMs = undefined; + this.#lastNetworkSample = undefined; + } + + public networkSampled(network: NetworkInfo): void { + const now = performance.now(); + const previous = this.#lastNetworkSample; + if (previous && !shouldEmitSample(previous, network, now)) { + return; + } + + this.#lastNetworkSample = { + emittedAtMs: now, + p2p: network.p2p, + preferredDerp: network.preferred_derp, + latencyMs: network.latency, + }; + this.#telemetry.log( + "ssh.network.sampled", + { + p2p: String(network.p2p), + preferredDerp: network.preferred_derp, + }, + { + latencyMs: network.latency, + downloadMbits: bytesPerSecondToMbits(network.download_bytes_sec), + uploadMbits: bytesPerSecondToMbits(network.upload_bytes_sec), + }, + ); + } +} + +/** Emit on p2p flip, DERP change, large latency swing, or heartbeat interval. */ +function shouldEmitSample( + previous: NetworkSample, + current: NetworkInfo, + now: number, +): boolean { + if (now - previous.emittedAtMs >= NETWORK_SAMPLE_INTERVAL_MS) { + return true; + } + if (current.p2p !== previous.p2p) { + return true; + } + if (current.preferred_derp !== previous.preferredDerp) { + return true; + } + return hasMeaningfulLatencyChange(current.latency, previous.latencyMs); +} + +function hasMeaningfulLatencyChange( + current: number, + previous: number, +): boolean { + if (previous === 0) { + return current !== 0; + } + return Math.abs(current - previous) / previous > NETWORK_LATENCY_CHANGE_RATIO; +} + +function bytesPerSecondToMbits(bytesPerSecond: number): number { + return (bytesPerSecond * 8) / 1_000_000; +} diff --git a/src/instrumentation/websocket.ts b/src/instrumentation/websocket.ts new file mode 100644 index 00000000..9d52ecdd --- /dev/null +++ b/src/instrumentation/websocket.ts @@ -0,0 +1,175 @@ +import type { CallerProperties } from "../telemetry/event"; +import type { TelemetryReporter } from "../telemetry/reporter"; +import type { ConnectionState } from "../websocket/reconnectingWebSocket"; + +export type ConnectionStateReason = + | "initial_connect" + | "manual_reconnect" + | "certificate_refresh" + | "scheduled_reconnect" + | "open" + | "disconnect" + | "dispose" + | "unrecoverable_close" + | "unrecoverable_http" + | "certificate_error" + | "connection_error" + | "normal_close" + | "unexpected_close"; + +export type ConnectionDropCause = + | "manual_disconnect" + | "replaced" + | "unrecoverable_close" + | "normal_close" + | "unexpected_close" + | "disposed" + | "error"; + +type ReconnectOutcome = + | { readonly result: "success" } + | { + readonly result: "error"; + readonly terminationReason: ConnectionStateReason; + }; + +interface ReconnectCycle { + readonly startMs: number; + readonly reason: ConnectionStateReason; + attempts: number; +} + +interface DropOptions { + cause: ConnectionDropCause; + code?: number; + error?: unknown; +} + +export class WebSocketTelemetry { + readonly #telemetry: TelemetryReporter; + #connectStartedAtMs: number | undefined; + #connectionOpenedAtMs: number | undefined; + #connectionDropEmitted = false; + #reconnectCycle: ReconnectCycle | undefined; + + public constructor(telemetry: TelemetryReporter) { + this.#telemetry = telemetry; + } + + public stateTransition( + from: ConnectionState, + to: ConnectionState, + reason: ConnectionStateReason, + ): void { + this.#telemetry.log("connection.state_transitioned", { + from, + to, + reason, + }); + } + + /** Stamp the connect-start time; counts an attempt if a cycle is open. */ + public connectStarted(): void { + this.#connectStartedAtMs = performance.now(); + if (this.#reconnectCycle) { + this.#reconnectCycle.attempts += 1; + } + } + + public opened(route: string): void { + const now = performance.now(); + const start = this.#connectStartedAtMs ?? now; + this.#connectionOpenedAtMs = now; + this.#connectionDropEmitted = false; + this.#connectStartedAtMs = undefined; + this.#telemetry.log( + "connection.opened", + { route }, + { connectDurationMs: now - start }, + ); + this.#finishReconnect({ result: "success" }); + } + + public dropped( + cause: ConnectionDropCause, + closeCode?: number, + error?: unknown, + ): void { + if ( + this.#connectionOpenedAtMs === undefined || + this.#connectionDropEmitted + ) { + return; + } + + const properties: CallerProperties = { cause }; + if (closeCode !== undefined) { + properties.closeCode = String(closeCode); + } + const measurements = { + connectionDurationMs: performance.now() - this.#connectionOpenedAtMs, + }; + if (error === undefined) { + this.#telemetry.log("connection.dropped", properties, measurements); + } else { + this.#telemetry.logError( + "connection.dropped", + error, + properties, + measurements, + ); + } + this.#connectionDropEmitted = true; + } + + public reset(): void { + this.#connectStartedAtMs = undefined; + this.#connectionOpenedAtMs = undefined; + this.#connectionDropEmitted = false; + this.#reconnectCycle = undefined; + } + + /** Open a reconnect cycle. No-op if one is already open. */ + public reconnectStarted(reason: ConnectionStateReason): void { + if (this.#reconnectCycle) { + return; + } + this.#reconnectCycle = { + startMs: performance.now(), + reason, + attempts: 0, + }; + } + + /** Drop and end the reconnect cycle as a failure. */ + public terminated(reason: ConnectionStateReason, options: DropOptions): void { + this.dropped(options.cause, options.code, options.error); + this.#finishReconnect({ result: "error", terminationReason: reason }); + } + + /** Drop and (re)open a reconnect cycle. */ + public retrying(reason: ConnectionStateReason, options: DropOptions): void { + this.dropped(options.cause, options.code, options.error); + this.reconnectStarted(reason); + } + + #finishReconnect(outcome: ReconnectOutcome): void { + const cycle = this.#reconnectCycle; + if (!cycle) { + return; + } + this.#reconnectCycle = undefined; + + const properties: Record = { + result: outcome.result, + reason: cycle.reason, + }; + if (outcome.result === "error") { + properties.terminationReason = outcome.terminationReason; + } + this.#telemetry.log("connection.reconnect_resolved", properties, { + attempts: cycle.attempts, + totalDurationMs: performance.now() - cycle.startMs, + }); + } +} diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 0f8714d5..237b7f0b 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -190,7 +190,12 @@ export class Remote { // break this connection. We could force close the remote session or // disallow logging out/in altogether, but for now just use a separate // client to remain unaffected by whatever the plugin is doing. - const workspaceClient = CoderApi.create(baseUrlRaw, token, this.logger); + const workspaceClient = CoderApi.create( + baseUrlRaw, + token, + this.logger, + this.serviceContainer.getTelemetryService(), + ); disposables.push(workspaceClient); // Create 401 interceptor - handles auth failures with re-login dialog @@ -505,6 +510,7 @@ export class Remote { logger: this.logger, codeLogDir: this.pathResolver.getCodeLogDir(), remoteSshExtensionId, + telemetry: this.serviceContainer.getTelemetryService(), }); disposables.push(sshMonitor); diff --git a/src/remote/sshProcess.ts b/src/remote/sshProcess.ts index a117fa71..77b1d68a 100644 --- a/src/remote/sshProcess.ts +++ b/src/remote/sshProcess.ts @@ -3,12 +3,14 @@ import * as fs from "node:fs/promises"; import * as path from "node:path"; import * as vscode from "vscode"; +import { SshTelemetry, type ProcessLossCause } from "../instrumentation/ssh"; import { findPort } from "../util"; import { cleanupFiles } from "../util/fileCleanup"; import { NetworkStatusReporter } from "./networkStatus"; import type { Logger } from "../logging/logger"; +import type { TelemetryReporter } from "../telemetry/reporter"; /** * Network information from the Coder CLI. @@ -40,6 +42,7 @@ export interface SshProcessMonitorOptions { // For port-based SSH process discovery codeLogDir: string; remoteSshExtensionId: string; + telemetry: TelemetryReporter; } // 1 hour cleanup threshold for old network info files @@ -56,8 +59,11 @@ const CLEANUP_MAX_LOG_FILES = 20; export class SshProcessMonitor implements vscode.Disposable { private readonly statusBarItem: vscode.StatusBarItem; private readonly options: Required< - SshProcessMonitorOptions & { proxyLogDir: string | undefined } - >; + Omit + > & { + readonly proxyLogDir: string | undefined; + }; + private readonly telemetry: SshTelemetry; private readonly _onLogFilePathChange = new vscode.EventEmitter< string | undefined @@ -77,7 +83,10 @@ export class SshProcessMonitor implements vscode.Disposable { private disposed = false; private currentPid: number | undefined; private logFilePath: string | undefined; - private pendingTimeout: NodeJS.Timeout | undefined; + private readonly pendingDelays = new Set<{ + timer: NodeJS.Timeout; + resolve: () => void; + }>(); private lastStaleSearchTime = 0; private readonly reporter: NetworkStatusReporter; @@ -126,6 +135,7 @@ export class SshProcessMonitor implements vscode.Disposable { // Matches the SSH update interval networkPollInterval: options.networkPollInterval ?? 3000, }; + this.telemetry = new SshTelemetry(options.telemetry); this.statusBarItem = vscode.window.createStatusBarItem( vscode.StatusBarAlignment.Left, 1000, @@ -181,11 +191,14 @@ export class SshProcessMonitor implements vscode.Disposable { if (this.disposed) { return; } + this.telemetry.disposed(); this.disposed = true; - if (this.pendingTimeout) { - clearTimeout(this.pendingTimeout); - this.pendingTimeout = undefined; + // Unblock all in-flight delay() calls so concurrent loops exit promptly. + for (const handle of this.pendingDelays) { + clearTimeout(handle.timer); + handle.resolve(); } + this.pendingDelays.clear(); this.statusBarItem.dispose(); this._onLogFilePathChange.dispose(); this._onPidChange.dispose(); @@ -199,10 +212,14 @@ export class SshProcessMonitor implements vscode.Disposable { return; } await new Promise((resolve) => { - this.pendingTimeout = setTimeout(() => { - this.pendingTimeout = undefined; - resolve(); - }, ms); + const handle = { + timer: setTimeout(() => { + this.pendingDelays.delete(handle); + resolve(); + }, ms), + resolve, + }; + this.pendingDelays.add(handle); }); } @@ -211,18 +228,33 @@ export class SshProcessMonitor implements vscode.Disposable { * Starts monitoring when it finds the process through the port. */ private async searchForProcess(): Promise { + const pid = await this.telemetry.traceProcessDiscovery(() => + this.discoverSshProcess(), + ); + if (pid === undefined || this.disposed) { + return; + } + + this.setCurrentPid(pid); + this.startMonitoring(); + } + + private async discoverSshProcess(): Promise<{ + pid: number | undefined; + attempts: number; + }> { const { discoveryPollIntervalMs, maxDiscoveryBackoffMs, logger, sshHost } = this.options; - let attempt = 0; + let attempts = 0; let currentBackoff = discoveryPollIntervalMs; let lastFoundPort: number | undefined; while (!this.disposed) { - attempt++; + attempts++; - if (attempt === 1 || attempt % 10 === 0) { + if (attempts === 1 || attempts % 10 === 0) { logger.debug( - `SSH process search attempt ${attempt} for host: ${sshHost}`, + `SSH process search attempt ${attempts} for host: ${sshHost}`, ); } @@ -244,14 +276,14 @@ export class SshProcessMonitor implements vscode.Disposable { } if (pid !== undefined) { - this.setCurrentPid(pid); - this.startMonitoring(); - return; + return { pid, attempts }; } await this.delay(currentBackoff); currentBackoff = Math.min(currentBackoff * 2, maxDiscoveryBackoffMs); } + + return { pid: undefined, attempts }; } /** @@ -307,16 +339,24 @@ export class SshProcessMonitor implements vscode.Disposable { this.currentPid = pid; if (previousPid === undefined) { + this.telemetry.processStarted(); this.options.logger.info(`SSH connection established (PID: ${pid})`); this._onPidChange.fire(pid); - } else if (previousPid !== pid) { - this.options.logger.info( - `SSH process changed from ${previousPid} to ${pid}`, - ); - this.logFilePath = undefined; - this._onLogFilePathChange.fire(undefined); - this._onPidChange.fire(pid); + return; + } + + if (previousPid === pid) { + this.telemetry.processRecovered(); + return; } + + this.telemetry.processReplaced(); + this.options.logger.info( + `SSH process changed from ${previousPid} to ${pid}`, + ); + this.logFilePath = undefined; + this._onLogFilePathChange.fire(undefined); + this._onPidChange.fire(pid); } /** @@ -408,6 +448,7 @@ export class SshProcessMonitor implements vscode.Disposable { const network = JSON.parse(content) as NetworkInfo; const isStale = ageMs > networkPollInterval * 2; this.reporter.update(network, isStale); + this.telemetry.networkSampled(network); } } catch (error) { readFailures++; @@ -420,6 +461,10 @@ export class SshProcessMonitor implements vscode.Disposable { } if (searchReason !== undefined) { + const lossCause: ProcessLossCause = + readFailures >= maxReadFailures + ? "missing_network_info" + : "stale_network_info"; const timeSinceLastSearch = Date.now() - this.lastStaleSearchTime; if (timeSinceLastSearch < staleThreshold) { await this.delay(staleThreshold - timeSinceLastSearch); @@ -427,6 +472,7 @@ export class SshProcessMonitor implements vscode.Disposable { } logger.debug(`${searchReason}, searching for new SSH process`); + this.telemetry.processLost(lossCause); // searchForProcess will update PID if a different process is found this.lastStaleSearchTime = Date.now(); await this.searchForProcess(); diff --git a/src/telemetry/reporter.ts b/src/telemetry/reporter.ts new file mode 100644 index 00000000..e46811b4 --- /dev/null +++ b/src/telemetry/reporter.ts @@ -0,0 +1,29 @@ +import { NOOP_SPAN, type Span } from "./span"; + +import type { CallerMeasurements, CallerProperties } from "./event"; + +export interface TelemetryReporter { + log( + eventName: string, + properties?: CallerProperties, + measurements?: CallerMeasurements, + ): void; + logError( + eventName: string, + error: unknown, + properties?: CallerProperties, + measurements?: CallerMeasurements, + ): void; + trace( + eventName: string, + fn: (span: Span) => Promise, + properties?: CallerProperties, + measurements?: CallerMeasurements, + ): Promise; +} + +export const NOOP_TELEMETRY_REPORTER: TelemetryReporter = { + log: () => undefined, + logError: () => undefined, + trace: (_eventName, fn) => fn(NOOP_SPAN), +}; diff --git a/src/telemetry/service.ts b/src/telemetry/service.ts index 9ada725d..cc78a638 100644 --- a/src/telemetry/service.ts +++ b/src/telemetry/service.ts @@ -19,6 +19,8 @@ import { import { newSpanId, newTraceId } from "./ids"; import { NOOP_SPAN, type Span } from "./span"; +import type { TelemetryReporter } from "./reporter"; + const LEVEL_ORDER: Readonly> = { off: 0, local: 1, @@ -44,7 +46,7 @@ interface EmitOptions extends Partial { * by `minLevel` and may self-gate. `dispose` flushes are best-effort since * VS Code does not await deactivation. */ -export class TelemetryService implements vscode.Disposable { +export class TelemetryService implements vscode.Disposable, TelemetryReporter { #level: TelemetryLevel; #nextSequence = 0; #deploymentUrl = ""; @@ -141,6 +143,8 @@ export class TelemetryService implements vscode.Disposable { spanOpts: SpanOptions, ): Promise { const eventId = newSpanId(); + const spanProperties = { ...properties }; + const spanMeasurements = { ...measurements }; const { traceId, traceLevel } = spanOpts; const span: Span = { traceId, @@ -161,13 +165,19 @@ export class TelemetryService implements vscode.Disposable { { traceId, parentEventId: eventId, traceLevel }, ); }, + setProperty(name: string, value: string): void { + spanProperties[name] = value; + }, + setMeasurement(name: string, value: number): void { + spanMeasurements[name] = value; + }, }; return this.#emitTimed( eventId, eventName, () => fn(span), - properties, - measurements, + spanProperties, + spanMeasurements, spanOpts, ); } diff --git a/src/telemetry/span.ts b/src/telemetry/span.ts index 626c4bb4..0e27696b 100644 --- a/src/telemetry/span.ts +++ b/src/telemetry/span.ts @@ -1,4 +1,4 @@ -import { type CallerProperties } from "./event"; +import type { CallerProperties } from "./event"; /** * Parent span handle. Children's `eventName` composes as `${parent.eventName}.${phaseName}`. @@ -16,6 +16,10 @@ export interface Span { properties?: CallerProperties, measurements?: Record, ): Promise; + /** Add or replace a property on the event emitted for this span. */ + setProperty(name: string, value: string): void; + /** Add or replace a measurement on the event emitted for this span. */ + setMeasurement(name: string, value: number): void; } /** No-op `Span` used when telemetry is off. Runs phase fns but emits nothing. */ @@ -31,4 +35,6 @@ export const NOOP_SPAN: Span = { ): Promise { return fn(NOOP_SPAN); }, + setProperty: () => undefined, + setMeasurement: () => undefined, }; diff --git a/src/websocket/reconnectingWebSocket.ts b/src/websocket/reconnectingWebSocket.ts index bd33a247..fb9e1307 100644 --- a/src/websocket/reconnectingWebSocket.ts +++ b/src/websocket/reconnectingWebSocket.ts @@ -1,5 +1,10 @@ import { ClientCertificateError } from "../error/clientCertificateError"; import { toError } from "../error/errorUtils"; +import { + WebSocketTelemetry, + type ConnectionDropCause, + type ConnectionStateReason, +} from "../instrumentation/websocket"; import { WebSocketCloseCode, @@ -11,12 +16,18 @@ import { import type { WebSocketEventType } from "coder/site/src/utils/OneWayWebSocket"; import type { Logger } from "../logging/logger"; +import type { TelemetryReporter } from "../telemetry/reporter"; import type { + CloseEvent, EventHandler, UnidirectionalStream, } from "./eventStreamConnection"; +function toCloseEventError(event: CloseEvent): Error { + return new Error(`WebSocket closed with code ${event.code}: ${event.reason}`); +} + /** * Connection states for the ReconnectingWebSocket state machine. */ @@ -103,6 +114,7 @@ export interface ReconnectingWebSocketOptions { initialBackoffMs?: number; maxBackoffMs?: number; jitterFactor?: number; + telemetry: TelemetryReporter; /** Callback invoked when a refreshable certificate error is detected. Returns true if refresh succeeded. */ onCertificateRefreshNeeded: () => Promise; } @@ -112,7 +124,8 @@ export class ReconnectingWebSocket< > implements UnidirectionalStream { readonly #socketFactory: SocketFactory; readonly #logger: Logger; - readonly #options: Required; + readonly #telemetry: WebSocketTelemetry; + readonly #options: Required>; readonly #eventHandlers: { [K in WebSocketEventType]: Set>; } = { @@ -133,19 +146,21 @@ export class ReconnectingWebSocket< /** * Dispatch an action to transition state. Returns true if transition is allowed. */ - #dispatch(action: StateAction): boolean { - const newState = reduceState(this.#state, action); - if (newState === this.#state) { + #dispatch(action: StateAction, reason: ConnectionStateReason): boolean { + const previousState = this.#state; + const newState = reduceState(previousState, action); + if (newState === previousState) { // Allow CONNECT from CONNECTING as a "restart" operation if ( action.type === "CONNECT" && - this.#state === ConnectionState.CONNECTING + previousState === ConnectionState.CONNECTING ) { return true; } return false; } this.#state = newState; + this.#telemetry.stateTransition(previousState, newState, reason); return true; } @@ -157,6 +172,7 @@ export class ReconnectingWebSocket< ) { this.#socketFactory = socketFactory; this.#logger = logger; + this.#telemetry = new WebSocketTelemetry(options.telemetry); this.#options = { initialBackoffMs: options.initialBackoffMs ?? 250, maxBackoffMs: options.maxBackoffMs ?? 30000, @@ -180,8 +196,7 @@ export class ReconnectingWebSocket< onDispose, ); - // connect() handles all errors internally - await instance.connect(); + await instance.connect("initial_connect"); return instance; } @@ -228,10 +243,18 @@ export class ReconnectingWebSocket< * Resumes the socket if previously disconnected via disconnect(). */ public reconnect(): void { + this.#reconnectInternal("manual_reconnect"); + } + + #reconnectInternal(reason: ConnectionStateReason): void { if (this.#state === ConnectionState.DISPOSED) { return; } + if (this.#state !== ConnectionState.IDLE) { + this.#telemetry.reconnectStarted(reason); + } + if (this.#state === ConnectionState.DISCONNECTED) { this.#backoffMs = this.#options.initialBackoffMs; this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry @@ -242,18 +265,33 @@ export class ReconnectingWebSocket< this.#reconnectTimeoutId = null; } - // connect() handles all errors internally - void this.connect(); + void this.connect(reason); } /** * Temporarily disconnect the socket. Can be resumed via reconnect(). */ public disconnect(code?: number, reason?: string): void { - if (!this.#dispatch({ type: "DISCONNECT" })) { + this.disconnectWithReason("disconnect", "manual_disconnect", { + code, + closeReason: reason, + }); + } + + private disconnectWithReason( + reason: ConnectionStateReason, + cause: ConnectionDropCause, + options: { code?: number; closeReason?: string; error?: unknown } = {}, + ): void { + if (!this.#dispatch({ type: "DISCONNECT" }, reason)) { return; } - this.clearCurrentSocket(code, reason); + this.#telemetry.terminated(reason, { + cause, + code: options.code, + error: options.error, + }); + this.clearCurrentSocket(options.code, options.closeReason); } public close(code?: number, reason?: string): void { @@ -273,13 +311,14 @@ export class ReconnectingWebSocket< this.dispose(code, reason); } - private async connect(): Promise { - if (!this.#dispatch({ type: "CONNECT" })) { + private async connect(reason: ConnectionStateReason): Promise { + if (!this.#dispatch({ type: "CONNECT" }, reason)) { return; } + this.#telemetry.connectStarted(); try { - // Close any existing socket before creating a new one if (this.#currentSocket) { + this.#telemetry.dropped("replaced", WebSocketCloseCode.NORMAL); this.#currentSocket.close( WebSocketCloseCode.NORMAL, "Replacing connection", @@ -303,9 +342,10 @@ export class ReconnectingWebSocket< return; } - if (!this.#dispatch({ type: "OPEN" })) { + if (!this.#dispatch({ type: "OPEN" }, "open")) { return; } + this.#telemetry.opened(this.#route); // Reset backoff on successful connection this.#backoffMs = this.#options.initialBackoffMs; this.#certRefreshAttempted = false; @@ -334,42 +374,64 @@ export class ReconnectingWebSocket< }); socket.addEventListener("close", (event) => { - if (this.#currentSocket !== socket) { - return; + if (this.#currentSocket === socket) { + this.handleSocketClose(event); } + }); + } catch (error) { + await this.handleConnectionError(error); + } + } - if ( - this.#state === ConnectionState.DISPOSED || - this.#state === ConnectionState.DISCONNECTED - ) { - return; - } + private handleSocketClose(event: CloseEvent): void { + if ( + this.#state === ConnectionState.DISPOSED || + this.#state === ConnectionState.DISCONNECTED + ) { + return; + } - this.executeHandlers("close", event); + this.executeHandlers("close", event); - if (UNRECOVERABLE_WS_CLOSE_CODES.has(event.code)) { - this.#logger.error( - `WebSocket connection closed with unrecoverable error code ${event.code}`, - ); - this.disconnect(); - return; - } - - if (NORMAL_CLOSURE_CODES.has(event.code)) { - return; - } + if (UNRECOVERABLE_WS_CLOSE_CODES.has(event.code)) { + this.#logger.error( + `WebSocket connection closed with unrecoverable error code ${event.code}`, + ); + this.disconnectWithReason("unrecoverable_close", "unrecoverable_close", { + code: event.code, + closeReason: event.reason, + error: toCloseEventError(event), + }); + return; + } - this.scheduleReconnect(); + if (NORMAL_CLOSURE_CODES.has(event.code)) { + this.disconnectWithReason("normal_close", "normal_close", { + code: event.code, + closeReason: event.reason, }); - } catch (error) { - await this.handleConnectionError(error); + return; } + + this.scheduleReconnect("unexpected_close", "unexpected_close", { + code: event.code, + error: toCloseEventError(event), + }); } - private scheduleReconnect(): void { - if (!this.#dispatch({ type: "SCHEDULE_RETRY" })) { + private scheduleReconnect( + reason: ConnectionStateReason, + cause: ConnectionDropCause, + options: { code?: number; error?: unknown } = {}, + ): void { + if (!this.#dispatch({ type: "SCHEDULE_RETRY" }, reason)) { return; } + this.#telemetry.retrying(reason, { + cause, + code: options.code, + error: options.error, + }); const jitter = this.#backoffMs * this.#options.jitterFactor * (Math.random() * 2 - 1); @@ -381,29 +443,13 @@ export class ReconnectingWebSocket< this.#reconnectTimeoutId = setTimeout(() => { this.#reconnectTimeoutId = null; - // connect() handles all errors internally - void this.connect(); + void this.connect("scheduled_reconnect"); }, delayMs); this.#backoffMs = Math.min(this.#backoffMs * 2, this.#options.maxBackoffMs); } - /** - * Attempt to refresh certificates and return true if refresh succeeded. - */ - private async attemptCertificateRefresh(): Promise { - try { - return await this.#options.onCertificateRefreshNeeded(); - } catch (refreshError) { - this.#logger.error("Error during certificate refresh:", refreshError); - return false; - } - } - - /** - * Handle client certificate errors by attempting refresh for refreshable errors. - * @returns true if refresh succeeded. - */ + /** Returns true if refresh succeeded and the caller should retry. */ private async handleClientCertificateError( certError: ClientCertificateError, ): Promise { @@ -415,17 +461,20 @@ export class ReconnectingWebSocket< } if (certError.isRefreshable) { - this.#certRefreshAttempted = true; // Mark that we're attempting + this.#certRefreshAttempted = true; this.#logger.info( `Client certificate error (alert ${certError.alertCode}), attempting refresh...`, ); - if (await this.attemptCertificateRefresh()) { - this.#logger.info("Certificate refresh succeeded, reconnecting..."); - return true; + try { + if (await this.#options.onCertificateRefreshNeeded()) { + this.#logger.info("Certificate refresh succeeded, reconnecting..."); + return true; + } + } catch (refreshError) { + this.#logger.error("Error during certificate refresh:", refreshError); } } - // Show notification for failed/non-refreshable errors void certError.showNotification(); return false; } @@ -467,7 +516,7 @@ export class ReconnectingWebSocket< `Unrecoverable HTTP error during connection for ${this.#route}`, error, ); - this.disconnect(); + this.disconnectWithReason("unrecoverable_http", "error", { error }); return; } @@ -475,15 +524,15 @@ export class ReconnectingWebSocket< const certError = ClientCertificateError.fromError(error); if (certError) { if (await this.handleClientCertificateError(certError)) { - this.reconnect(); + this.#reconnectInternal("certificate_refresh"); } else { - this.disconnect(); + this.disconnectWithReason("certificate_error", "error", { error }); } return; } this.#logger.warn(`WebSocket connection failed for ${this.#route}`, error); - this.scheduleReconnect(); + this.scheduleReconnect("connection_error", "error", { error }); } /** @@ -500,9 +549,10 @@ export class ReconnectingWebSocket< } private dispose(code?: number, reason?: string): void { - if (!this.#dispatch({ type: "DISPOSE" })) { + if (!this.#dispatch({ type: "DISPOSE" }, "dispose")) { return; } + this.#telemetry.terminated("dispose", { cause: "disposed", code }); this.clearCurrentSocket(code, reason); for (const set of Object.values(this.#eventHandlers)) { @@ -522,5 +572,6 @@ export class ReconnectingWebSocket< this.#currentSocket.close(code, reason); this.#currentSocket = null; } + this.#telemetry.reset(); } } diff --git a/test/mocks/telemetry.ts b/test/mocks/telemetry.ts index 4a9c1886..7ddc13eb 100644 --- a/test/mocks/telemetry.ts +++ b/test/mocks/telemetry.ts @@ -1,10 +1,14 @@ import { vi } from "vitest"; -import type { - TelemetryEvent, - TelemetryLevel, - TelemetrySink, +import { + buildSession, + type TelemetryEvent, + type TelemetryLevel, + type TelemetrySink, } from "@/telemetry/event"; +import { TelemetryService } from "@/telemetry/service"; + +import { createMockLogger, MockConfigurationProvider } from "./testHelpers"; /** * In-memory `TelemetrySink` for tests. Captures every written event and @@ -25,4 +29,26 @@ export class TestSink implements TelemetrySink { write(event: TelemetryEvent): void { this.events.push(event); } + + eventsNamed(name: string): TelemetryEvent[] { + return this.events.filter((e) => e.eventName === name); + } +} + +/** + * Build a real `TelemetryService` wired to a single `TestSink` for assertions. + * Sets `coder.telemetry.level` to `"local"` so locally-scoped events flow. + */ +export function createTestTelemetry(): { + telemetry: TelemetryService; + sink: TestSink; +} { + new MockConfigurationProvider().set("coder.telemetry.level", "local"); + const sink = new TestSink(); + const telemetry = new TelemetryService( + buildSession("test", "test-session"), + [sink], + createMockLogger(), + ); + return { telemetry, sink }; } diff --git a/test/unit/instrumentation/ssh.test.ts b/test/unit/instrumentation/ssh.test.ts new file mode 100644 index 00000000..fbeea890 --- /dev/null +++ b/test/unit/instrumentation/ssh.test.ts @@ -0,0 +1,225 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { SshTelemetry } from "@/instrumentation/ssh"; + +import { createTestTelemetry } from "../../mocks/telemetry"; +import { makeNetworkInfo } from "../../mocks/testHelpers"; + +function setup() { + const { telemetry, sink } = createTestTelemetry(); + return { ssh: new SshTelemetry(telemetry), sink }; +} + +describe("SshTelemetry", () => { + describe("traceProcessDiscovery", () => { + interface DiscoveryCase { + pid: number | undefined; + attempts: number; + found: string; + } + it.each([ + { pid: 123, attempts: 2, found: "true" }, + { pid: undefined, attempts: 5, found: "false" }, + ])( + "emits found=$found and attempts=$attempts based on the result", + async ({ pid, attempts, found }) => { + const { ssh, sink } = setup(); + + await ssh.traceProcessDiscovery(() => + Promise.resolve({ pid, attempts }), + ); + + const [event] = sink.eventsNamed("ssh.process.discovered"); + expect(event.properties).toMatchObject({ result: "success", found }); + expect(event.measurements.attempts).toBe(attempts); + }, + ); + }); + + describe("processReplaced", () => { + it("emits a replacement (not recovery) when the prior process was lost", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processLost("stale_network_info"); + ssh.processReplaced(); + + expect(sink.events.map((e) => e.eventName)).toEqual([ + "ssh.process.lost", + "ssh.process.replaced", + ]); + const [replaced] = sink.eventsNamed("ssh.process.replaced"); + expect(replaced.properties).toMatchObject({ wasLost: "true" }); + expect(replaced.measurements).toMatchObject({ + previousUptimeMs: expect.any(Number), + lostDurationMs: expect.any(Number), + }); + }); + + it("emits a replacement event for an instant handover", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processReplaced(); + + const replaced = sink.eventsNamed("ssh.process.replaced"); + expect(replaced).toHaveLength(1); + expect(replaced[0].properties).toMatchObject({ wasLost: "false" }); + expect(replaced[0].measurements).toMatchObject({ + previousUptimeMs: expect.any(Number), + }); + expect(replaced[0].measurements.lostDurationMs).toBeUndefined(); + }); + + it("emits nothing if there was no prior process", () => { + const { ssh, sink } = setup(); + + ssh.processReplaced(); + + expect(sink.events).toHaveLength(0); + }); + }); + + describe("processRecovered", () => { + it("is a no-op when the process is not currently lost", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processRecovered(); + + expect(sink.events).toHaveLength(0); + }); + + it("emits ssh.process.recovered with recoveryDurationMs after a loss", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processLost("stale_network_info"); + ssh.processRecovered(); + + const [event] = sink.eventsNamed("ssh.process.recovered"); + expect(event.measurements.recoveryDurationMs).toEqual(expect.any(Number)); + }); + + it("does not double-emit when called twice without another loss", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processLost("stale_network_info"); + ssh.processRecovered(); + ssh.processRecovered(); + + expect(sink.eventsNamed("ssh.process.recovered")).toHaveLength(1); + }); + }); + + describe("processLost", () => { + it("is a no-op when there is no started process", () => { + const { ssh, sink } = setup(); + + ssh.processLost("stale_network_info"); + + expect(sink.events).toHaveLength(0); + }); + + it("does not double-emit when called twice without a recovery", () => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + ssh.processLost("stale_network_info"); + ssh.processLost("missing_network_info"); + + const lost = sink.eventsNamed("ssh.process.lost"); + expect(lost).toHaveLength(1); + expect(lost[0].properties.cause).toBe("stale_network_info"); + }); + }); + + describe("disposed", () => { + it("is a no-op when there is no started process", () => { + const { ssh, sink } = setup(); + + ssh.disposed(); + + expect(sink.events).toHaveLength(0); + }); + + interface DisposedCase { + name: string; + lose: boolean; + wasLost: string; + } + it.each([ + { name: "from a healthy state", lose: false, wasLost: "false" }, + { name: "after the process was lost", lose: true, wasLost: "true" }, + ])("emits a terminal event $name", ({ lose, wasLost }) => { + const { ssh, sink } = setup(); + + ssh.processStarted(); + if (lose) { + ssh.processLost("stale_network_info"); + } + ssh.disposed(); + + const [event] = sink.eventsNamed("ssh.process.disposed"); + expect(event.properties).toMatchObject({ wasLost }); + expect(event.measurements.uptimeMs).toEqual(expect.any(Number)); + }); + }); + + describe("networkSampled", () => { + beforeEach(() => vi.useFakeTimers()); + afterEach(() => vi.useRealTimers()); + + it.each([ + { name: "no change in window", next: {}, advanceMs: 1_000, expected: 1 }, + { + name: "small latency change (under 10%)", + next: { latency: 51 }, + advanceMs: 1_000, + expected: 1, + }, + { name: "p2p flip", next: { p2p: false }, advanceMs: 1_000, expected: 2 }, + { + name: "DERP region change", + next: { preferred_derp: "SFO" }, + advanceMs: 1_000, + expected: 2, + }, + { + name: "large latency swing (over 10%)", + next: { latency: 100 }, + advanceMs: 1_000, + expected: 2, + }, + { + name: "heartbeat after 60s without change", + next: {}, + advanceMs: 60_000, + expected: 2, + }, + ])("$name -> $expected sample(s)", ({ next, advanceMs, expected }) => { + const { ssh, sink } = setup(); + + ssh.networkSampled(makeNetworkInfo()); + vi.advanceTimersByTime(advanceMs); + ssh.networkSampled(makeNetworkInfo(next)); + + expect(sink.eventsNamed("ssh.network.sampled")).toHaveLength(expected); + }); + + it("includes p2p, preferredDerp, latency, and bandwidth in the emitted sample", () => { + const { ssh, sink } = setup(); + + ssh.networkSampled(makeNetworkInfo({ latency: 25 })); + + const [sample] = sink.eventsNamed("ssh.network.sampled"); + expect(sample.properties).toEqual({ p2p: "true", preferredDerp: "NYC" }); + expect(sample.measurements).toMatchObject({ + latencyMs: 25, + downloadMbits: expect.any(Number), + uploadMbits: expect.any(Number), + }); + }); + }); +}); diff --git a/test/unit/instrumentation/websocket.test.ts b/test/unit/instrumentation/websocket.test.ts new file mode 100644 index 00000000..e4420fa8 --- /dev/null +++ b/test/unit/instrumentation/websocket.test.ts @@ -0,0 +1,205 @@ +import { describe, expect, it } from "vitest"; + +import { WebSocketTelemetry } from "@/instrumentation/websocket"; +import { ConnectionState } from "@/websocket/reconnectingWebSocket"; + +import { createTestTelemetry } from "../../mocks/telemetry"; + +function setup() { + const { telemetry, sink } = createTestTelemetry(); + return { ws: new WebSocketTelemetry(telemetry), sink }; +} + +describe("WebSocketTelemetry", () => { + describe("stateTransition", () => { + it("emits a connection.state_transitioned event with from/to/reason", () => { + const { ws, sink } = setup(); + + ws.stateTransition( + ConnectionState.IDLE, + ConnectionState.CONNECTING, + "initial_connect", + ); + + const [event] = sink.eventsNamed("connection.state_transitioned"); + expect(event.properties).toEqual({ + from: "IDLE", + to: "CONNECTING", + reason: "initial_connect", + }); + }); + }); + + describe("opened", () => { + it("emits connection.opened with route and connect duration", () => { + const { ws, sink } = setup(); + + ws.connectStarted(); + ws.opened("/api/test"); + + const [event] = sink.eventsNamed("connection.opened"); + expect(event).toMatchObject({ + properties: { route: "/api/test" }, + measurements: { connectDurationMs: expect.any(Number) }, + }); + }); + + it("uses 0 duration when connectStarted was not called", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + + const [event] = sink.eventsNamed("connection.opened"); + expect(event.measurements.connectDurationMs).toBe(0); + }); + }); + + describe("dropped", () => { + it("is silent when no connection has been opened", () => { + const { ws, sink } = setup(); + + ws.dropped("error"); + + expect(sink.events).toHaveLength(0); + }); + + it("emits connection.dropped with cause and close code", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + ws.dropped("unexpected_close", 1006); + + const [event] = sink.eventsNamed("connection.dropped"); + expect(event.properties).toMatchObject({ + cause: "unexpected_close", + closeCode: "1006", + }); + expect(event.measurements.connectionDurationMs).toEqual( + expect.any(Number), + ); + }); + + it("emits via logError when an error is provided", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + ws.dropped("error", 1006, new Error("boom")); + + const [event] = sink.eventsNamed("connection.dropped"); + expect(event.error).toMatchObject({ message: "boom" }); + }); + + it("does not double-emit when called twice", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + ws.dropped("normal_close"); + ws.dropped("error"); + + expect(sink.eventsNamed("connection.dropped")).toHaveLength(1); + }); + }); + + describe("reset", () => { + it("clears state so a subsequent dropped is silent", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + ws.reset(); + ws.dropped("error"); + + expect(sink.eventsNamed("connection.dropped")).toHaveLength(0); + }); + + it("clears any open reconnect cycle", () => { + const { ws, sink } = setup(); + + ws.reconnectStarted("manual_reconnect"); + ws.connectStarted(); + ws.reset(); + ws.opened("/api/test"); + + expect(sink.eventsNamed("connection.reconnect_resolved")).toHaveLength(0); + }); + }); + + describe("reconnect cycle", () => { + it("emits connection.reconnect_resolved with success when opened closes the cycle", () => { + const { ws, sink } = setup(); + + ws.reconnectStarted("manual_reconnect"); + ws.connectStarted(); + ws.opened("/api/test"); + + const [event] = sink.eventsNamed("connection.reconnect_resolved"); + expect(event).toMatchObject({ + properties: { result: "success", reason: "manual_reconnect" }, + measurements: { attempts: 1, totalDurationMs: expect.any(Number) }, + }); + }); + + it("emits connection.reconnect_resolved with error when terminated closes the cycle", () => { + const { ws, sink } = setup(); + + ws.reconnectStarted("manual_reconnect"); + ws.terminated("unrecoverable_http", { cause: "error" }); + + const [event] = sink.eventsNamed("connection.reconnect_resolved"); + expect(event.properties).toEqual({ + result: "error", + reason: "manual_reconnect", + terminationReason: "unrecoverable_http", + }); + }); + + it("does not emit when terminated is called outside a cycle", () => { + const { ws, sink } = setup(); + + ws.terminated("dispose", { cause: "disposed" }); + + expect(sink.eventsNamed("connection.reconnect_resolved")).toHaveLength(0); + }); + + it("counts each connectStarted as an attempt within the cycle", () => { + const { ws, sink } = setup(); + + ws.reconnectStarted("scheduled_reconnect"); + ws.connectStarted(); + ws.connectStarted(); + ws.connectStarted(); + ws.opened("/api/test"); + + expect( + sink.eventsNamed("connection.reconnect_resolved")[0].measurements + .attempts, + ).toBe(3); + }); + + it("ignores reconnectStarted while a cycle is already open", () => { + const { ws, sink } = setup(); + + ws.reconnectStarted("manual_reconnect"); + ws.reconnectStarted("scheduled_reconnect"); + ws.opened("/api/test"); + + expect( + sink.eventsNamed("connection.reconnect_resolved")[0].properties.reason, + ).toBe("manual_reconnect"); + }); + + it("retrying drops the connection and opens a cycle", () => { + const { ws, sink } = setup(); + + ws.opened("/api/test"); + ws.retrying("unexpected_close", { + cause: "unexpected_close", + code: 1006, + }); + + expect(sink.eventsNamed("connection.dropped")).toHaveLength(1); + + ws.opened("/api/test"); + expect(sink.eventsNamed("connection.reconnect_resolved")).toHaveLength(1); + }); + }); +}); diff --git a/test/unit/remote/sshProcess.test.ts b/test/unit/remote/sshProcess.test.ts index f562ea5d..fed7e4ec 100644 --- a/test/unit/remote/sshProcess.test.ts +++ b/test/unit/remote/sshProcess.test.ts @@ -10,7 +10,9 @@ import { type NetworkInfo, type SshProcessMonitorOptions, } from "@/remote/sshProcess"; +import { NOOP_TELEMETRY_REPORTER } from "@/telemetry/reporter"; +import { createTestTelemetry } from "../../mocks/telemetry"; import { createMockLogger, makeNetworkInfo, @@ -312,6 +314,203 @@ describe("SshProcessMonitor", () => { }); }); + describe("telemetry", () => { + const sshLog = { + "/logs/ms-vscode-remote.remote-ssh/1-Remote - SSH.log": + "-> socksPort 12345 ->", + }; + + function keepNetworkFileFresh(filePath: string, ms = 90_000): void { + const mtime = (Date.now() + ms) / 1000; + vol.utimesSync(filePath, mtime, mtime); + } + + it("emits a process discovered trace once a PID is found", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON(sshLog); + + const monitor = createMonitor({ telemetry }); + await waitForEvent(monitor.onPidChange); + + expect(sink.eventsNamed("ssh.process.discovered")[0]).toMatchObject({ + properties: { result: "success", found: "true" }, + measurements: { + durationMs: expect.any(Number), + attempts: expect.any(Number), + }, + }); + }); + + it("emits found=false when discovery is abandoned by dispose", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON({ + "/logs/ms-vscode-remote.remote-ssh/1-Remote - SSH.log": + "-> socksPort 12345 ->", + }); + vi.mocked(find).mockResolvedValue([]); + + const monitor = createMonitor({ telemetry }); + // Allow at least one discovery iteration to start, then abandon it. + await new Promise((r) => setTimeout(r, 30)); + monitor.dispose(); + + await waitFor( + () => sink.eventsNamed("ssh.process.discovered").length > 0, + ); + expect(sink.eventsNamed("ssh.process.discovered")[0]).toMatchObject({ + properties: { result: "success", found: "false" }, + }); + }); + + it("emits ssh.process.disposed when dispose follows a successful discovery", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON(sshLog); + + const monitor = createMonitor({ telemetry }); + await waitForEvent(monitor.onPidChange); + monitor.dispose(); + + expect(sink.eventsNamed("ssh.process.disposed")[0]).toMatchObject({ + properties: { wasLost: "false" }, + measurements: { uptimeMs: expect.any(Number) }, + }); + }); + + it("emits ssh.process.disposed with wasLost=true when dispose follows a loss", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON({ + ...sshLog, + "/network/999.json": makeNetworkJson(), + }); + // Second discovery hangs so the lost->disposed sequence is observable. + vi.mocked(find) + .mockResolvedValueOnce([{ pid: 999, ppid: 1, name: "ssh", cmd: "ssh" }]) + .mockResolvedValue([]); + + const monitor = createMonitor({ + networkInfoPath: "/network", + networkPollInterval: 10, + telemetry, + }); + await waitForEvent(monitor.onPidChange); + await waitFor(() => sink.eventsNamed("ssh.process.lost").length > 0); + monitor.dispose(); + + const disposed = sink.eventsNamed("ssh.process.disposed"); + expect(disposed).toHaveLength(1); + expect(disposed[0].properties).toMatchObject({ wasLost: "true" }); + }); + + it("emits missing_network_info as the loss cause when reads fail repeatedly", async () => { + vi.useFakeTimers(); + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON(sshLog); + // No /network/999.json. Every read fails until threshold is reached. + vi.mocked(find).mockResolvedValue([ + { pid: 999, ppid: 1, name: "ssh", cmd: "ssh" }, + ]); + + const monitor = createMonitor({ + networkInfoPath: "/network", + networkPollInterval: 10, + telemetry, + }); + + await vi.advanceTimersByTimeAsync(500); + await vi.waitFor(() => + expect(sink.eventsNamed("ssh.process.lost").length).toBeGreaterThan(0), + ); + + expect(sink.eventsNamed("ssh.process.lost")[0].properties).toMatchObject({ + cause: "missing_network_info", + }); + + monitor.dispose(); + }); + + it("emits ssh.process.replaced (not recovered) when a different PID takes over", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON({ + ...sshLog, + "/network/999.json": makeNetworkJson(), + }); + vi.mocked(find) + .mockResolvedValueOnce([{ pid: 999, ppid: 1, name: "ssh", cmd: "ssh" }]) + .mockResolvedValue([{ pid: 888, ppid: 1, name: "ssh", cmd: "ssh" }]); + + const monitor = createMonitor({ + networkInfoPath: "/network", + networkPollInterval: 10, + telemetry, + }); + await waitForEvent(monitor.onPidChange); + await waitFor(() => sink.eventsNamed("ssh.process.replaced").length > 0); + // Halt monitoring before the negative assertion so the 888 loop + // can't race ahead and emit its own lost/recovered cycle. + monitor.dispose(); + + const replaced = sink.eventsNamed("ssh.process.replaced"); + expect(replaced[0].properties).toMatchObject({ wasLost: "true" }); + expect(replaced[0].measurements).toMatchObject({ + previousUptimeMs: expect.any(Number), + lostDurationMs: expect.any(Number), + }); + expect(sink.eventsNamed("ssh.process.recovered")).toHaveLength(0); + }); + + it("emits lost and recovered events around a stale-network reconnect", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON({ + ...sshLog, + "/network/999.json": makeNetworkJson(), + }); + vi.mocked(find).mockResolvedValue([ + { pid: 999, ppid: 1, name: "ssh", cmd: "ssh" }, + ]); + + const monitor = createMonitor({ + networkInfoPath: "/network", + networkPollInterval: 10, + telemetry, + }); + await waitForEvent(monitor.onPidChange); + await waitFor( + () => + sink.eventsNamed("ssh.process.lost").length > 0 && + sink.eventsNamed("ssh.process.recovered").length > 0, + ); + + expect(sink.eventsNamed("ssh.process.lost")[0]).toMatchObject({ + properties: { cause: "stale_network_info" }, + measurements: { uptimeMs: expect.any(Number) }, + }); + expect(sink.eventsNamed("ssh.process.recovered")[0]).toMatchObject({ + measurements: { recoveryDurationMs: expect.any(Number) }, + }); + }); + + it("forwards network reads to the telemetry as samples", async () => { + const { telemetry, sink } = createTestTelemetry(); + vol.fromJSON({ + ...sshLog, + "/network/999.json": makeNetworkJson({ latency: 25 }), + }); + keepNetworkFileFresh("/network/999.json"); + + createMonitor({ + networkInfoPath: "/network", + networkPollInterval: 10, + telemetry, + }); + await waitFor(() => sink.eventsNamed("ssh.network.sampled").length > 0); + + expect(sink.eventsNamed("ssh.network.sampled")[0]).toMatchObject({ + properties: { p2p: "true", preferredDerp: "NYC" }, + measurements: { latencyMs: 25 }, + }); + }); + }); + describe("log file discovery", () => { it("finds log file matching PID pattern", async () => { vol.fromJSON({ @@ -812,6 +1011,7 @@ describe("SshProcessMonitor", () => { discoveryPollIntervalMs: 10, maxDiscoveryBackoffMs: 100, networkPollInterval: 10, + telemetry: NOOP_TELEMETRY_REPORTER, ...overrides, }); activeMonitors.push(monitor); diff --git a/test/unit/websocket/reconnectingWebSocket.test.ts b/test/unit/websocket/reconnectingWebSocket.test.ts index 8e028e7f..3dafb38d 100644 --- a/test/unit/websocket/reconnectingWebSocket.test.ts +++ b/test/unit/websocket/reconnectingWebSocket.test.ts @@ -1,5 +1,9 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { + NOOP_TELEMETRY_REPORTER, + type TelemetryReporter, +} from "@/telemetry/reporter"; import { WebSocketCloseCode, HttpStatusCode } from "@/websocket/codes"; import { ConnectionState, @@ -7,6 +11,7 @@ import { type SocketFactory, } from "@/websocket/reconnectingWebSocket"; +import { createTestTelemetry } from "../../mocks/telemetry"; import { createMockLogger } from "../../mocks/testHelpers"; import type { CloseEvent, Event as WsEvent } from "ws"; @@ -100,11 +105,7 @@ describe("ReconnectingWebSocket", () => { }); // create() returns a disconnected instance instead of throwing - const ws = await ReconnectingWebSocket.create( - factory, - createMockLogger(), - { onCertificateRefreshNeeded: () => Promise.resolve(false) }, - ); + const ws = await fromFactory(factory); // Should be disconnected after unrecoverable HTTP error expect(ws.state).toBe(ConnectionState.DISCONNECTED); @@ -363,7 +364,9 @@ describe("ReconnectingWebSocket", () => { it("calls onDispose callback once, even with multiple close() calls", async () => { let disposeCount = 0; - const { ws } = await createReconnectingWebSocket(() => ++disposeCount); + const { ws } = await createReconnectingWebSocket({ + onDispose: () => ++disposeCount, + }); ws.close(); ws.close(); @@ -374,9 +377,9 @@ describe("ReconnectingWebSocket", () => { it("suspends (not disposes) on unrecoverable WebSocket close code", async () => { let disposeCount = 0; - const { ws, sockets } = await createReconnectingWebSocket( - () => ++disposeCount, - ); + const { ws, sockets } = await createReconnectingWebSocket({ + onDispose: () => ++disposeCount, + }); sockets[0].fireOpen(); expect(ws.state).toBe(ConnectionState.CONNECTED); @@ -403,9 +406,9 @@ describe("ReconnectingWebSocket", () => { it("does not call onDispose callback during reconnection", async () => { let disposeCount = 0; - const { ws, sockets } = await createReconnectingWebSocket( - () => ++disposeCount, - ); + const { ws, sockets } = await createReconnectingWebSocket({ + onDispose: () => ++disposeCount, + }); sockets[0].fireOpen(); sockets[0].fireClose({ @@ -587,6 +590,103 @@ describe("ReconnectingWebSocket", () => { }); }); + describe("Telemetry wiring", () => { + it("walks the state machine through a full reconnect lifecycle", async () => { + const { telemetry, sink } = createTestTelemetry(); + const { ws, sockets } = await createReconnectingWebSocket({ telemetry }); + + sockets[0].fireOpen(); + sockets[0].fireClose({ + code: WebSocketCloseCode.ABNORMAL, + reason: "Network error", + }); + await vi.advanceTimersByTimeAsync(300); + sockets[1].fireOpen(); + ws.close(); + + expect( + sink + .eventsNamed("connection.state_transitioned") + .map((e) => e.properties), + ).toEqual([ + { from: "IDLE", to: "CONNECTING", reason: "initial_connect" }, + { from: "CONNECTING", to: "CONNECTED", reason: "open" }, + { + from: "CONNECTED", + to: "AWAITING_RETRY", + reason: "unexpected_close", + }, + { + from: "AWAITING_RETRY", + to: "CONNECTING", + reason: "scheduled_reconnect", + }, + { from: "CONNECTING", to: "CONNECTED", reason: "open" }, + { from: "CONNECTED", to: "DISPOSED", reason: "dispose" }, + ]); + + expect(sink.eventsNamed("connection.opened")).toHaveLength(2); + expect(sink.eventsNamed("connection.dropped")).toHaveLength(2); + expect(sink.eventsNamed("connection.reconnect_resolved")).toMatchObject([ + { + properties: { result: "success", reason: "unexpected_close" }, + measurements: { attempts: 1, totalDurationMs: expect.any(Number) }, + }, + ]); + }); + + it("emits a normal-close drop and disconnects on server-initiated close", async () => { + const { telemetry, sink } = createTestTelemetry(); + const { ws, sockets } = await createReconnectingWebSocket({ telemetry }); + + sockets[0].fireOpen(); + sockets[0].fireClose({ + code: WebSocketCloseCode.GOING_AWAY, + reason: "server restarting", + }); + + expect(ws.state).toBe(ConnectionState.DISCONNECTED); + const dropped = sink.eventsNamed("connection.dropped"); + expect(dropped).toHaveLength(1); + expect(dropped[0].properties).toMatchObject({ + cause: "normal_close", + closeCode: String(WebSocketCloseCode.GOING_AWAY), + }); + expect( + sink + .eventsNamed("connection.state_transitioned") + .map((e) => e.properties.reason), + ).toContain("normal_close"); + + ws.close(); + }); + + it("records certificate_refresh as the reconnect reason on successful refresh", async () => { + const { telemetry, sink } = createTestTelemetry(); + const sockets: MockSocket[] = []; + const factory = vi.fn(() => { + const socket = createMockSocket(); + sockets.push(socket); + return Promise.resolve(socket); + }); + const ws = await fromFactory(factory, { + telemetry, + onCertificateRefreshNeeded: () => Promise.resolve(true), + }); + + sockets[0].fireOpen(); + sockets[0].fireError(new Error("ssl alert certificate_expired")); + await vi.waitFor(() => expect(sockets).toHaveLength(2)); + sockets[1].fireOpen(); + + const reconnected = sink.eventsNamed("connection.reconnect_resolved"); + expect(reconnected).toHaveLength(1); + expect(reconnected[0].properties.reason).toBe("certificate_refresh"); + + ws.close(); + }); + }); + describe("Certificate Refresh", () => { const setupRefreshTest = async (onRefresh: () => Promise) => { const sockets: MockSocket[] = []; @@ -596,7 +696,9 @@ describe("ReconnectingWebSocket", () => { sockets.push(socket); return Promise.resolve(socket); }); - const ws = await fromFactory(factory, undefined, refreshCallback); + const ws = await fromFactory(factory, { + onCertificateRefreshNeeded: refreshCallback, + }); sockets[0].fireOpen(); return { ws, sockets, refreshCallback }; }; @@ -747,7 +849,15 @@ function createMockSocket(): MockSocket { }; } -async function createReconnectingWebSocket(onDispose?: () => void): Promise<{ +interface FactoryOptions { + onDispose?: () => void; + onCertificateRefreshNeeded?: () => Promise; + telemetry?: TelemetryReporter; +} + +async function createReconnectingWebSocket( + options: FactoryOptions = {}, +): Promise<{ ws: ReconnectingWebSocket; sockets: MockSocket[]; }> { @@ -757,15 +867,14 @@ async function createReconnectingWebSocket(onDispose?: () => void): Promise<{ sockets.push(socket); return Promise.resolve(socket); }); - const ws = await fromFactory(factory, onDispose); - - // We start with one socket + const ws = await fromFactory(factory, options); expect(sockets).toHaveLength(1); - return { ws, sockets }; } -async function createReconnectingWebSocketWithErrorControl(): Promise<{ +async function createReconnectingWebSocketWithErrorControl( + options: FactoryOptions = {}, +): Promise<{ ws: ReconnectingWebSocket; sockets: MockSocket[]; setFactoryError: (error: Error | null) => void; @@ -782,7 +891,7 @@ async function createReconnectingWebSocketWithErrorControl(): Promise<{ return Promise.resolve(socket); }); - const ws = await fromFactory(factory); + const ws = await fromFactory(factory, options); expect(sockets).toHaveLength(1); return { @@ -796,17 +905,17 @@ async function createReconnectingWebSocketWithErrorControl(): Promise<{ async function fromFactory( factory: SocketFactory, - onDispose?: () => void, - onCertificateRefreshNeeded?: () => Promise, + options: FactoryOptions = {}, ): Promise> { return await ReconnectingWebSocket.create( factory, createMockLogger(), { + telemetry: options.telemetry ?? NOOP_TELEMETRY_REPORTER, onCertificateRefreshNeeded: - onCertificateRefreshNeeded ?? (() => Promise.resolve(false)), + options.onCertificateRefreshNeeded ?? (() => Promise.resolve(false)), }, - onDispose, + options.onDispose, ); }