From 2b4b9c26ee251267997a953994b063c93fe0a440 Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Fri, 12 Jun 2026 20:25:08 +0200 Subject: [PATCH] perf(persistence): Move thread messages pagination to SQL Back-port of #2688 to v9. Refactors `MessageDao.getThreadMessagesByParentId` to filter and paginate thread replies at the SQL level instead of loading the full thread into memory, and adds support for all four `PaginationParams` cursor variants (`lessThan` / `lessThanOrEqual` / `greaterThan` / `greaterThanOrEqual`) with a `(createdAt, id)` tiebreaker. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/stream_chat_persistence/CHANGELOG.md | 10 + .../lib/src/dao/message_dao.dart | 125 +++-- .../test/src/dao/message_dao_test.dart | 432 ++++++++++++++++-- 3 files changed, 492 insertions(+), 75 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 73a120207..e77e5352b 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,3 +1,13 @@ +## Upcoming changes + +🚀 Performance + +- Read only the thread replies matching the `PaginationParams` from DB when calling `MessageDao.getThreadMessagesByParentId` instead of reading all replies for the thread and applying pagination in memory. + +🐞 Fixed + +- `MessageDao.getThreadMessagesByParentId` now honours all `PaginationParams` cursor variants (`lessThan`/`lessThanOrEqual`/`greaterThan`/`greaterThanOrEqual`) and returns the page of replies closest to the cursor. + ## 9.25.0 🚀 Performance diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart index 07bca8e12..edb6790fb 100644 --- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart @@ -202,44 +202,89 @@ class MessageDao extends DatabaseAccessor String parentId, { PaginationParams? options, }) async { - final rows = await (select(messages).join([ + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupThreadCursor(parentId, options?.lessThan), + _lookupThreadCursor(parentId, options?.lessThanOrEqual), + _lookupThreadCursor(parentId, options?.greaterThan), + _lookupThreadCursor(parentId, options?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N replies immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N replies + // closest to a `lessThan` cursor (or the thread's tail when no cursor is + // set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(messages.createdAt), + OrderingTerm.asc(messages.id), + ] + : [ + OrderingTerm.desc(messages.createdAt), + OrderingTerm.desc(messages.id), + ]; + + final query = select(messages).join([ leftOuterJoin(_users, messages.userId.equalsExp(_users.id)), leftOuterJoin( _pinnedByUsers, messages.pinnedByUserId.equalsExp(_pinnedByUsers.id), ), ]) - ..where(messages.parentId.isNotNull()) - ..where(messages.parentId.equals(parentId)) - ..orderBy([OrderingTerm.asc(messages.createdAt)])) - .get(); - final msgList = await _messagesFromJoinRows(rows); - - if (msgList.isNotEmpty) { - final mutable = msgList.toList(); - if (options?.lessThan != null) { - final lessThanIndex = mutable.indexWhere( - (m) => m.id == options!.lessThan, - ); - if (lessThanIndex != -1) { - mutable.removeRange(lessThanIndex, mutable.length); - } - } - if (options?.greaterThan != null) { - final greaterThanIndex = mutable.indexWhere( - (m) => m.id == options!.greaterThan, - ); - if (greaterThanIndex != -1) { - mutable.removeRange(0, greaterThanIndex); - } - } - final limit = options?.limit; - if (limit != null && limit > 0) { - return mutable.take(limit).toList(); - } - return mutable; + ..where(messages.parentId.equals(parentId)) + ..orderBy(orderBy); + + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so replies sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` + // alone would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerThanValue(c.id)), + ); + } + if (lessThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isSmallerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerThanValue(c.id)), + ); } - return msgList; + if (greaterThanOrEqualCursor case final c?) { + query.where( + messages.createdAt.isBiggerThanValue(c.createdAt) | + (messages.createdAt.equals(c.createdAt) & + messages.id.isBiggerOrEqualValue(c.id)), + ); + } + + if (options != null) { + query.limit(options.limit); + } + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + + return _messagesFromJoinRows(orderedRows); } /// Returns all the messages of a channel by matching @@ -373,4 +418,22 @@ class MessageDao extends DatabaseAccessor if (createdAt == null) return null; return (createdAt: createdAt, id: id); } + + /// Returns the `(createdAt, id)` cursor for the thread reply with [id] + /// under [parentId] in the local cache, or `null` if [id] is null or no + /// such reply is cached. + Future<({DateTime createdAt, String id})?> _lookupThreadCursor( + String parentId, + String? id, + ) async { + if (id == null) return null; + final createdAt = await (selectOnly(messages) + ..addColumns([messages.createdAt]) + ..where(messages.id.equals(id)) + ..where(messages.parentId.equals(parentId))) + .map((row) => row.read(messages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart index b870ebd2c..aa788f37d 100644 --- a/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/message_dao_test.dart @@ -275,59 +275,403 @@ void main() { } }); - test('getThreadMessagesByParentId', () async { + group('getThreadMessagesByParentId', () { const cid = 'test:Cid'; const parentId = 'testMessageId${cid}0'; + String threadId(int i) => 'testThreadMessageId$cid$i'; + + test('getThreadMessagesByParentId', () async { + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId(parentId); + expect(messages, isEmpty); + + // Preparing test data + final insertedMessages = await _prepareTestData(cid, threads: true); + expect(insertedMessages, isNotEmpty); + + // Should fetch all the thread messages of parentId + final threadMessages = + await messageDao.getThreadMessagesByParentId(parentId); + expect(threadMessages.length, 1); + expect(threadMessages.first.parentId, parentId); + }); - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId(parentId); - expect(messages, isEmpty); + test('getThreadMessagesByParentId along with pagination', () async { + const options = PaginationParams( + limit: 15, + lessThan: 'testThreadMessageId${cid}25', + greaterThan: 'testThreadMessageId${cid}5', + ); - // Preparing test data - final insertedMessages = await _prepareTestData(cid, threads: true); - expect(insertedMessages, isNotEmpty); + // Messages should be empty initially + final messages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(messages, isEmpty); - // Should fetch all the thread messages of parentId - final threadMessages = - await messageDao.getThreadMessagesByParentId(parentId); - expect(threadMessages.length, 1); - expect(threadMessages.first.parentId, parentId); - }); + // Preparing test data + final insertedMessages = await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + expect(insertedMessages, isNotEmpty); - test('getThreadMessagesByParentId along with pagination', () async { - const cid = 'test:Cid'; - const parentId = 'testMessageId${cid}0'; - const options = PaginationParams( - limit: 15, - lessThan: 'testThreadMessageId${cid}25', - greaterThan: 'testThreadMessageId${cid}5', - ); + // Should fetch all the thread messages of parentId and apply the + // pagination. + final threadMessages = await messageDao.getThreadMessagesByParentId( + parentId, + options: options, + ); + expect(threadMessages.length, 15); + expect(threadMessages.first.parentId, parentId); + // lessThan is set → backward pagination → DESC + LIMIT then reverse. + // Filter: id6..id24. Take 15 closest to id25 → id10..id24. + expect(threadMessages.first.id, 'testThreadMessageId${cid}10'); + expect(threadMessages.last.id, 'testThreadMessageId${cid}24'); + }); - // Messages should be empty initially - final messages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(messages, isEmpty); + test('limit only returns the latest N replies', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); - // Preparing test data - final insertedMessages = await _prepareTestData( - cid, - threads: true, - mapAllThreadToFirstMessage: true, - count: 30, - ); - expect(insertedMessages, isNotEmpty); + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 5), + ); - // Should fetch all the thread messages of parentId and apply the pagination - final threadMessages = await messageDao.getThreadMessagesByParentId( - parentId, - options: options, - ); - expect(threadMessages.length, 15); - expect(threadMessages.first.parentId, parentId); - expect(threadMessages.first.id, 'testThreadMessageId${cid}5'); - expect(threadMessages.last.id, 'testThreadMessageId${cid}19'); + // No cursor → backward pagination → DESC + LIMIT then reversed. The + // result is the newest 5 replies (the tail of the thread), in ASC + // order: id25..id29. + expect(replies.length, 5); + expect(replies.first.id, threadId(25)); + expect(replies.last.id, threadId(29)); + }); + + test('lessThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThan: threadId(25)), + ); + + // Strictly before id25 → id0..id24. Backward pagination keeps the 5 + // closest to the cursor → id20..id24. + expect(replies.length, 5); + expect(replies.first.id, threadId(20)); + expect(replies.last.id, threadId(24)); + }); + + test('lessThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, lessThanOrEqual: threadId(25)), + ); + + // Up to and including id25 → id0..id25. Backward pagination keeps the + // 5 closest to the cursor → id21..id25. + expect(replies.length, 5); + expect(replies.first.id, threadId(21)); + expect(replies.last.id, threadId(25)); + }); + + test('greaterThan only excludes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThan: threadId(5)), + ); + + // Strictly after id5 → id6..id29, capped to 5 → id6..id10. + expect(replies.length, 5); + expect(replies.first.id, threadId(6)); + expect(replies.last.id, threadId(10)); + }); + + test('greaterThanOrEqual only includes the cursor', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(limit: 5, greaterThanOrEqual: threadId(5)), + ); + + // From id5 onwards → id5..id29, capped to 5 → id5..id9. + expect(replies.length, 5); + expect(replies.first.id, threadId(5)); + expect(replies.last.id, threadId(9)); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(replies.length, 30); + expect(replies.first.id, threadId(0)); + expect(replies.last.id, threadId(29)); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(replies.length, 30); + expect(replies.first.id, threadId(0)); + expect(replies.last.id, threadId(29)); + }); + + test('channel-message id as cursor is a no-op (not a thread reply)', + () async { + // `_prepareTestData` inserts channel messages with `parentId = null` and + // thread replies with `parentId` set. The thread lookup requires + // `parentId.equals(parentId)`, so passing a channel-message id must + // resolve to a no-op so the main query falls back to returning the full + // thread. + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}5', + ), + ); + + expect(replies.length, 30); + expect(replies.first.id, threadId(0)); + expect(replies.last.id, threadId(29)); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(), + ); + + expect(replies.length, 10); + expect(replies.first.id, threadId(20)); + expect(replies.last.id, threadId(29)); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(lessThan: threadId(25)), + ); + + expect(replies.length, 10); + expect(replies.first.id, threadId(15)); + expect(replies.last.id, threadId(24)); + }); + + test('default limit + greaterThan returns first 10 after the pivot', + () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(greaterThan: threadId(5)), + ); + + expect(replies.length, 10); + expect(replies.first.id, threadId(6)); + expect(replies.last.id, threadId(15)); + }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(lessThanOrEqual: threadId(25)), + ); + + expect(replies.length, 10); + expect(replies.first.id, threadId(16)); + expect(replies.last.id, threadId(25)); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData( + cid, + threads: true, + mapAllThreadToFirstMessage: true, + count: 30, + ); + + final replies = await messageDao.getThreadMessagesByParentId( + parentId, + options: PaginationParams(greaterThanOrEqual: threadId(5)), + ); + + expect(replies.length, 10); + expect(replies.first.id, threadId(5)); + expect(replies.last.id, threadId(14)); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three replies share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `reply_tieB` must split the trio + // cleanly: `reply_tieA` lands on the "before" side, `reply_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message parent() => Message( + id: parentId, + user: users.first, + createdAt: earlier, + updatedAt: earlier, + text: parentId, + ); + + Message reply(String id, DateTime t) => Message( + id: id, + user: users.first, + parentId: parentId, + createdAt: t, + updatedAt: t, + text: id, + ); + + await messageDao.updateMessages(cid, [ + parent(), + reply('reply_pre', earlier), + reply('reply_tieA', tie), + reply('reply_tieB', tie), + reply('reply_tieC', tie), + reply('reply_post', later), + ]); + + final before = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, lessThan: 'reply_tieB'), + ); + expect(before.map((m) => m.id).toList(), ['reply_pre', 'reply_tieA']); + + final after = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams(limit: 100, greaterThan: 'reply_tieB'), + ); + expect(after.map((m) => m.id).toList(), ['reply_tieC', 'reply_post']); + + final atOrBefore = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + lessThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['reply_pre', 'reply_tieA', 'reply_tieB'], + ); + + final atOrAfter = await messageDao.getThreadMessagesByParentId( + parentId, + options: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'reply_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['reply_tieB', 'reply_tieC', 'reply_post'], + ); + }); }); test('getMessagesByCid', () async {