Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import os
import re
import traceback
from typing import Dict, List, Optional
from typing import Annotated, Dict, List, Optional
from uuid import UUID
from datetime import datetime

Expand Down Expand Up @@ -1118,22 +1118,27 @@ async def post_config(config: Dict) -> None:
tags=["dlq"],
)
async def post_dlq_reprocess(
ingress_list: str = Query(..., description="Name of ingress list to reprocess to")
ingress_list: str = Query(..., description="Name of ingress list to reprocess to"),
count: Annotated[
int,
Query(
ge=1,
le=100000,
description="Max items to move in this call. Callers drive the loop client-side for large DLQs to avoid HTTP timeouts.",
),
] = 1000,
) -> JSONResponse:
"""Move items from a dead letter queue back to the ingress list.
"""Move up to ``count`` items from a dead letter queue back to the ingress list.

Args:
ingress_list: Name of the ingress list to move items to

Returns:
JSONResponse containing count of items moved

Raises:
HTTPException: If there is an error reprocessing the DLQ
Bounded per call so each request returns quickly regardless of DLQ size.
Clients drain large DLQs by calling repeatedly until the response is 0.
"""
try:
counter = 0
while item := await queue.dequeue_dlq_async(redis_async, ingress_list):
for _ in range(count):
item = await queue.dequeue_dlq_async(redis_async, ingress_list)
if item is None:
break
await queue.enqueue_async(redis_async, ingress_list, item)
counter += 1
return JSONResponse(content=counter)
Expand Down
10 changes: 10 additions & 0 deletions common/tests/test_api_branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ async def test_get_vcon_count_get_config_and_dlq_endpoints_cover_success_and_err
with pytest.raises(api.HTTPException, match="Failed to reprocess DLQ"):
await api.post_dlq_reprocess("ingress-a")

# count param bounds how many items are moved per call: with count=3 and
# an effectively-unbounded DLQ, the endpoint stops at 3.
redis_async.lpop.side_effect = None
redis_async.lpop.return_value = "x"
redis_async.rpush.reset_mock()
with patch.object(api, "get_ingress_list_dlq_name", return_value="ingress-a:dlq"):
bounded = await api.post_dlq_reprocess("ingress-a", count=3)
assert json.loads(bounded.body) == 3
assert redis_async.rpush.await_count == 3

redis_async.lrange.return_value = ["x", "y"]
with patch.object(api, "get_ingress_list_dlq_name", return_value="ingress-a:dlq"):
get_dlq_response = await api.get_dlq_vcons("ingress-a")
Expand Down
Loading