Skip to content

Commit 3a90cac

Browse files
committed
fix stuck processing_tasks and queued_requests leak
- worker_loop: persist started_at on the Task and status='processing' on task:{id} so that _store_final_task_state preserves them
- requeue_stuck_processing_tasks: fall back to queued_at when started_at is missing (covers workers that crashed in the SADD→SET window)
- _store_final_task_state: zrem the task from queued_requests on terminal state

v1.0.11
1 parent ceae7a1 commit 3a90cac

2 files changed

Lines changed: 23 additions & 25 deletions

File tree

modelq/app/base.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -278,24 +278,20 @@ def requeue_stuck_processing_tasks(self, threshold: float = 180.0):
278278
continue
279279

280280
task_dict = json.loads(task_data)
281-
started_at = task_dict.get("started_at", 0)
282-
if started_at:
283-
if now - started_at > threshold:
284-
logger.info(
285-
f"Re-queuing stuck task {task_id} which has been 'processing' for {now - started_at:.2f} seconds."
286-
)
287-
# Update status, queued_at, etc.
288-
task_dict["status"] = "queued"
289-
task_dict["queued_at"] = now
290-
291-
# Store the updated dict back in Redis
292-
self.redis_client.set(f"task:{task_id}", json.dumps(task_dict),ex=86400)
293-
294-
# Push it back into ml_tasks
295-
self.redis_client.rpush("ml_tasks", json.dumps(task_dict))
296-
297-
# Remove from processing set
298-
self.redis_client.srem("processing_tasks", task_id)
281+
# Fall back to queued_at when started_at is missing — covers workers
282+
# that crashed in the SADD→SET window before started_at was persisted.
283+
reference = task_dict.get("started_at") or task_dict.get("queued_at")
284+
if reference and (now - reference) > threshold:
285+
logger.info(
286+
f"Re-queuing stuck task {task_id} (age {now - reference:.2f}s)."
287+
)
288+
task_dict["status"] = "queued"
289+
task_dict["queued_at"] = now
290+
task_dict["started_at"] = None
291+
292+
self.redis_client.set(f"task:{task_id}", json.dumps(task_dict), ex=86400)
293+
self.redis_client.rpush("ml_tasks", json.dumps(task_dict))
294+
self.redis_client.srem("processing_tasks", task_id)
299295

300296
def prune_old_task_results(self, older_than_seconds: int = None):
301297
"""
@@ -673,20 +669,20 @@ def worker_loop(worker_id):
673669
task_dict = json.loads(task_json)
674670
task = Task.from_dict(task_dict)
675671

676-
# Mark task as 'processing'
677672
added = self.redis_client.sadd("processing_tasks", task.task_id)
678673
if added == 0:
679674
logger.warning(
680675
f"Task {task.task_id} is already being processed. Skipping duplicate."
681676
)
682677
continue
683-
task.status = "processing"
684678

685-
# Set started_at
686-
task_dict["started_at"] = time.time()
679+
now_ts = time.time()
680+
task.status = "processing"
681+
task.started_at = now_ts
682+
task_dict["status"] = "processing"
683+
task_dict["started_at"] = now_ts
687684

688-
# Update in Redis
689-
self.redis_client.set(f"task:{task.task_id}", json.dumps(task_dict),ex=86400)
685+
self.redis_client.set(f"task:{task.task_id}", json.dumps(task_dict), ex=86400)
690686

691687
if task.task_name in self.allowed_tasks:
692688
try:
@@ -968,6 +964,8 @@ def _store_final_task_state(self, task: Task, success: bool, error: Optional[Exc
968964
ex=86400
969965
)
970966

967+
self.redis_client.zrem("queued_requests", task.task_id)
968+
971969
# Update task history
972970
self._update_task_history(task.task_id, task_dict)
973971

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "modelq"
3-
version = "1.0.10"
3+
version = "1.0.11"
44
description = "Celery-like task queue for ML inference."
55
authors = ["Tanmaypatil123 <tanmay@modelslab.com>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)