diff --git a/.changeset/quiet-cups-retry.md b/.changeset/quiet-cups-retry.md new file mode 100644 index 0000000000..94078c3cc0 --- /dev/null +++ b/.changeset/quiet-cups-retry.md @@ -0,0 +1,5 @@ +--- +"@modelcontextprotocol/client": patch +--- + +Make StreamableHTTPClientTransport retry standalone SSE streams longer and fail fast after reconnection exhaustion instead of allowing later request responses to disappear behind a dead SSE channel. SSE open failures now include a fallback HTTP status when statusText is empty. diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index cd643c96dc..1b95e72f4c 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -22,9 +22,34 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp initialReconnectionDelay: 1000, maxReconnectionDelay: 30_000, reconnectionDelayGrowFactor: 1.5, - maxRetries: 2 + maxRetries: 10 }; +function errorMessage(error: unknown): string { + if (error instanceof Error && error.message) { + return error.message; + } + + if (typeof error === 'object' && error) { + const maybeError = error as { name?: string; status?: number; statusText?: string }; + if (maybeError.statusText) { + return maybeError.statusText; + } + if (maybeError.status !== undefined) { + return `HTTP ${maybeError.status}`; + } + if (maybeError.name) { + return maybeError.name; + } + } + + if (typeof error === 'string' && error) { + return error; + } + + return 'unknown'; +} + /** * Options for starting or authenticating an SSE connection */ @@ -74,7 +99,7 @@ export interface StreamableHTTPReconnectionOptions { /** * Maximum number of reconnection attempts before giving up. - * Default is 2. + * Default is 10. */ maxRetries: number; } @@ -184,6 +209,7 @@ export class StreamableHTTPClientTransport implements Transport { private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private readonly _reconnectionScheduler?: ReconnectionScheduler; private _cancelReconnection?: () => void; + private _standaloneSseReconnectError?: Error; onclose?: () => void; onerror?: (error: Error) => void; @@ -288,12 +314,14 @@ export class StreamableHTTPClientTransport implements Transport { return; } - throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, { + const statusText = response.statusText || `HTTP ${response.status}`; + throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${statusText}`, { status: response.status, statusText: response.statusText }); } + this._standaloneSseReconnectError = undefined; this._handleSseStream(response.body, options, true); } catch (error) { this.onerror?.(error as Error); @@ -328,13 +356,17 @@ export class StreamableHTTPClientTransport implements Transport { * @param lastEventId The ID of the last received event for resumability * @param attemptCount Current reconnection attempt count for this specific stream */ - private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void { + private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0, failFutureRequests = false): void { // Use provided options or default options const maxRetries = this._reconnectionOptions.maxRetries; // Check if we've exceeded maximum retry attempts if (attemptCount >= maxRetries) { - this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`)); + const error = new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`); + if (failFutureRequests) { + this._standaloneSseReconnectError = error; + } + this.onerror?.(error); return; } @@ -345,9 +377,9 @@ export class StreamableHTTPClientTransport implements Transport { this._cancelReconnection = undefined; if (this._abortController?.signal.aborted) return; this._startOrAuthSse(options).catch(error => { - this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); + this.onerror?.(new Error(`Failed to reconnect SSE stream: ${errorMessage(error)}`)); try { - this._scheduleReconnection(options, attemptCount + 1); + this._scheduleReconnection(options, attemptCount + 1, failFutureRequests); } catch (scheduleError) { this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError))); } @@ -443,12 +475,13 @@ export class StreamableHTTPClientTransport implements Transport { onresumptiontoken, replayMessageId }, - 0 + 0, + isReconnectable ); } } catch (error) { // Handle stream errors - likely a network disconnect - this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); + this.onerror?.(new Error(`SSE stream disconnected: ${errorMessage(error)}`)); // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) @@ -464,10 +497,11 @@ export class StreamableHTTPClientTransport implements Transport { onresumptiontoken, replayMessageId }, - 0 + 0, + isReconnectable ); } catch (error) { - this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); + this.onerror?.(new Error(`Failed to reconnect: ${errorMessage(error)}`)); } } } @@ -538,6 +572,12 @@ export class StreamableHTTPClientTransport implements Transport { return; } + const messages = Array.isArray(message) ? message : [message]; + const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); + if (hasRequests && this._standaloneSseReconnectError) { + throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`); + } + const headers = await this._commonHeaders(); headers.set('content-type', 'application/json'); const userAccept = headers.get('accept'); @@ -649,11 +689,6 @@ export class StreamableHTTPClientTransport implements Transport { return; } - // Get original message(s) for detecting request IDs - const messages = Array.isArray(message) ? message : [message]; - - const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined); - // Check the response type const contentType = response.headers.get('content-type'); diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index b2138b3fa8..02675d7b87 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -927,7 +927,7 @@ describe('StreamableHTTPClientTransport', () => { // ASSERT expect(errorSpy).toHaveBeenCalledWith( expect.objectContaining({ - message: expect.stringContaining('SSE stream disconnected: Error: Network failure') + message: expect.stringContaining('SSE stream disconnected: Network failure') }) ); // THE KEY ASSERTION: A second fetch call proves reconnection was attempted. @@ -1810,6 +1810,31 @@ describe('StreamableHTTPClientTransport', () => { // Clean up the pending reconnection to avoid test pollution transport['_cancelReconnection']?.(); }); + + it('should fail future requests after standalone SSE reconnect attempts are exhausted', async () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 0, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + transport['_scheduleReconnection']({}, 0, true); + + const message: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'tools/call', + params: {}, + id: 'request-after-dead-sse' + }; + + await expect(transport.send(message)).rejects.toThrow( + 'SSE stream reconnection failed: Maximum reconnection attempts (0) exceeded.' + ); + expect(globalThis.fetch).not.toHaveBeenCalled(); + }); }); describe('prevent infinite recursion when server returns 401 after successful auth', () => {