From af849fdc98efedd6524b428b1df28708f42d724b Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Wed, 20 May 2026 21:05:52 +0200 Subject: [PATCH 1/3] ConversationStore.Migration: Use fewer connections while checking for pending deletes Previous code was using 1 session per user id and conv id. --- .../optimize-cleanup-pending-deletes | 1 + .../ConversationStore/Migration/Cleanup.hs | 51 ++++++++++--------- 2 files changed, 28 insertions(+), 24 deletions(-) create mode 100644 changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes diff --git a/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes b/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes new file mode 100644 index 00000000000..a4ca7c4067f --- /dev/null +++ b/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes @@ -0,0 +1 @@ +Reduce connection usage and number of SQL queries for checking for pending PostgreSQL migration cleanup. \ No newline at end of file diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs index e26533da7b5..d18e4e47d4b 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs @@ -22,6 +22,9 @@ module Wire.ConversationStore.Migration.Cleanup where import Cassandra import Data.Id import Data.Map qualified as Map +import Data.Vector (Vector) +import Hasql.Session qualified as Session +import Hasql.Statement (Statement) import Hasql.Statement qualified as Hasql import Hasql.TH import Imports @@ -113,28 +116,28 @@ deleteRemoteMemberStatusesFromCassandra uid = do delete :: PrepQuery W (Identity UserId) () delete = "delete from user_remote_conv where user = ?" -cleanupIfNecessary :: (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r () -cleanupIfNecessary = mapM_ (either cleanupConvIfNecessary cleanupUserIfNecessary) - -cleanupUserIfNecessary :: (PGConstraints r, Member (Input ClientState) r) => UserId -> Sem r () -cleanupUserIfNecessary uid = - whenM (isPendingDelete DeleteUser uid) $ do - deleteRemoteMemberStatusesFromCassandra uid - markDeletionComplete DeleteUser uid - -cleanupConvIfNecessary :: (PGConstraints r, Member ConversationStore r) => ConvId -> Sem r () -cleanupConvIfNecessary cid = - whenM (isPendingDelete DeleteConv cid) $ do - maybe (pure ()) deleteConv =<< getAllConvData cid - markDeletionComplete DeleteConv cid - -isPendingDelete :: (PGConstraints r) => DeletionType -> Id a -> Sem r Bool -isPendingDelete typ id_ = runStatement (typ, id_) select +cleanupIfNecessary :: forall r. (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r () +cleanupIfNecessary ids = do + (pendingConvIds, pendingUserIds) <- runSessionWithRetry $ do + let (convIds, userIds) = partitionEithers ids + pendingConvIds <- Session.statement (DeleteConv, convIds) filterPendingDeletes + pendingUserIds <- Session.statement (DeleteUser, userIds) filterPendingDeletes + pure (pendingConvIds, pendingUserIds) + cleanupConvs pendingConvIds + cleanupUsers pendingUserIds where - select = - lmapPG - [singletonStatement|SELECT EXISTS (SELECT 1 - FROM conversation_migration_pending_deletes - WHERE typ = $1 :: text AND id = $2 :: uuid - ) :: boolean - |] + filterPendingDeletes :: Statement (DeletionType, [Id a]) [Id a] + filterPendingDeletes = + dimapPG @(_, Vector _) @_ @(Vector _) @[_] + [vectorStatement|SELECT id :: uuid + FROM conversation_migration_pending_deletes + WHERE typ = $1 :: text AND id = ANY($2 :: uuid[]) + |] + cleanupConvs :: [ConvId] -> Sem r () + cleanupConvs = + mapM_ $ \cid -> + maybe (pure ()) deleteConv =<< getAllConvData cid + + cleanupUsers :: [UserId] -> Sem r () + cleanupUsers = + mapM_ deleteRemoteMemberStatusesFromCassandra From 7757655fe45828db8182055b4a5a478c624c6c98 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Wed, 20 May 2026 21:39:35 +0200 Subject: [PATCH 2/3] hlint --- .../src/Wire/ConversationStore/Migration/Cleanup.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs index d18e4e47d4b..80178348948 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs @@ -135,8 +135,9 @@ cleanupIfNecessary ids = do |] cleanupConvs :: [ConvId] -> Sem r () cleanupConvs = - mapM_ $ \cid -> - maybe (pure ()) deleteConv =<< getAllConvData cid + mapM_ $ \cid -> do + mConvData <- getAllConvData cid + forM_ mConvData deleteConv cleanupUsers :: [UserId] -> Sem r () cleanupUsers = From 09f6d48861d23645c85969b64093c6913984e72c Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Thu, 21 May 2026 14:31:56 +0200 Subject: [PATCH 3/3] Don't forget to cleanup the pending deletes table --- .../Wire/ConversationStore/Migration/Cleanup.hs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs index 80178348948..0855ee5016c 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs @@ -123,9 +123,21 @@ cleanupIfNecessary ids = do pendingConvIds <- Session.statement (DeleteConv, convIds) filterPendingDeletes pendingUserIds <- Session.statement (DeleteUser, userIds) filterPendingDeletes pure (pendingConvIds, pendingUserIds) - cleanupConvs pendingConvIds - cleanupUsers pendingUserIds + + unless (null pendingConvIds) $ do + cleanupConvs pendingConvIds + runStatement (DeleteConv, pendingConvIds) markDeletionsComplete + + unless (null pendingUserIds) $ do + cleanupUsers pendingUserIds + runStatement (DeleteUser, pendingUserIds) markDeletionsComplete where + markDeletionsComplete :: Statement (DeletionType, [Id a]) () + markDeletionsComplete = + lmapPG @(_, Vector _) + [resultlessStatement|DELETE FROM conversation_migration_pending_deletes + WHERE typ = $1 :: text AND id = ANY($2 :: uuid[])|] + filterPendingDeletes :: Statement (DeletionType, [Id a]) [Id a] filterPendingDeletes = dimapPG @(_, Vector _) @_ @(Vector _) @[_]