diff --git a/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes b/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes new file mode 100644 index 00000000000..a4ca7c4067f --- /dev/null +++ b/changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes @@ -0,0 +1 @@ +Reduce connection usage and number of SQL queries for checking for pending PostgreSQL migration cleanup. \ No newline at end of file diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs index e26533da7b5..0855ee5016c 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Cleanup.hs @@ -22,6 +22,9 @@ module Wire.ConversationStore.Migration.Cleanup where import Cassandra import Data.Id import Data.Map qualified as Map +import Data.Vector (Vector) +import Hasql.Session qualified as Session +import Hasql.Statement (Statement) import Hasql.Statement qualified as Hasql import Hasql.TH import Imports @@ -113,28 +116,41 @@ deleteRemoteMemberStatusesFromCassandra uid = do delete :: PrepQuery W (Identity UserId) () delete = "delete from user_remote_conv where user = ?" -cleanupIfNecessary :: (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r () -cleanupIfNecessary = mapM_ (either cleanupConvIfNecessary cleanupUserIfNecessary) - -cleanupUserIfNecessary :: (PGConstraints r, Member (Input ClientState) r) => UserId -> Sem r () -cleanupUserIfNecessary uid = - whenM (isPendingDelete DeleteUser uid) $ do - deleteRemoteMemberStatusesFromCassandra uid - markDeletionComplete DeleteUser uid - -cleanupConvIfNecessary :: (PGConstraints r, Member ConversationStore r) => ConvId -> Sem r () -cleanupConvIfNecessary cid = - whenM (isPendingDelete DeleteConv cid) $ do - maybe (pure ()) deleteConv =<< getAllConvData cid - markDeletionComplete DeleteConv cid - -isPendingDelete :: (PGConstraints r) => DeletionType -> Id a -> Sem r Bool -isPendingDelete typ id_ = runStatement (typ, id_) select +cleanupIfNecessary :: forall r. (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r () +cleanupIfNecessary ids = do + (pendingConvIds, pendingUserIds) <- runSessionWithRetry $ do + let (convIds, userIds) = partitionEithers ids + pendingConvIds <- Session.statement (DeleteConv, convIds) filterPendingDeletes + pendingUserIds <- Session.statement (DeleteUser, userIds) filterPendingDeletes + pure (pendingConvIds, pendingUserIds) + + unless (null pendingConvIds) $ do + cleanupConvs pendingConvIds + runStatement (DeleteConv, pendingConvIds) markDeletionsComplete + + unless (null pendingUserIds) $ do + cleanupUsers pendingUserIds + runStatement (DeleteUser, pendingUserIds) markDeletionsComplete where - select = - lmapPG - [singletonStatement|SELECT EXISTS (SELECT 1 - FROM conversation_migration_pending_deletes - WHERE typ = $1 :: text AND id = $2 :: uuid - ) :: boolean - |] + markDeletionsComplete :: Statement (DeletionType, [Id a]) () + markDeletionsComplete = + lmapPG @(_, Vector _) + [resultlessStatement|DELETE FROM conversation_migration_pending_deletes + WHERE typ = $1 :: text AND id = ANY($2 :: uuid[])|] + + filterPendingDeletes :: Statement (DeletionType, [Id a]) [Id a] + filterPendingDeletes = + dimapPG @(_, Vector _) @_ @(Vector _) @[_] + [vectorStatement|SELECT id :: uuid + FROM conversation_migration_pending_deletes + WHERE typ = $1 :: text AND id = ANY($2 :: uuid[]) + |] + cleanupConvs :: [ConvId] -> Sem r () + cleanupConvs = + mapM_ $ \cid -> do + mConvData <- getAllConvData cid + forM_ mConvData deleteConv + + cleanupUsers :: [UserId] -> Sem r () + cleanupUsers = + mapM_ deleteRemoteMemberStatusesFromCassandra