diff --git a/.changeset/perf-frame-decoder-index-pointer.md b/.changeset/perf-frame-decoder-index-pointer.md new file mode 100644 index 0000000000..23e6a08dd9 --- /dev/null +++ b/.changeset/perf-frame-decoder-index-pointer.md @@ -0,0 +1,5 @@ +--- +'@tanstack/start-client-core': patch +--- + +perf: drop consumed chunks from the client frame decoder buffer with an O(1) head pointer instead of `Array.prototype.shift()` (O(n)). The previous approach degraded to O(n²) when a single large frame (e.g. a big `RawStream` payload) was assembled from many small network reads. diff --git a/packages/start-client-core/src/client-rpc/frame-decoder.ts b/packages/start-client-core/src/client-rpc/frame-decoder.ts index dbdd605bf2..f1fa0d3a4a 100644 --- a/packages/start-client-core/src/client-rpc/frame-decoder.ts +++ b/packages/start-client-core/src/client-rpc/frame-decoder.ts @@ -133,6 +133,11 @@ export function createFrameDecoder( inputReader = reader const bufferList: Array = [] + // Index of the first un-consumed chunk in bufferList. Advancing this + // pointer is O(1); using bufferList.shift() to drop a consumed chunk is + // O(n) and degrades to O(n^2) when a single large frame is assembled from + // many small chunks (e.g. a big RawStream payload split across reads). + let bufferHead = 0 let totalLength = 0 /** @@ -146,7 +151,7 @@ export function createFrameDecoder( } | null { if (totalLength < FRAME_HEADER_SIZE) return null - const first = bufferList[0]! + const first = bufferList[bufferHead]! // Fast path: header fits entirely in first chunk (common case) if (first.length >= FRAME_HEADER_SIZE) { @@ -170,7 +175,7 @@ export function createFrameDecoder( const headerBytes = new Uint8Array(FRAME_HEADER_SIZE) let offset = 0 let remaining = FRAME_HEADER_SIZE - for (let i = 0; i < bufferList.length && remaining > 0; i++) { + for (let i = bufferHead; i < bufferList.length && remaining > 0; i++) { const chunk = bufferList[i]! const toCopy = Math.min(chunk.length, remaining) headerBytes.set(chunk.subarray(0, toCopy), offset) @@ -205,9 +210,8 @@ export function createFrameDecoder( let offset = 0 let remaining = count - while (remaining > 0 && bufferList.length > 0) { - const chunk = bufferList[0] - if (!chunk) break + while (remaining > 0 && bufferHead < bufferList.length) { + const chunk = bufferList[bufferHead]! const toCopy = Math.min(chunk.length, remaining) result.set(chunk.subarray(0, toCopy), offset) @@ -215,12 +219,26 @@ export function createFrameDecoder( remaining -= toCopy if (toCopy === chunk.length) { - bufferList.shift() + // Whole chunk consumed: release it and advance the head pointer + // (O(1)) instead of bufferList.shift() (O(n)). + bufferList[bufferHead++] = EMPTY_BUFFER } else { - bufferList[0] = chunk.subarray(toCopy) + bufferList[bufferHead] = chunk.subarray(toCopy) } } + // Drop consumed chunks so bufferList doesn't grow unbounded over a + // long-lived stream. Fully drained is the common terminal state and + // resets in O(1); otherwise splice off the consumed prefix once it grows + // past a small threshold (amortized O(1) per consumed chunk). + if (bufferHead === bufferList.length) { + bufferList.length = 0 + bufferHead = 0 + } else if (bufferHead >= 32) { + bufferList.splice(0, bufferHead) + bufferHead = 0 + } + totalLength -= count return result } diff --git a/packages/start-client-core/tests/frame-decoder.test.ts b/packages/start-client-core/tests/frame-decoder.test.ts index 29b8974dea..40047edcd2 100644 --- a/packages/start-client-core/tests/frame-decoder.test.ts +++ b/packages/start-client-core/tests/frame-decoder.test.ts @@ -558,5 +558,90 @@ describe('frame-decoder', () => { const { done: finalDone } = await reader.read() expect(finalDone).toBe(true) }) + + it('reassembles a large chunk payload delivered one byte at a time', async () => { + // Forces the header slow path AND many whole-chunk consumptions within a + // single extract, exercising the head-pointer advance + fully-drained + // reset. With the previous bufferList.shift() this path was O(n^2). + const payload = new Uint8Array(200) + for (let i = 0; i < payload.length; i++) payload[i] = (i * 7) % 256 + + const jsonFrame = encodeJSONFrame('{"ref":21}') + const chunkFrame = encodeChunkFrame(21, payload) + const endFrame = encodeEndFrame(21) + + const combined = new Uint8Array( + jsonFrame.length + chunkFrame.length + endFrame.length, + ) + combined.set(jsonFrame, 0) + combined.set(chunkFrame, jsonFrame.length) + combined.set(endFrame, jsonFrame.length + chunkFrame.length) + + const input = new ReadableStream({ + start(controller) { + for (let i = 0; i < combined.length; i++) { + controller.enqueue(combined.subarray(i, i + 1)) + } + controller.close() + }, + }) + + const { getOrCreateStream, jsonChunks } = createFrameDecoder(input) + const stream21 = getOrCreateStream(21) + + const jsonReader = jsonChunks.getReader() + const { value: jsonValue } = await jsonReader.read() + expect(jsonValue).toBe('{"ref":21}') + + const rawReader = stream21.getReader() + const received: Array = [] + while (true) { + const { done, value } = await rawReader.read() + if (done) break + if (value) received.push(...value) + } + expect(received).toEqual(Array.from(payload)) + }) + + it('decodes many frames when reads never align with frame boundaries', async () => { + // 100-byte frames fed in 7-byte reads never align until the very end, so + // consumed chunks accumulate and the head pointer climbs past the + // compaction threshold repeatedly — exercising the splice() prefix drop. + const FRAME_COUNT = 7 + const expected: Array = [] + const frames: Array = [] + for (let i = 0; i < FRAME_COUNT; i++) { + const payload = `frame-${i}`.padEnd(91, '.') // 91 bytes => 100-byte frame + expected.push(payload) + frames.push(encodeJSONFrame(payload)) + } + + const totalLen = frames.reduce((acc, f) => acc + f.length, 0) + const combined = new Uint8Array(totalLen) + let offset = 0 + for (const f of frames) { + combined.set(f, offset) + offset += f.length + } + + const input = new ReadableStream({ + start(controller) { + for (let i = 0; i < combined.length; i += 7) { + controller.enqueue(combined.subarray(i, i + 7)) + } + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + const received: Array = [] + while (true) { + const { done, value } = await reader.read() + if (done) break + received.push(value) + } + expect(received).toEqual(expected) + }) }) })