From 5062098437c4669532623f85332ae44d73c22e25 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 10:17:12 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/service/cleaning_task_scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index 67a2baaf..8dc9e8a8 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -43,12 +43,15 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - task.started_at = datetime.now() task.retry_count = retry_count - await self.task_repo.update_task(db, task) submitted = await self.runtime_client.submit_task(task_id, retry_count) if submitted: + await self.task_repo.update_task(db, task) self._polling_task_ids.add(task_id) + task.status = CleaningTaskStatus.FAILED + await self.task_repo.update_task(db, task) + return submitted async def stop_task(self, db: AsyncSession, task_id: str) -> bool: From e3d55fded196aff13a045cd79c683c4fc4a814df Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 10:56:58 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/cleaning_task_scheduler.py | 1 + .../cleaning/service/cleaning_task_service.py | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index 8dc9e8a8..49e8eeac 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -50,6 +50,7 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - self._polling_task_ids.add(task_id) task.status = CleaningTaskStatus.FAILED + task.finished_at = datetime.now() await self.task_repo.update_task(db, task) return submitted diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index dc55fb37..5f4ae51c 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -3,6 +3,7 @@ import re import shutil import uuid +from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Set @@ -518,9 +519,21 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: await self.scan_dataset(db, task_id, task.src_dataset_id, succeed_set) await self.result_repo.delete_by_instance_id(db, task_id, "FAILED") - return await self.scheduler.execute_task( - db, task_id, (task.retry_count or 0) + 1 - ) + try: + return await self.scheduler.execute_task( + db, task_id, (task.retry_count or 0) + 1 + ) + except Exception as e: + logger.exception( + "execute_task failed, task_id=%s, retry_count=%s", + task_id, + (task.retry_count or 0) + 1, + ) + + task = CleaningTaskDto() + task.status = CleaningTaskStatus.FAILED + task.finished_at = datetime.now() + await self.task_repo.update_task(db, task) async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop task""" From 23eadd54099b08624c8ea901a314d5381daa2b1c Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 11:01:58 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/service/cleaning_task_scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index 49e8eeac..44808969 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -48,6 +48,7 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - if submitted: await self.task_repo.update_task(db, task) self._polling_task_ids.add(task_id) + return submitted task.status = CleaningTaskStatus.FAILED task.finished_at = datetime.now() From 6566daab2140ffa9c5bc76014180c9b4752b85e0 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 11:04:46 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/service/cleaning_task_service.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 5f4ae51c..628d1ca0 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -524,10 +524,8 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: db, task_id, (task.retry_count or 0) + 1 ) except Exception as e: - logger.exception( - "execute_task failed, task_id=%s, retry_count=%s", - task_id, - (task.retry_count or 0) + 1, + logger.error( + "execute_task failed, task_id=%s, retry_count=%s", task_id, (task.retry_count or 0) + 1, e ) task = CleaningTaskDto() From 9f0452c4641d059df9b4659ca35c48904a4aae7a Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 11:11:20 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/service/cleaning_task_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 628d1ca0..bdd9bc5f 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -525,10 +525,11 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: ) except Exception as e: logger.error( - "execute_task failed, task_id=%s, retry_count=%s", task_id, (task.retry_count or 0) + 1, e + f"execute_task failed, task_id={task_id}, retry_count={(task.retry_count or 0) + 1}: {e}" ) task = CleaningTaskDto() + task.id = task_id task.status = CleaningTaskStatus.FAILED task.finished_at = datetime.now() await self.task_repo.update_task(db, task) From 7b30577d4ed8b60f47c48d285a8509429c314800 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 22 Jun 2026 11:17:17 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/service/cleaning_task_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index bdd9bc5f..76f9fc77 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -533,6 +533,7 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: task.status = CleaningTaskStatus.FAILED task.finished_at = datetime.now() await self.task_repo.update_task(db, task) + return False async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop task"""