Skip to content

Commit e6fa554

Browse files
committed
fix: clean up streamable HTTP SSE disconnects
1 parent 603342f commit e6fa554

2 files changed

Lines changed: 52 additions & 1 deletion

File tree

src/mcp/server/streamable_http.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,9 +625,18 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
625625

626626
# Start the SSE response (this will send headers immediately)
627627
try:
628+
629+
async def run_response_with_cleanup() -> None:
630+
try:
631+
await response(scope, receive, send)
632+
finally:
633+
self._sse_stream_writers.pop(request_id, None)
634+
await sse_stream_writer.aclose()
635+
await self._clean_up_memory_streams(request_id)
636+
628637
# First send the response to establish the SSE connection
629638
async with anyio.create_task_group() as tg:
630-
tg.start_soon(response, scope, receive, send)
639+
tg.start_soon(run_response_with_cleanup)
631640
# Then send the message to be processed by the server
632641
session_message = self._create_session_message(message, request, request_id, protocol_version)
633642
await writer.send(session_message)

tests/server/test_streamable_http_manager.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,48 @@ async def running_manager():
101101
yield manager, app
102102

103103

104+
@pytest.mark.anyio
105+
async def test_streamable_http_post_sse_cleans_up_streams_when_response_returns(monkeypatch: pytest.MonkeyPatch):
106+
transport = StreamableHTTPServerTransport(mcp_session_id=None)
107+
sent_messages: list[Message] = []
108+
body = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}).encode()
109+
110+
class DisconnectingEventSourceResponse:
111+
def __init__(self, *args: Any, **kwargs: Any) -> None:
112+
pass
113+
114+
async def __call__(self, scope: Scope, receive: Any, send: Any) -> None:
115+
await send({"type": "http.response.start", "status": 200, "headers": []})
116+
117+
async def send(message: Message) -> None:
118+
sent_messages.append(message)
119+
120+
async def receive() -> Message:
121+
return {"type": "http.request", "body": body, "more_body": False}
122+
123+
scope: Scope = {
124+
"type": "http",
125+
"method": "POST",
126+
"path": "/mcp",
127+
"headers": [
128+
(b"accept", b"application/json, text/event-stream"),
129+
(b"content-type", b"application/json"),
130+
],
131+
}
132+
133+
monkeypatch.setattr("mcp.server.streamable_http.EventSourceResponse", DisconnectingEventSourceResponse)
134+
135+
async with transport.connect() as (read_stream, _write_stream):
136+
async with anyio.create_task_group() as tg:
137+
tg.start_soon(transport.handle_request, scope, receive, send)
138+
session_message = await read_stream.receive()
139+
assert session_message.message.method == "tools/list"
140+
141+
assert transport._request_streams == {}
142+
assert transport._sse_stream_writers == {}
143+
assert any(message["type"] == "http.response.start" for message in sent_messages)
144+
145+
104146
@pytest.mark.anyio
105147
async def test_stateful_session_cleanup_on_graceful_exit(running_manager: tuple[StreamableHTTPSessionManager, Server]):
106148
manager, _app = running_manager

0 commit comments

Comments
 (0)