diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index 67a2baaf..44808969 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -43,11 +43,16 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - task.started_at = datetime.now() task.retry_count = retry_count - await self.task_repo.update_task(db, task) submitted = await self.runtime_client.submit_task(task_id, retry_count) if submitted: + await self.task_repo.update_task(db, task) self._polling_task_ids.add(task_id) + return submitted + + task.status = CleaningTaskStatus.FAILED + task.finished_at = datetime.now() + await self.task_repo.update_task(db, task) return submitted diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index dc55fb37..76f9fc77 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -3,6 +3,7 @@ import re import shutil import uuid +from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Set @@ -518,9 +519,21 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: await self.scan_dataset(db, task_id, task.src_dataset_id, succeed_set) await self.result_repo.delete_by_instance_id(db, task_id, "FAILED") - return await self.scheduler.execute_task( - db, task_id, (task.retry_count or 0) + 1 - ) + try: + return await self.scheduler.execute_task( + db, task_id, (task.retry_count or 0) + 1 + ) + except Exception as e: + logger.error( + f"execute_task failed, task_id={task_id}, retry_count={(task.retry_count or 0) + 1}: {e}" + ) + + task = CleaningTaskDto() + task.id = task_id + task.status = CleaningTaskStatus.FAILED + task.finished_at = datetime.now() + await self.task_repo.update_task(db, task) + return False async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop task"""