From f9482deac1b14881ebb5b884d4f718f56a8590e9 Mon Sep 17 00:00:00 2001 From: kishansomaiya Date: Tue, 26 May 2026 14:47:46 +0530 Subject: [PATCH 1/2] perf: room-broadcast NEW_CHANGES fan-out for steady-state clients behind feature flag (#7780) --- settings.json.docker | 6 + settings.json.template | 9 ++ src/node/handler/PadMessageHandler.ts | 126 ++++++++++++++++-- src/node/utils/Settings.ts | 6 + .../roomBroadcastNewChanges-defaults.test.ts | 8 ++ 5 files changed, 147 insertions(+), 8 deletions(-) create mode 100644 src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts diff --git a/settings.json.docker b/settings.json.docker index 5da12e7e82b..2f9c0911bfc 100644 --- a/settings.json.docker +++ b/settings.json.docker @@ -683,6 +683,12 @@ */ "socketTransportProtocols" : ["websocket", "polling"], + /* + * Performance experiment (#7780): room-broadcast NEW_CHANGES to steady-state + * recipients, keep per-socket catch-up for stragglers. + */ + "roomBroadcastNewChanges": "${ROOM_BROADCAST_NEW_CHANGES:false}", + "socketIo": { /* * Maximum permitted client message size (in bytes). This controls the diff --git a/settings.json.template b/settings.json.template index db4b0ba3a62..1ba99eb7cb3 100644 --- a/settings.json.template +++ b/settings.json.template @@ -765,6 +765,15 @@ */ "socketTransportProtocols" : ["websocket", "polling"], + /* + * Performance experiment (#7780): use room-broadcast NEW_CHANGES fan-out for + * steady-state recipients (clients at head-1), while stragglers still catch + * up via per-socket emits. + * + * Keep disabled unless explicitly benchmarking or validating this behavior. + */ + "roomBroadcastNewChanges": false, + "socketIo": { /* * Maximum permitted client message size (in bytes). This controls the diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 592e5b9e857..68194d8ee4c 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -1010,6 +1010,122 @@ exports.updatePadClients = async (pad: PadType) => { // benefits of running this in parallel, // but benefit of reusing cached revision object is HUGE const revCache:MapArrayType = {}; + const forWireCache:MapArrayType = {}; + + const getRevision = async (rev: any) => { + let revision = revCache[rev]; + if (!revision) { + revision = await pad.getRevision(rev); + revCache[rev] = revision; + } + return revision; + }; + + const getForWire = (rev: number, revision: any) => { + let forWire = forWireCache[rev]; + if (!forWire) { + forWire = prepareForWire(revision.changeset, pad.pool); + forWireCache[rev] = forWire; + } + return forWire; + }; + + if (settings.roomBroadcastNewChanges) { + const headRev = pad.getHeadRevisionNumber(); + const syncedSocketIds: string[] = []; + const stragglerSockets: any[] = []; + + for (const socket of roomSockets) { + const sessioninfo = sessioninfos[socket.id]; + // The user might have disconnected since _getRoomSockets() was called. + if (sessioninfo == null) continue; + if (sessioninfo.rev === headRev - 1) { + syncedSocketIds.push(socket.id); + } else { + stragglerSockets.push(socket); + } + } + + // Broadcast the latest revision once for all steady-state recipients (head-1). + if (syncedSocketIds.length > 0 && headRev >= 0) { + const revision = await getRevision(headRev); + const author = revision.meta.author; + const currentTime = revision.meta.timestamp; + const forWire = getForWire(headRev, revision); + const exemplarSession = sessioninfos[syncedSocketIds[0]]; + const msg = { + type: 'COLLABROOM', + data: { + type: 'NEW_CHANGES', + newRev: headRev, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - exemplarSession.time, + }, + }; + const ns = socketio?.sockets; + if (ns != null) { + if (stragglerSockets.length > 0) { + ns.to(pad.id).except(stragglerSockets.map((s) => s.id)).emit('message', msg); + } else { + ns.to(pad.id).emit('message', msg); + } + } else { + // Fallback to direct socket emits if namespace is unavailable. + for (const socketId of syncedSocketIds) { + const socket = roomSockets.find((s) => s.id === socketId); + if (socket != null) socket.emit('message', msg); + } + } + recordSocketEmit('NEW_CHANGES'); + + for (const socketId of syncedSocketIds) { + const sessioninfo = sessioninfos[socketId]; + if (sessioninfo == null || sessioninfo.rev !== headRev - 1) continue; + sessioninfo.time = currentTime; + sessioninfo.rev = headRev; + } + } + + // Stragglers still need per-socket catch-up to preserve rev+1 semantics. + await Promise.all(stragglerSockets.map(async (socket) => { + const sessioninfo = sessioninfos[socket.id]; + if (sessioninfo == null) return; + + while (sessioninfo.rev < pad.getHeadRevisionNumber()) { + const r = sessioninfo.rev + 1; + const revision = await getRevision(r); + const author = revision.meta.author; + const currentTime = revision.meta.timestamp; + const forWire = getForWire(r, revision); + + const msg = { + type: 'COLLABROOM', + data: { + type: 'NEW_CHANGES', + newRev: r, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - sessioninfo.time, + }, + }; + try { + socket.emit('message', msg); + recordSocketEmit('NEW_CHANGES'); + } catch (err:any) { + messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); + return; + } + sessioninfo.time = currentTime; + sessioninfo.rev = r; + } + })); + return; + } await Promise.all(roomSockets.map(async (socket) => { const sessioninfo = sessioninfos[socket.id]; @@ -1018,17 +1134,11 @@ exports.updatePadClients = async (pad: PadType) => { while (sessioninfo.rev < pad.getHeadRevisionNumber()) { const r = sessioninfo.rev + 1; - let revision = revCache[r]; - if (!revision) { - revision = await pad.getRevision(r); - revCache[r] = revision; - } + const revision = await getRevision(r); const author = revision.meta.author; - const revChangeset = revision.changeset; const currentTime = revision.meta.timestamp; - - const forWire = prepareForWire(revChangeset, pad.pool); + const forWire = getForWire(r, revision); const msg = { type: 'COLLABROOM', data: { diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 021d3f6bfb6..7a33935dd72 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -275,6 +275,7 @@ export type SettingsType = { ipLogging: 'full' | 'truncated' | 'anonymous', automaticReconnectionTimeout: number, loadTest: boolean, + roomBroadcastNewChanges: boolean, scalingDiveMetrics: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, @@ -698,6 +699,11 @@ const settings: SettingsType = { * Disable Load Testing */ loadTest: false, + /** + * Use room-broadcast for steady-state NEW_CHANGES fan-out. Disabled by default so this + * optimization can be A/B tested safely. + */ + roomBroadcastNewChanges: false, /** * Expose extra Prometheus metrics designed for the scaling-dive load-test harness * (ether/etherpad#7756): etherpad_pad_users{padId}, etherpad_changeset_apply_duration_seconds, diff --git a/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts b/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts new file mode 100644 index 00000000000..909caaf902a --- /dev/null +++ b/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts @@ -0,0 +1,8 @@ +import {describe, it, expect} from 'vitest'; +import settings from '../../../node/utils/Settings'; + +describe('room broadcast NEW_CHANGES defaults', () => { + it('roomBroadcastNewChanges defaults to false', () => { + expect(settings.roomBroadcastNewChanges).toBe(false); + }); +}); From e5c2df1d0b189ba86b1668489002b265fe3493b3 Mon Sep 17 00:00:00 2001 From: kishansomaiya Date: Tue, 26 May 2026 15:29:19 +0530 Subject: [PATCH 2/2] Updated the comments --- settings.json.docker | 2 +- settings.json.template | 2 +- src/node/handler/PadMessageHandler.ts | 3 --- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/settings.json.docker b/settings.json.docker index 2f9c0911bfc..6aa64a7183e 100644 --- a/settings.json.docker +++ b/settings.json.docker @@ -684,7 +684,7 @@ "socketTransportProtocols" : ["websocket", "polling"], /* - * Performance experiment (#7780): room-broadcast NEW_CHANGES to steady-state + * room-broadcast NEW_CHANGES to steady-state * recipients, keep per-socket catch-up for stragglers. */ "roomBroadcastNewChanges": "${ROOM_BROADCAST_NEW_CHANGES:false}", diff --git a/settings.json.template b/settings.json.template index 1ba99eb7cb3..c433372ec87 100644 --- a/settings.json.template +++ b/settings.json.template @@ -766,7 +766,7 @@ "socketTransportProtocols" : ["websocket", "polling"], /* - * Performance experiment (#7780): use room-broadcast NEW_CHANGES fan-out for + * use room-broadcast NEW_CHANGES fan-out for * steady-state recipients (clients at head-1), while stragglers still catch * up via per-socket emits. * diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 68194d8ee4c..b9eb8b7fdee 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -1046,7 +1046,6 @@ exports.updatePadClients = async (pad: PadType) => { } } - // Broadcast the latest revision once for all steady-state recipients (head-1). if (syncedSocketIds.length > 0 && headRev >= 0) { const revision = await getRevision(headRev); const author = revision.meta.author; @@ -1073,7 +1072,6 @@ exports.updatePadClients = async (pad: PadType) => { ns.to(pad.id).emit('message', msg); } } else { - // Fallback to direct socket emits if namespace is unavailable. for (const socketId of syncedSocketIds) { const socket = roomSockets.find((s) => s.id === socketId); if (socket != null) socket.emit('message', msg); @@ -1089,7 +1087,6 @@ exports.updatePadClients = async (pad: PadType) => { } } - // Stragglers still need per-socket catch-up to preserve rev+1 semantics. await Promise.all(stragglerSockets.map(async (socket) => { const sessioninfo = sessioninfos[socket.id]; if (sessioninfo == null) return;