diff --git a/api/api.py b/api/api.py index ff99061..b9a3dd7 100644 --- a/api/api.py +++ b/api/api.py @@ -23,7 +23,7 @@ import os import re import traceback -from typing import Dict, List, Optional +from typing import Annotated, Dict, List, Optional from uuid import UUID from datetime import datetime @@ -1118,22 +1118,27 @@ async def post_config(config: Dict) -> None: tags=["dlq"], ) async def post_dlq_reprocess( - ingress_list: str = Query(..., description="Name of ingress list to reprocess to") + ingress_list: str = Query(..., description="Name of ingress list to reprocess to"), + count: Annotated[ + int, + Query( + ge=1, + le=100000, + description="Max items to move in this call. Callers drive the loop client-side for large DLQs to avoid HTTP timeouts.", + ), + ] = 1000, ) -> JSONResponse: - """Move items from a dead letter queue back to the ingress list. + """Move up to ``count`` items from a dead letter queue back to the ingress list. - Args: - ingress_list: Name of the ingress list to move items to - - Returns: - JSONResponse containing count of items moved - - Raises: - HTTPException: If there is an error reprocessing the DLQ + Bounded per call so each request returns quickly regardless of DLQ size. + Clients drain large DLQs by calling repeatedly until the response is 0. """ try: counter = 0 - while item := await queue.dequeue_dlq_async(redis_async, ingress_list): + for _ in range(count): + item = await queue.dequeue_dlq_async(redis_async, ingress_list) + if item is None: + break await queue.enqueue_async(redis_async, ingress_list, item) counter += 1 return JSONResponse(content=counter) diff --git a/common/tests/test_api_branches.py b/common/tests/test_api_branches.py index 7e748b7..560434e 100644 --- a/common/tests/test_api_branches.py +++ b/common/tests/test_api_branches.py @@ -220,6 +220,16 @@ async def test_get_vcon_count_get_config_and_dlq_endpoints_cover_success_and_err with pytest.raises(api.HTTPException, match="Failed to reprocess DLQ"): await api.post_dlq_reprocess("ingress-a") + # count param bounds how many items are moved per call: with count=3 and + # an effectively-unbounded DLQ, the endpoint stops at 3. + redis_async.lpop.side_effect = None + redis_async.lpop.return_value = "x" + redis_async.rpush.reset_mock() + with patch.object(api, "get_ingress_list_dlq_name", return_value="ingress-a:dlq"): + bounded = await api.post_dlq_reprocess("ingress-a", count=3) + assert json.loads(bounded.body) == 3 + assert redis_async.rpush.await_count == 3 + redis_async.lrange.return_value = ["x", "y"] with patch.object(api, "get_ingress_list_dlq_name", return_value="ingress-a:dlq"): get_dlq_response = await api.get_dlq_vcons("ingress-a")