Skip to content

Commit b0ff78b

Browse files
authored
Merge pull request #136 from QuantGeekDev/fix/session-resilience-streamable-http
fix(http-stream): prevent session destruction on transient errors
2 parents edcbfe3 + 7f1b460 commit b0ff78b

2 files changed

Lines changed: 283 additions & 10 deletions

File tree

src/transports/http/server.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,22 @@ export class HttpStreamTransport extends AbstractTransport {
135135
authData = (authResult as AuthResult).data as RequestContextData || {};
136136
}
137137

138+
// Allow re-initialization even when a stale session ID is provided.
139+
// Clients like Cline may keep sending the old session ID header after
140+
// a session is lost (server restart, transport error, etc.).
141+
const isReInitialize = sessionId && !this._transports[sessionId] && body && isInitializeRequest(body);
142+
138143
// Handle different request scenarios
139144
if (sessionId && this._transports[sessionId]) {
140145
// Existing session
141146
transport = this._transports[sessionId];
142147
logger.debug(`Reusing existing session: ${sessionId}`);
143-
} else if (isInitialize) {
144-
// New session initialization
145-
logger.info('Creating new session for initialization request');
148+
} else if (isInitialize || isReInitialize) {
149+
if (isReInitialize) {
150+
logger.info(`Stale session ID ${sessionId} — creating new session for re-initialization`);
151+
} else {
152+
logger.info('Creating new session for initialization request');
153+
}
146154

147155
transport = new StreamableHTTPServerTransport({
148156
sessionIdGenerator: () => randomUUID(),
@@ -161,10 +169,11 @@ export class HttpStreamTransport extends AbstractTransport {
161169
};
162170

163171
transport.onerror = (error) => {
164-
logger.error(`Transport error for session: ${error}`);
165-
if (transport.sessionId) {
166-
delete this._transports[transport.sessionId];
167-
}
172+
// Log the error but do NOT remove the session. The SDK fires onerror
173+
// for transient issues (parse errors, failed SSE writes) that don't
174+
// invalidate the session. Removing the session here causes "Session
175+
// not found" errors on subsequent requests from the same client.
176+
logger.error(`Transport error for session ${transport.sessionId}: ${error}`);
168177
};
169178

170179
transport.onmessage = async (message: JSONRPCMessage) => {
@@ -182,7 +191,7 @@ export class HttpStreamTransport extends AbstractTransport {
182191
this.sendError(res, 400, -32000, 'Bad Request: No valid session ID provided');
183192
return;
184193
} else {
185-
// Session ID provided but not found
194+
// Session ID provided but not found (and not an initialize request)
186195
this.sendError(res, 404, -32001, 'Session not found');
187196
return;
188197
}
@@ -268,8 +277,11 @@ export class HttpStreamTransport extends AbstractTransport {
268277
}
269278

270279
if (failedSessions.length > 0) {
271-
failedSessions.forEach((sessionId) => delete this._transports[sessionId]);
272-
logger.warn(`Failed to send message to ${failedSessions.length} sessions.`);
280+
// Log but don't remove sessions on transient send failures.
281+
// The SDK throws when no SSE stream is currently open for a request ID,
282+
// which is a normal condition (e.g. client momentarily between requests).
283+
// The session itself remains valid for future requests.
284+
logger.warn(`Failed to broadcast to ${failedSessions.length} session(s) — sessions preserved for future requests.`);
273285
}
274286
}
275287

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
import { describe, it, expect, beforeEach, afterEach } from '@jest/globals';
2+
import { HttpStreamTransport } from '../../../src/transports/http/server.js';
3+
import http from 'node:http';
4+
5+
/**
6+
* Regression tests for session resilience in HttpStreamTransport.
7+
*
8+
* These cover two bugs that caused "Session not found" (-32001) errors
9+
* for clients like Cline after a session was established:
10+
*
11+
* 1. onerror callback destroyed sessions on transient SDK errors (parse errors,
12+
* failed SSE writes) — the session should survive these.
13+
* 2. Re-initialization with a stale session ID was rejected with 404 instead
14+
* of creating a new session.
15+
*/
16+
describe('HttpStreamTransport — Session Resilience', () => {
17+
let transport: HttpStreamTransport;
18+
let testPort: number;
19+
// Track open requests so we can clean them up before closing the transport
20+
let openRequests: http.ClientRequest[];
21+
22+
const initializeBody = {
23+
jsonrpc: '2.0',
24+
method: 'initialize',
25+
params: {
26+
protocolVersion: '2024-11-05',
27+
capabilities: {},
28+
clientInfo: { name: 'test-client', version: '1.0.0' },
29+
},
30+
id: 1,
31+
};
32+
33+
beforeEach(() => {
34+
testPort = 4000 + Math.floor(Math.random() * 1000);
35+
openRequests = [];
36+
transport = new HttpStreamTransport({
37+
port: testPort,
38+
endpoint: '/mcp',
39+
responseMode: 'stream',
40+
});
41+
});
42+
43+
afterEach(async () => {
44+
// Destroy any open HTTP connections so the server can shut down cleanly
45+
for (const req of openRequests) {
46+
req.destroy();
47+
}
48+
openRequests = [];
49+
50+
if (transport.isRunning()) {
51+
await transport.close();
52+
}
53+
});
54+
55+
// Test-only accessor for the transport's internal `_transports` map.
// The cast to `any` sidesteps the type checker so the tests can inspect the
// session map directly; the server code in this diff indexes that map by
// session ID.
function getTransports(): Record<string, any> {
56+
return (transport as any)._transports;
57+
}
58+
59+
// Returns a promise that resolves (with no value) once `ms` milliseconds have
// elapsed according to `setTimeout`. It never rejects.
function wait(ms: number): Promise<void> {
60+
return new Promise((resolve) => setTimeout(resolve, ms));
61+
}
62+
63+
/**
64+
* Polls until at least one session exists in the transport map.
65+
* Throws after the timeout if no session appears.
66+
*/
67+
async function waitForSession(timeoutMs = 2000): Promise<string> {
68+
const start = Date.now();
69+
// Keep polling until `timeoutMs` milliseconds have elapsed since `start`.
while (Date.now() - start < timeoutMs) {
70+
const ids = Object.keys(getTransports());
71+
// Return the first key reported by Object.keys; any other sessions in the
// map are ignored.
if (ids.length > 0) return ids[0];
72+
// Sleep 20 ms between checks rather than spinning.
await wait(20);
73+
}
74+
throw new Error('Timed out waiting for session to be created');
75+
}
76+
77+
/**
78+
* Fire-and-forget POST. The SSE response may never complete (no MCP server),
79+
* so we don't await the response — we track the request for cleanup.
80+
*/
81+
function firePost(body: any, sessionId?: string): void {
82+
const headers: http.OutgoingHttpHeaders = {
83+
'Content-Type': 'application/json',
84+
'Accept': 'application/json, text/event-stream',
85+
};
86+
// Only attach the session header when the caller supplied a session ID.
if (sessionId) {
87+
headers['Mcp-Session-Id'] = sessionId;
88+
}
89+
const bodyStr = JSON.stringify(body);
90+
91+
// No response callback is passed to http.request, so the response is never
// read by this helper.
const req = http.request({
92+
hostname: 'localhost',
93+
port: testPort,
94+
path: '/mcp',
95+
method: 'POST',
96+
// Content-Length is the byte length of the serialized body, not its
// character count.
headers: { ...headers, 'Content-Length': Buffer.byteLength(bodyStr) },
97+
});
98+
// Deliberately ignore request errors: the no-op listener keeps an 'error'
// event (e.g. from the later req.destroy()) from being thrown as unhandled.
req.on('error', () => {});
99+
req.write(bodyStr);
100+
req.end();
101+
102+
// Track the request so the afterEach hook can destroy it.
openRequests.push(req);
103+
}
104+
105+
/**
 * Sends a JSON POST to `/mcp` on the test server and resolves with the full
 * response once the response stream ends. Intended for non-streaming
 * responses (errors, 404s, etc.); a response that never ends will leave the
 * returned promise pending.
 *
 * @param body - Value serialized with `JSON.stringify` and sent as the request body.
 * @param sessionId - When provided, sent as the `Mcp-Session-Id` request header.
 * @returns The response status code, headers, and body decoded as UTF-8.
 *   Rejects if the request or the response stream emits `'error'`.
 */
function makeRequest(
  body: unknown,
  sessionId?: string,
): Promise<{ statusCode: number; headers: http.IncomingHttpHeaders; body: string }> {
  return new Promise((resolve, reject) => {
    const headers: http.OutgoingHttpHeaders = {
      'Content-Type': 'application/json',
      'Accept': 'application/json, text/event-stream',
    };
    if (sessionId) {
      headers['Mcp-Session-Id'] = sessionId;
    }
    const bodyStr = JSON.stringify(body);

    const req = http.request(
      {
        hostname: 'localhost',
        port: testPort,
        path: '/mcp',
        method: 'POST',
        // Content-Length must be the byte length, not the character count.
        headers: { ...headers, 'Content-Length': Buffer.byteLength(bodyStr) },
      },
      (res) => {
        // Collect raw chunks and decode once at the end: decoding each chunk
        // separately corrupts multi-byte UTF-8 characters that straddle a
        // chunk boundary.
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => {
          chunks.push(chunk);
        });
        // Surface a failure of the response stream instead of leaving the
        // promise pending forever.
        res.on('error', reject);
        res.on('end', () => {
          resolve({
            // statusCode is always populated for responses obtained from a
            // ClientRequest; the fallback only satisfies the type checker.
            statusCode: res.statusCode ?? 0,
            headers: res.headers,
            body: Buffer.concat(chunks).toString('utf8'),
          });
        });
      },
    );
    req.on('error', reject);
    req.write(bodyStr);
    req.end();

    // Track the request so the afterEach hook can destroy it.
    openRequests.push(req);
  });
}
151+
152+
// ---------------------------------------------------------------------------
153+
// Bug 1: onerror must NOT destroy sessions
154+
// ---------------------------------------------------------------------------
155+
describe('onerror should not destroy sessions', () => {
156+
it('should keep session alive after onerror fires on the internal transport', async () => {
157+
await transport.start();
158+
transport.onmessage = async () => {};
159+
160+
firePost(initializeBody);
161+
const sessionId = await waitForSession();
162+
163+
// Simulate the SDK firing onerror (e.g. parse error on a bad request)
164+
const internalTransport = getTransports()[sessionId];
165+
internalTransport.onerror?.(new Error('Simulated transient error'));
166+
167+
// Session must still be in the map
168+
expect(getTransports()[sessionId]).toBeDefined();
169+
});
170+
171+
it('should keep session alive after multiple onerror events', async () => {
172+
await transport.start();
173+
transport.onmessage = async () => {};
174+
175+
firePost(initializeBody);
176+
const sessionId = await waitForSession();
177+
const internalTransport = getTransports()[sessionId];
178+
179+
internalTransport.onerror?.(new Error('Error 1'));
180+
internalTransport.onerror?.(new Error('Error 2'));
181+
internalTransport.onerror?.(new Error('Error 3'));
182+
183+
expect(getTransports()[sessionId]).toBeDefined();
184+
});
185+
});
186+
187+
// ---------------------------------------------------------------------------
188+
// Bug 2: Re-initialization with stale session ID
189+
// ---------------------------------------------------------------------------
190+
describe('re-initialization with stale session ID', () => {
191+
it('should create a new session instead of returning 404', async () => {
192+
await transport.start();
193+
transport.onmessage = async () => {};
194+
195+
// Send an initialize request with a session ID that doesn't exist
196+
firePost(initializeBody, 'stale-session-id-that-does-not-exist');
197+
const sessionId = await waitForSession();
198+
199+
// A new session was created (not rejected with 404)
200+
expect(sessionId).not.toBe('stale-session-id-that-does-not-exist');
201+
expect(getTransports()[sessionId]).toBeDefined();
202+
});
203+
204+
it('should still reject non-initialize requests with unknown session IDs', async () => {
205+
await transport.start();
206+
transport.onmessage = async () => {};
207+
208+
const response = await makeRequest(
209+
{ jsonrpc: '2.0', method: 'tools/list', id: 1 },
210+
'nonexistent-session-id',
211+
);
212+
213+
expect(response.statusCode).toBe(404);
214+
expect(response.body).toContain('Session not found');
215+
});
216+
});
217+
218+
// ---------------------------------------------------------------------------
219+
// onclose SHOULD still clean up sessions (correct behavior preserved)
220+
// ---------------------------------------------------------------------------
221+
describe('onclose should still remove sessions', () => {
222+
it('should remove session when transport is closed', async () => {
223+
await transport.start();
224+
transport.onmessage = async () => {};
225+
226+
firePost(initializeBody);
227+
const sessionId = await waitForSession();
228+
expect(getTransports()[sessionId]).toBeDefined();
229+
230+
// Simulate the SDK calling close (as it does for DELETE requests)
231+
const internalTransport = getTransports()[sessionId];
232+
await internalTransport.close();
233+
234+
expect(getTransports()[sessionId]).toBeUndefined();
235+
});
236+
});
237+
238+
// ---------------------------------------------------------------------------
239+
// Broadcast send failures should not destroy sessions
240+
// ---------------------------------------------------------------------------
241+
describe('broadcast failures should not destroy sessions', () => {
242+
it('should preserve sessions after a failed broadcast send', async () => {
243+
await transport.start();
244+
transport.onmessage = async () => {};
245+
246+
firePost(initializeBody);
247+
const sessionId = await waitForSession();
248+
249+
// Monkey-patch the internal transport's send to throw
250+
const internalTransport = getTransports()[sessionId];
251+
internalTransport.send = async () => {
252+
throw new Error('Simulated send failure');
253+
};
254+
255+
// Broadcast — should NOT remove the session
256+
await transport.send({ jsonrpc: '2.0', method: 'notification/test' });
257+
258+
expect(getTransports()[sessionId]).toBeDefined();
259+
});
260+
});
261+
});

0 commit comments

Comments
 (0)