diff --git a/.changeset/fix-inmemory-event-store-sse-resume.md b/.changeset/fix-inmemory-event-store-sse-resume.md new file mode 100644 index 0000000000..dba5fed2f4 --- /dev/null +++ b/.changeset/fix-inmemory-event-store-sse-resume.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/sdk': patch +--- + +Fix `InMemoryEventStore` replay for stream IDs that contain underscores, including the standalone Streamable HTTP SSE stream. diff --git a/src/examples/shared/inMemoryEventStore.ts b/src/examples/shared/inMemoryEventStore.ts index d4d02eb913..89b2ad399d 100644 --- a/src/examples/shared/inMemoryEventStore.ts +++ b/src/examples/shared/inMemoryEventStore.ts @@ -20,8 +20,7 @@ export class InMemoryEventStore implements EventStore { * Extracts the stream ID from an event ID */ private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0] : ''; + return this.events.get(eventId)?.streamId ?? ''; } /** diff --git a/test/integration-tests/taskResumability.test.ts b/test/integration-tests/taskResumability.test.ts index 187a3d2ff7..d6eeffb074 100644 --- a/test/integration-tests/taskResumability.test.ts +++ b/test/integration-tests/taskResumability.test.ts @@ -9,6 +9,37 @@ import { InMemoryEventStore } from '../../src/examples/shared/inMemoryEventStore import { zodTestMatrix, type ZodMatrixEntry } from '../../src/__fixtures__/zodTestMatrix.js'; import { listenOnRandomPort } from '../helpers/http.js'; +describe('InMemoryEventStore', () => { + it('replays events for stream IDs that contain underscores', async () => { + const dateNow = vi.spyOn(Date, 'now'); + dateNow.mockReturnValueOnce(1000).mockReturnValueOnce(1001); + const random = vi.spyOn(Math, 'random'); + random.mockReturnValueOnce(0.1).mockReturnValueOnce(0.2); + + try { + const eventStore = new InMemoryEventStore(); + const streamId = '_GET_stream'; + const firstMessage = { jsonrpc: '2.0' as const, method: 'first' }; + const secondMessage = { jsonrpc: '2.0' as const, method: 'second' }; + const firstEventId = await eventStore.storeEvent(streamId, firstMessage); + const secondEventId = await eventStore.storeEvent(streamId, secondMessage); + const replayed: Array<{ eventId: string; message: unknown }> = []; + + const replayedStreamId = await eventStore.replayEventsAfter(firstEventId, { + send: async (eventId, message) => { + replayed.push({ eventId, message }); + } + }); + + expect(replayedStreamId).toBe(streamId); + expect(replayed).toEqual([{ eventId: secondEventId, message: secondMessage }]); + } finally { + dateNow.mockRestore(); + random.mockRestore(); + } + }); +}); + describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { const { z } = entry; describe('Transport resumability', () => {