Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions settings.json.docker
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,12 @@
*/
"socketTransportProtocols" : ["websocket", "polling"],

/*
* When enabled, send NEW_CHANGES as a single room broadcast to steady-state
* recipients, and keep per-socket catch-up emits for stragglers.
*/
"roomBroadcastNewChanges": "${ROOM_BROADCAST_NEW_CHANGES:false}",

"socketIo": {
/*
* Maximum permitted client message size (in bytes). This controls the
Expand Down
9 changes: 9 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,15 @@
*/
"socketTransportProtocols" : ["websocket", "polling"],

/*
* Use room-broadcast NEW_CHANGES fan-out for steady-state recipients
* (clients whose last-seen revision is exactly one behind head), while
* stragglers still catch up via per-socket emits.
*
* Keep disabled unless explicitly benchmarking or validating this behavior.
*/
"roomBroadcastNewChanges": false,

"socketIo": {
/*
* Maximum permitted client message size (in bytes). This controls the
Expand Down
123 changes: 115 additions & 8 deletions src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,119 @@ exports.updatePadClients = async (pad: PadType) => {
// benefits of running this in parallel,
// but benefit of reusing cached revision object is HUGE
const revCache:MapArrayType<any> = {};
const forWireCache:MapArrayType<any> = {};

// Resolves the revision object for `rev`, consulting `revCache` first so the
// pad is asked for any given revision only when no truthy entry is cached.
const getRevision = async (rev: any) => {
  const cached = revCache[rev];
  if (cached) return cached;

  // Cache miss: load from the pad and remember the result for later callers.
  const fetched = await pad.getRevision(rev);
  revCache[rev] = fetched;
  return fetched;
};

// Returns the result of prepareForWire() for revision `rev`, memoised in
// `forWireCache` so the changeset is prepared at most once per revision
// number. Callers read `.translated` and `.pool` from the returned value.
const getForWire = (rev: number, revision: any) => {
  const cached = forWireCache[rev];
  if (cached) return cached;

  // Cache miss: prepare the changeset against the pad's pool and store it.
  const prepared = prepareForWire(revision.changeset, pad.pool);
  forWireCache[rev] = prepared;
  return prepared;
};

if (settings.roomBroadcastNewChanges) {
const headRev = pad.getHeadRevisionNumber();
const syncedSocketIds: string[] = [];
const stragglerSockets: any[] = [];

for (const socket of roomSockets) {
const sessioninfo = sessioninfos[socket.id];
// The user might have disconnected since _getRoomSockets() was called.
if (sessioninfo == null) continue;
if (sessioninfo.rev === headRev - 1) {
syncedSocketIds.push(socket.id);
} else {
stragglerSockets.push(socket);
}
}

if (syncedSocketIds.length > 0 && headRev >= 0) {
const revision = await getRevision(headRev);
const author = revision.meta.author;
const currentTime = revision.meta.timestamp;
const forWire = getForWire(headRev, revision);
const exemplarSession = sessioninfos[syncedSocketIds[0]];
const msg = {
type: 'COLLABROOM',
data: {
type: 'NEW_CHANGES',
newRev: headRev,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - exemplarSession.time,
},
Comment on lines +1054 to +1065
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Broadcast timedelta can crash 🐞 Bug ≡ Correctness

In updatePadClients() with roomBroadcastNewChanges enabled, timeDelta is computed from
exemplarSession.time after awaiting revision fetch, so the exemplar session can disappear
(disconnect) or have undefined time, causing a throw or broadcasting NaN timeDelta to all
steady-state clients. This breaks timeslider time tracking because the client adds timeDelta to
currentTime.
Agent Prompt
### Issue description
When `settings.roomBroadcastNewChanges` is enabled, `updatePadClients()` builds a single `NEW_CHANGES` message for all steady-state sockets and sets `timeDelta` using `currentTime - exemplarSession.time`. Because this happens after an `await`, the exemplar session can be removed from `sessioninfos` (disconnect) or have a non-numeric/undefined `time`, causing either a runtime exception or `NaN` `timeDelta` broadcasted to many clients.

### Issue Context
- `sessioninfos` entries are deleted on disconnect, so a socket can disappear between the initial scan and the later exemplar lookup.
- Some sessions may have missing/undefined `time` (for example, reconnect path sets `rev` but does not initialize `time`), and there is an existing comment warning that missing `time` produces `timeDelta=NaN`.
- Timeslider client code applies `timeDelta` to `padContents.currentTime`, so `NaN` corrupts time tracking.

### Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1033-1090]
- src/node/handler/PadMessageHandler.ts[246-250]
- src/node/handler/PadMessageHandler.ts[1307-1314]
- src/node/handler/PadMessageHandler.ts[1502-1511]
- src/static/js/broadcast.ts[206-268]

### Implementation notes
- Avoid depending on any single session object for `timeDelta` in the broadcast message. Prefer computing `timeDelta` from revision timestamps, e.g.:
  - `currentTime = revision.meta.timestamp`
  - `prevTime = headRev > 0 ? (await getRevision(headRev - 1)).meta.timestamp : currentTime`
  - `timeDelta = currentTime - prevTime`
- If you keep an exemplar-based fast path, guard it: ensure `exemplarSession` exists and `typeof exemplarSession.time === 'number'`, otherwise fall back to revision timestamp delta.
- (Optional hardening) Initialize `sessionInfo.time` on the reconnect path similarly to the normal connect path to prevent future `NaN` deltas in per-socket catch-up.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

};
const ns = socketio?.sockets;
if (ns != null) {
if (stragglerSockets.length > 0) {
ns.to(pad.id).except(stragglerSockets.map((s) => s.id)).emit('message', msg);
} else {
ns.to(pad.id).emit('message', msg);
}
} else {
for (const socketId of syncedSocketIds) {
const socket = roomSockets.find((s) => s.id === socketId);
if (socket != null) socket.emit('message', msg);
}
}
recordSocketEmit('NEW_CHANGES');

for (const socketId of syncedSocketIds) {
const sessioninfo = sessioninfos[socketId];
if (sessioninfo == null || sessioninfo.rev !== headRev - 1) continue;
sessioninfo.time = currentTime;
sessioninfo.rev = headRev;
}
}

await Promise.all(stragglerSockets.map(async (socket) => {
const sessioninfo = sessioninfos[socket.id];
if (sessioninfo == null) return;

while (sessioninfo.rev < pad.getHeadRevisionNumber()) {
const r = sessioninfo.rev + 1;
const revision = await getRevision(r);
const author = revision.meta.author;
const currentTime = revision.meta.timestamp;
const forWire = getForWire(r, revision);

const msg = {
type: 'COLLABROOM',
data: {
type: 'NEW_CHANGES',
newRev: r,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - sessioninfo.time,
},
};
try {
socket.emit('message', msg);
recordSocketEmit('NEW_CHANGES');
} catch (err:any) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
return;
}
sessioninfo.time = currentTime;
sessioninfo.rev = r;
}
}));
return;
}

await Promise.all(roomSockets.map(async (socket) => {
const sessioninfo = sessioninfos[socket.id];
Expand All @@ -1018,17 +1131,11 @@ exports.updatePadClients = async (pad: PadType) => {

while (sessioninfo.rev < pad.getHeadRevisionNumber()) {
const r = sessioninfo.rev + 1;
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}
const revision = await getRevision(r);

const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;

const forWire = prepareForWire(revChangeset, pad.pool);
const forWire = getForWire(r, revision);
const msg = {
type: 'COLLABROOM',
data: {
Expand Down
6 changes: 6 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ export type SettingsType = {
ipLogging: 'full' | 'truncated' | 'anonymous',
automaticReconnectionTimeout: number,
loadTest: boolean,
roomBroadcastNewChanges: boolean,
scalingDiveMetrics: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
Expand Down Expand Up @@ -698,6 +699,11 @@ const settings: SettingsType = {
* Disable Load Testing
*/
loadTest: false,
/**
* Use room-broadcast for steady-state NEW_CHANGES fan-out. Disabled by default so this
* optimization can be A/B tested safely.
*/
roomBroadcastNewChanges: false,
/**
* Expose extra Prometheus metrics designed for the scaling-dive load-test harness
* (ether/etherpad#7756): etherpad_pad_users{padId}, etherpad_changeset_apply_duration_seconds,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {describe, it, expect} from 'vitest';
import settings from '../../../node/utils/Settings';

// Pins the default of the room-broadcast flag as exposed by the imported
// settings object: it must be exactly `false`.
describe('room broadcast NEW_CHANGES defaults', () => {
  it('roomBroadcastNewChanges defaults to false', () => {
    const {roomBroadcastNewChanges} = settings;
    expect(roomBroadcastNewChanges).toBe(false);
  });
});