Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-cups-retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@modelcontextprotocol/client": patch
---

Make StreamableHTTPClientTransport retry standalone SSE streams longer and fail fast after reconnection exhaustion instead of allowing later request responses to disappear behind a dead SSE channel. SSE open failures now include a fallback HTTP status when statusText is empty.
67 changes: 51 additions & 16 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,34 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp
initialReconnectionDelay: 1000,
maxReconnectionDelay: 30_000,
reconnectionDelayGrowFactor: 1.5,
maxRetries: 2
maxRetries: 10
};

function errorMessage(error: unknown): string {
if (error instanceof Error && error.message) {
return error.message;
}

if (typeof error === 'object' && error) {
const maybeError = error as { name?: string; status?: number; statusText?: string };
if (maybeError.statusText) {
return maybeError.statusText;
}
if (maybeError.status !== undefined) {
return `HTTP ${maybeError.status}`;
}
if (maybeError.name) {
return maybeError.name;
}
}

if (typeof error === 'string' && error) {
return error;
}

return 'unknown';
}

/**
* Options for starting or authenticating an SSE connection
*/
Expand Down Expand Up @@ -74,7 +99,7 @@ export interface StreamableHTTPReconnectionOptions {

/**
* Maximum number of reconnection attempts before giving up.
* Default is 2.
* Default is 10.
*/
maxRetries: number;
}
Expand Down Expand Up @@ -184,6 +209,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private readonly _reconnectionScheduler?: ReconnectionScheduler;
private _cancelReconnection?: () => void;
private _standaloneSseReconnectError?: Error;

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -288,12 +314,14 @@ export class StreamableHTTPClientTransport implements Transport {
return;
}

throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, {
const statusText = response.statusText || `HTTP ${response.status}`;
throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${statusText}`, {
status: response.status,
statusText: response.statusText
});
}

this._standaloneSseReconnectError = undefined;
this._handleSseStream(response.body, options, true);
} catch (error) {
this.onerror?.(error as Error);
Expand Down Expand Up @@ -328,13 +356,17 @@ export class StreamableHTTPClientTransport implements Transport {
* @param lastEventId The ID of the last received event for resumability
* @param attemptCount Current reconnection attempt count for this specific stream
*/
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void {
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0, failFutureRequests = false): void {
// Use provided options or default options
const maxRetries = this._reconnectionOptions.maxRetries;

// Check if we've exceeded maximum retry attempts
if (attemptCount >= maxRetries) {
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
const error = new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`);
if (failFutureRequests) {
this._standaloneSseReconnectError = error;
}
this.onerror?.(error);
return;
}

Expand All @@ -345,9 +377,9 @@ export class StreamableHTTPClientTransport implements Transport {
this._cancelReconnection = undefined;
if (this._abortController?.signal.aborted) return;
this._startOrAuthSse(options).catch(error => {
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${errorMessage(error)}`));
try {
this._scheduleReconnection(options, attemptCount + 1);
this._scheduleReconnection(options, attemptCount + 1, failFutureRequests);
} catch (scheduleError) {
this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError)));
}
Expand Down Expand Up @@ -443,12 +475,13 @@ export class StreamableHTTPClientTransport implements Transport {
onresumptiontoken,
replayMessageId
},
0
0,
isReconnectable
);
}
} catch (error) {
// Handle stream errors - likely a network disconnect
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
this.onerror?.(new Error(`SSE stream disconnected: ${errorMessage(error)}`));

// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
Expand All @@ -464,10 +497,11 @@ export class StreamableHTTPClientTransport implements Transport {
onresumptiontoken,
replayMessageId
},
0
0,
isReconnectable
);
} catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
this.onerror?.(new Error(`Failed to reconnect: ${errorMessage(error)}`));
}
}
}
Expand Down Expand Up @@ -538,6 +572,12 @@ export class StreamableHTTPClientTransport implements Transport {
return;
}

const messages = Array.isArray(message) ? message : [message];
const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined);
if (hasRequests && this._standaloneSseReconnectError) {
throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`);
}

const headers = await this._commonHeaders();
headers.set('content-type', 'application/json');
const userAccept = headers.get('accept');
Expand Down Expand Up @@ -649,11 +689,6 @@ export class StreamableHTTPClientTransport implements Transport {
return;
}

// Get original message(s) for detecting request IDs
const messages = Array.isArray(message) ? message : [message];

const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined);

// Check the response type
const contentType = response.headers.get('content-type');

Expand Down
27 changes: 26 additions & 1 deletion packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ describe('StreamableHTTPClientTransport', () => {
// ASSERT
expect(errorSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: expect.stringContaining('SSE stream disconnected: Error: Network failure')
message: expect.stringContaining('SSE stream disconnected: Network failure')
})
);
// THE KEY ASSERTION: A second fetch call proves reconnection was attempted.
Expand Down Expand Up @@ -1810,6 +1810,31 @@ describe('StreamableHTTPClientTransport', () => {
// Clean up the pending reconnection to avoid test pollution
transport['_cancelReconnection']?.();
});

it('should fail future requests after standalone SSE reconnect attempts are exhausted', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions: {
initialReconnectionDelay: 10,
maxRetries: 0,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});

transport['_scheduleReconnection']({}, 0, true);

const message: JSONRPCRequest = {
jsonrpc: '2.0',
method: 'tools/call',
params: {},
id: 'request-after-dead-sse'
};

await expect(transport.send(message)).rejects.toThrow(
'SSE stream reconnection failed: Maximum reconnection attempts (0) exceeded.'
);
expect(globalThis.fetch).not.toHaveBeenCalled();
});
});

describe('prevent infinite recursion when server returns 401 after successful auth', () => {
Expand Down
Loading