diff --git a/settings.json.docker b/settings.json.docker index 5da12e7e82b..6aa64a7183e 100644 --- a/settings.json.docker +++ b/settings.json.docker @@ -683,6 +683,12 @@ */ "socketTransportProtocols" : ["websocket", "polling"], + /* + * Room-broadcast NEW_CHANGES to steady-state recipients (clients at + * head-1) and keep per-socket catch-up for stragglers. + */ + "roomBroadcastNewChanges": "${ROOM_BROADCAST_NEW_CHANGES:false}", + "socketIo": { /* * Maximum permitted client message size (in bytes). This controls the diff --git a/settings.json.template b/settings.json.template index db4b0ba3a62..c433372ec87 100644 --- a/settings.json.template +++ b/settings.json.template @@ -765,6 +765,15 @@ */ "socketTransportProtocols" : ["websocket", "polling"], + /* + * Use room-broadcast NEW_CHANGES fan-out for + * steady-state recipients (clients at head-1), while stragglers still catch + * up via per-socket emits. + * + * Keep disabled unless explicitly benchmarking or validating this behavior. + */ + "roomBroadcastNewChanges": false, + "socketIo": { /* * Maximum permitted client message size (in bytes). 
This controls the diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 592e5b9e857..b9eb8b7fdee 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -1010,6 +1010,120 @@ exports.updatePadClients = async (pad: PadType) => { // benefits of running this in parallel, // but benefit of reusing cached revision object is HUGE const revCache:MapArrayType = {}; + const forWireCache:MapArrayType = {}; + + const getRevision = async (rev: any) => { + let revision = revCache[rev]; + if (!revision) { + revision = await pad.getRevision(rev); + revCache[rev] = revision; + } + return revision; + }; + + const getForWire = (rev: number, revision: any) => { + let forWire = forWireCache[rev]; + if (!forWire) { + forWire = prepareForWire(revision.changeset, pad.pool); + forWireCache[rev] = forWire; + } + return forWire; + }; + + if (settings.roomBroadcastNewChanges) { + const headRev = pad.getHeadRevisionNumber(); + const syncedSocketIds: string[] = []; + const stragglerSockets: any[] = []; + + for (const socket of roomSockets) { + const sessioninfo = sessioninfos[socket.id]; + // The user might have disconnected since _getRoomSockets() was called. 
+ if (sessioninfo == null) continue; + if (sessioninfo.rev === headRev - 1) { + syncedSocketIds.push(socket.id); + } else { + stragglerSockets.push(socket); + } + } + + if (syncedSocketIds.length > 0 && headRev >= 0) { + // Read the session before awaiting; a disconnect during the await would remove it. + const exemplarSession = sessioninfos[syncedSocketIds[0]]; + const revision = await getRevision(headRev); + const author = revision.meta.author; + const currentTime = revision.meta.timestamp; + const forWire = getForWire(headRev, revision); + const msg = { + type: 'COLLABROOM', + data: { + type: 'NEW_CHANGES', + newRev: headRev, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - exemplarSession.time, + }, + }; + const ns = socketio?.sockets; + if (ns != null) { + if (stragglerSockets.length > 0) { + ns.to(pad.id).except(stragglerSockets.map((s) => s.id)).emit('message', msg); + } else { + ns.to(pad.id).emit('message', msg); + } + } else { + for (const socketId of syncedSocketIds) { + const socket = roomSockets.find((s) => s.id === socketId); + if (socket != null) socket.emit('message', msg); + } + } + recordSocketEmit('NEW_CHANGES'); + + for (const socketId of syncedSocketIds) { + const sessioninfo = sessioninfos[socketId]; + if (sessioninfo == null || sessioninfo.rev !== headRev - 1) continue; + sessioninfo.time = currentTime; + sessioninfo.rev = headRev; + } + } + + await Promise.all(stragglerSockets.map(async (socket) => { + const sessioninfo = sessioninfos[socket.id]; + if (sessioninfo == null) return; + + while (sessioninfo.rev < pad.getHeadRevisionNumber()) { + const r = sessioninfo.rev + 1; + const revision = await getRevision(r); + const author = revision.meta.author; + const currentTime = revision.meta.timestamp; + const forWire = getForWire(r, revision); + + const msg = { + type: 'COLLABROOM', + data: { + type: 'NEW_CHANGES', + newRev: r, + changeset: forWire.translated, + apool: forWire.pool, + author, + currentTime, + timeDelta: currentTime - sessioninfo.time, + }, + }; + try { + 
socket.emit('message', msg); + recordSocketEmit('NEW_CHANGES'); + } catch (err:any) { + messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`); + return; + } + sessioninfo.time = currentTime; + sessioninfo.rev = r; + } + })); + return; + } await Promise.all(roomSockets.map(async (socket) => { const sessioninfo = sessioninfos[socket.id]; @@ -1018,17 +1132,11 @@ exports.updatePadClients = async (pad: PadType) => { while (sessioninfo.rev < pad.getHeadRevisionNumber()) { const r = sessioninfo.rev + 1; - let revision = revCache[r]; - if (!revision) { - revision = await pad.getRevision(r); - revCache[r] = revision; - } + const revision = await getRevision(r); const author = revision.meta.author; - const revChangeset = revision.changeset; const currentTime = revision.meta.timestamp; - - const forWire = prepareForWire(revChangeset, pad.pool); + const forWire = getForWire(r, revision); const msg = { type: 'COLLABROOM', data: { diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 021d3f6bfb6..7a33935dd72 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -275,6 +275,7 @@ export type SettingsType = { ipLogging: 'full' | 'truncated' | 'anonymous', automaticReconnectionTimeout: number, loadTest: boolean, + roomBroadcastNewChanges: boolean, scalingDiveMetrics: boolean, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, @@ -698,6 +699,11 @@ const settings: SettingsType = { * Disable Load Testing */ loadTest: false, + /** + * Use room-broadcast for steady-state NEW_CHANGES fan-out. Disabled by default so this + * optimization can be A/B tested safely. 
+ */ + roomBroadcastNewChanges: false, /** * Expose extra Prometheus metrics designed for the scaling-dive load-test harness * (ether/etherpad#7756): etherpad_pad_users{padId}, etherpad_changeset_apply_duration_seconds, diff --git a/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts b/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts new file mode 100644 index 00000000000..909caaf902a --- /dev/null +++ b/src/tests/backend-new/specs/roomBroadcastNewChanges-defaults.test.ts @@ -0,0 +1,8 @@ +import {describe, it, expect} from 'vitest'; +import settings from '../../../node/utils/Settings'; + +describe('room broadcast NEW_CHANGES defaults', () => { + it('roomBroadcastNewChanges defaults to false', () => { + expect(settings.roomBroadcastNewChanges).toBe(false); + }); +});