Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Upcoming changes

🚀 Performance

- Read only the thread replies matching the `PaginationParams` from DB when calling `MessageDao.getThreadMessagesByParentId` instead of reading all replies for the thread and applying pagination in memory.

🐞 Fixed

- `MessageDao.getThreadMessagesByParentId` now honours all `PaginationParams` cursor variants (`lessThan`/`lessThanOrEqual`/`greaterThan`/`greaterThanOrEqual`) and returns the page of replies closest to the cursor.

## 9.25.0

🚀 Performance
Expand Down
125 changes: 94 additions & 31 deletions packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,44 +202,89 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
String parentId, {
PaginationParams? options,
}) async {
final rows = await (select(messages).join([
final (
lessThanCursor,
lessThanOrEqualCursor,
greaterThanCursor,
greaterThanOrEqualCursor,
) = await (
_lookupThreadCursor(parentId, options?.lessThan),
_lookupThreadCursor(parentId, options?.lessThanOrEqual),
_lookupThreadCursor(parentId, options?.greaterThan),
_lookupThreadCursor(parentId, options?.greaterThanOrEqual),
).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N replies immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N replies
// closest to a `lessThan` cursor (or the thread's tail when no cursor is
// set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
lessThanCursor == null &&
lessThanOrEqualCursor == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(messages.createdAt),
OrderingTerm.asc(messages.id),
]
: [
OrderingTerm.desc(messages.createdAt),
OrderingTerm.desc(messages.id),
];

final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(messages.parentId.isNotNull())
..where(messages.parentId.equals(parentId))
..orderBy([OrderingTerm.asc(messages.createdAt)]))
.get();
final msgList = await _messagesFromJoinRows(rows);

if (msgList.isNotEmpty) {
final mutable = msgList.toList();
if (options?.lessThan != null) {
final lessThanIndex = mutable.indexWhere(
(m) => m.id == options!.lessThan,
);
if (lessThanIndex != -1) {
mutable.removeRange(lessThanIndex, mutable.length);
}
}
if (options?.greaterThan != null) {
final greaterThanIndex = mutable.indexWhere(
(m) => m.id == options!.greaterThan,
);
if (greaterThanIndex != -1) {
mutable.removeRange(0, greaterThanIndex);
}
}
final limit = options?.limit;
if (limit != null && limit > 0) {
return mutable.take(limit).toList();
}
return mutable;
..where(messages.parentId.equals(parentId))
..orderBy(orderBy);

// Cursor predicates compare the full `(createdAt, id)` tuple — the same
// key used in ORDER BY — so replies sharing a `createdAt` with the cursor
// fall on the correct side of the boundary. Filtering on `createdAt`
// alone would skip or repeat those siblings across pages.
if (lessThanCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerThanValue(c.id)),
);
}
if (lessThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerOrEqualValue(c.id)),
);
}
if (greaterThanCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerThanValue(c.id)),
);
}
return msgList;
if (greaterThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerOrEqualValue(c.id)),
);
}

if (options != null) {
query.limit(options.limit);
}

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();

return _messagesFromJoinRows(orderedRows);
}

/// Returns all the messages of a channel by matching
Expand Down Expand Up @@ -373,4 +418,22 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}

/// Returns the `(createdAt, id)` cursor for the thread reply with [id]
/// under [parentId] in the local cache, or `null` if [id] is null or no
/// such reply is cached.
Future<({DateTime createdAt, String id})?> _lookupThreadCursor(
String parentId,
String? id,
) async {
if (id == null) return null;
final createdAt = await (selectOnly(messages)
..addColumns([messages.createdAt])
..where(messages.id.equals(id))
..where(messages.parentId.equals(parentId)))
.map((row) => row.read(messages.createdAt))
.getSingleOrNull();
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}
}
Loading
Loading