From fa71c868960951b9f0ded088cee9e01e23a2ae50 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:01:17 +0800 Subject: [PATCH 1/9] fix: prevent path traversal via task_id in cleaning task log/delete endpoints Add UUID format validation in check_task_id() and resolve+startsWith boundary checks on all log_path/task_path constructions to prevent arbitrary file read and arbitrary directory deletion. - cleaning_task_validator.py: add UUID regex to check_task_id() - cleaning_task_service.py: add path boundary checks in get_task_log() and delete_task() - cleaning_task_routes.py: add path boundary checks in stream and download endpoints --- .../interface/cleaning_task_routes.py | 21 +++++++++++++++++++ .../cleaning/service/cleaning_task_service.py | 17 ++++++++++++++- .../service/cleaning_task_validator.py | 14 ++++++++++++- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 8e4cef92..c6c66253 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -281,6 +281,7 @@ async def stream_cleaning_task_log( """Stream cleaning task log via SSE""" import asyncio import json + import os import re from pathlib import Path @@ -300,6 +301,16 @@ async def error_generator(): if retry_count > 0: log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + # 防止路径穿越:规范化后校验仍在 /flow 下 + log_path = log_path.resolve() + if not str(log_path).startswith(FLOW_PATH + "/"): + logger.warning(f"Path traversal attempt in stream_log: task_id={task_id}") + + async def traversal_error_generator(): + yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n" + + return StreamingResponse(traversal_error_generator(), media_type="text/event-stream") + standard_level_pattern = re.compile( r"\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\b" ) @@ -428,6 +439,16 @@ async def download_cleaning_task_log( if retry_count > 0: log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + # 防止路径穿越:规范化后校验仍在 /flow 下 + log_path = log_path.resolve() + if not str(log_path).startswith(FLOW_PATH + "/"): + logger.warning(f"Path traversal attempt in download_log: task_id={task_id}") + from app.core.exception import BusinessError, ErrorCodes + raise BusinessError( + ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) + if not log_path.exists(): from app.core.exception import BusinessError, ErrorCodes diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 93ddd3a3..ba6adf73 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -1,4 +1,5 @@ import json +import os import re import shutil import uuid @@ -386,6 +387,12 @@ async def get_task_log( if retry_count > 0: log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + # 防止路径穿越:规范化后校验仍在 /flow 下 + log_path = log_path.resolve() + if not str(log_path).startswith(FLOW_PATH + "/"): + logger.warning(f"Path traversal attempt detected: task_id={task_id}") + return [] + if not log_path.exists(): return [] @@ -447,7 +454,15 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: await self.result_repo.delete_by_instance_id(db, task_id) # 删除任务相关文件 - task_path = Path(f"{FLOW_PATH}/{task_id}") + task_path = Path(f"{FLOW_PATH}/{task_id}").resolve() + # 防止路径穿越:规范化后校验仍在 /flow 下 + if not str(task_path).startswith(FLOW_PATH + "/"): + logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}") + raise BusinessError( + ErrorCodes.CLEANING_TASK_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) + if task_path.exists(): try: shutil.rmtree(task_path) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py index 254652dc..de29def9 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py @@ -1,9 +1,16 @@ +import re + from sqlalchemy.ext.asyncio import AsyncSession from app.core.exception import BusinessError, ErrorCodes from app.module.cleaning.schema import OperatorInstanceDto from app.module.operator.constants import CATEGORY_DATA_JUICER_ID, CATEGORY_DATAMATE_ID +# UUID pattern for task_id validation (prevents path traversal) +_TASK_ID_PATTERN = re.compile( + r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$' +) + class CleaningTaskValidator: """Validator for cleaning tasks and templates""" @@ -95,6 +102,11 @@ def check_and_get_executor_type(instances: list[OperatorInstanceDto]) -> str: @staticmethod def check_task_id(task_id: str) -> None: - """Validate task ID""" + """Validate task ID — rejects non-UUID and path traversal patterns""" if not task_id: raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED) + if not _TASK_ID_PATTERN.match(task_id): + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid task_id format: {task_id}", + ) From d16bf0b186e88fb6d71ac7d5cdac55fbf2b828aa Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:13:20 +0800 Subject: [PATCH 2/9] fix: prevent path traversal via filename in chunk upload (arbitrary file write) Add normalize + startsWith boundary checks in ChunksSaver save/saveFile methods to prevent directory traversal via crafted fileName containing '../'. - ChunksSaver.java: validate final path stays within uploadPath in save() and saveFile() - chunks_saver.py: validate resolved path stays within upload_path in save() and save_file() --- .../common/domain/utils/ChunksSaver.java | 16 +++++++++++++++ .../app/module/shared/chunks_saver.py | 20 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java index 63db7028..d00362dd 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java @@ -49,6 +49,14 @@ public static Optional save(ChunkUploadRequest fileUploadRequest, ChunkUpl } File finalFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName()); + // 防止路径穿越:规范化后校验仍在 uploadPath 下 + Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath(); + Path finalFilePath = finalFile.toPath().normalize().toAbsolutePath(); + if (!finalFilePath.startsWith(uploadBasePath)) { + log.error("Path traversal attempt detected in chunk save: fileName={}, finalPath={}", + fileUploadRequest.getFileName(), finalFilePath); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } // 确保父目录存在(处理嵌套文件夹上传的情况) File parentDir = finalFile.getParentFile(); if (parentDir != null && !parentDir.exists()) { @@ -91,6 +99,14 @@ private static InputStream getFileInputStream(MultipartFile file) { public static File saveFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preUploadReq) { // 保存文件 File targetFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName()); + // 防止路径穿越:规范化后校验仍在 uploadPath 下 + Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath(); + Path targetFilePath = targetFile.toPath().normalize().toAbsolutePath(); + if (!targetFilePath.startsWith(uploadBasePath)) { + log.error("Path traversal attempt detected in saveFile: fileName={}, targetPath={}", + fileUploadRequest.getFileName(), targetFilePath); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } // 确保父目录存在(处理嵌套文件夹上传的情况) File parentDir = targetFile.getParentFile(); if (parentDir != null && !parentDir.exists()) { diff --git a/runtime/datamate-python/app/module/shared/chunks_saver.py b/runtime/datamate-python/app/module/shared/chunks_saver.py index 554b263b..e8bff577 100644 --- a/runtime/datamate-python/app/module/shared/chunks_saver.py +++ b/runtime/datamate-python/app/module/shared/chunks_saver.py @@ -57,6 +57,16 @@ def save( final_file = Path(upload_path) / file_upload_request.file_name + # 防止路径穿越:规范化后校验仍在 upload_path 下 + resolved_final = final_file.resolve() + resolved_base = Path(upload_path).resolve() + if not str(resolved_final).startswith(str(resolved_base) + os.sep): + logger.error( + f"Path traversal attempt in chunk save: file_name={file_upload_request.file_name}, " + f"resolved_path={resolved_final}" + ) + raise ValueError("Path traversal detected in file name") + try: temp_file.rename(final_file) except OSError as e: @@ -89,6 +99,16 @@ def save_file( """ target_file = Path(upload_path) / file_upload_request.file_name + # 防止路径穿越:规范化后校验仍在 upload_path 下 + resolved_target = target_file.resolve() + resolved_base = Path(upload_path).resolve() + if not str(resolved_target).startswith(str(resolved_base) + os.sep): + logger.error( + f"Path traversal attempt in save_file: file_name={file_upload_request.file_name}, " + f"resolved_path={resolved_target}" + ) + raise ValueError("Path traversal detected in file name") + logger.info(f"file path {target_file}, file size {len(file_content)}") try: From 4105654caedfa7695f82c3b6e251e92bd7f2c1da Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:20:15 +0800 Subject: [PATCH 3/9] fix: prevent path traversal in GlusterFS reader/writer via subPath and destPath injection Add multi-layer path boundary checks to prevent cross-directory reads and writes: - GlusterfsWriter: validate destPath stays within /dataset, reject .. in subPath, validate sourcePath stays within mount point, reject path separators in fileName - GlusterfsReader: validate readPath stays within mount point --- .../glusterfsreader/GlusterfsReader.java | 11 ++++- .../glusterfswriter/GlusterfsWriter.java | 46 +++++++++++++++++-- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java index 827e60b9..d5a8fe5e 100644 --- a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java +++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java @@ -98,7 +98,16 @@ public void startRead(RecordSender recordSender) { readPath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); } - try (Stream stream = Files.list(Paths.get(readPath))) { + // 防止路径穿越:规范化后校验读取路径仍在 mount 点下 + Path normalizedReadPath = Paths.get(readPath).normalize().toAbsolutePath(); + Path normalizedMount = Paths.get(this.mountPoint).normalize().toAbsolutePath(); + if (!normalizedReadPath.startsWith(normalizedMount)) { + LOG.error("Path traversal detected in reader: readPath={} outside mountPoint={}", + normalizedReadPath, normalizedMount); + throw new RuntimeException("Read path outside mount point: " + readPath); + } + + try (Stream stream = Files.list(normalizedReadPath)) { List fileList = stream.filter(Files::isRegularFile) .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) .map(path -> path.getFileName().toString()) diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java index 61eba883..788a95a0 100644 --- a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java +++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java @@ -13,6 +13,8 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -26,6 +28,9 @@ public class GlusterfsWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(GlusterfsWriter.class); + /** 允许的目标路径基础目录 */ + private static final String ALLOWED_DEST_BASE = "/dataset"; + public static class Job extends Writer.Job { private Configuration jobConfig; private String mountPoint; @@ -49,7 +54,13 @@ public void prepare() { GlusterfsMountUtil.mount(remote, mountPoint); String destPath = this.jobConfig.getString("destPath"); - new File(destPath).mkdirs(); + // 防止路径穿越:规范化后校验目标路径在允许目录内 + Path normalizedDest = Paths.get(destPath).normalize().toAbsolutePath(); + if (!normalizedDest.startsWith(ALLOWED_DEST_BASE)) { + throw new IllegalArgumentException( + "destPath is outside allowed directory: " + destPath); + } + new File(normalizedDest.toString()).mkdirs(); } @Override @@ -86,6 +97,12 @@ public void init() { this.mountPoint = this.jobConfig.getString("mountPoint"); this.subPath = this.jobConfig.getString("path", ""); this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + + // 防止路径穿越:subPath 不能包含 .. + if (StringUtils.isNotBlank(this.subPath) && this.subPath.contains("..")) { + throw new IllegalArgumentException( + "Invalid subPath (path traversal): " + this.subPath); + } } @Override @@ -95,6 +112,23 @@ public void startWrite(RecordReceiver lineReceiver) { sourcePath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); } + // 防止路径穿越:规范化后校验源路径仍在 mount 点下 + Path normalizedSourcePath = Paths.get(sourcePath).normalize().toAbsolutePath(); + Path normalizedMount = Paths.get(this.mountPoint).normalize().toAbsolutePath(); + if (!normalizedSourcePath.startsWith(normalizedMount)) { + LOG.error("Path traversal detected: sourcePath={} outside mountPoint={}", + normalizedSourcePath, normalizedMount); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, + "Source path outside mount point: " + sourcePath); + } + + // 防止路径穿越:校验目标路径在允许目录内 + Path normalizedDest = Paths.get(this.destPath).normalize().toAbsolutePath(); + if (!normalizedDest.startsWith(ALLOWED_DEST_BASE)) { + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, + "destPath outside allowed directory: " + this.destPath); + } + try { Record record; while ((record = lineReceiver.getFromReader()) != null) { @@ -106,9 +140,15 @@ public void startWrite(RecordReceiver lineReceiver) { continue; } - String filePath = sourcePath + "/" + fileName; + // 防止路径穿越:fileName 不能包含路径分隔符或 .. + if (fileName.contains("..") || fileName.contains("/") || fileName.contains("\\")) { + LOG.warn("Skipping file with suspicious name: {}", fileName); + continue; + } + + String filePath = normalizedSourcePath + "/" + fileName; ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath, - this.destPath + "/" + fileName)); + normalizedDest + "/" + fileName)); } } catch (Exception e) { LOG.error("Error writing files from GlusterFS: {}", e.getMessage(), e); From 0e51ab798855a9ab0b4ec8ef4ea10ca6bb611c50 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:24:21 +0800 Subject: [PATCH 4/9] fix: prevent path traversal via prefix/fileName in upload preUpload Add .. rejection in ValidPathValidator and ValidFilePathValidator, plus normalize+startsWith boundary check on uploadPath in preUpload to prevent cross-directory writes via crafted prefix parameters. - ValidPathValidator: reject .. sequences - ValidFilePathValidator: reject .. sequences - DatasetFileApplicationService.preUpload(): validate uploadPath stays within datasetBasePath --- .../application/DatasetFileApplicationService.java | 10 ++++++++++ .../interfaces/validation/ValidFilePathValidator.java | 9 +++++++++ .../interfaces/validation/ValidPathValidator.java | 9 +++++++++ 3 files changed, 28 insertions(+) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index 43b81720..bbfab302 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -493,6 +493,16 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String dataset if (!prefix.isEmpty()) { uploadPath = uploadPath + File.separator + prefix.replace("/", File.separator); } + + // 防止路径穿越:规范化后校验 uploadPath 仍在 datasetBasePath 下 + Path normalizedUploadPath = Paths.get(uploadPath).normalize().toAbsolutePath(); + Path normalizedBase = Paths.get(datasetBasePath).normalize().toAbsolutePath(); + if (!normalizedUploadPath.startsWith(normalizedBase)) { + log.warn("Path traversal attempt in preUpload: datasetId={}, prefix={}, uploadPath={}", + datasetId, prefix, normalizedUploadPath); + throw BusinessException.of(CommonErrorCode.PARAM_ERROR, + "Upload path outside allowed directory"); + } } ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java index b45a29f3..5b813e3a 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java @@ -31,6 +31,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) { return true; // 空值由 @NotBlank 等其他注解处理 } + // 检查是否包含路径遍历序列 .. + if (value.contains("..")) { + context.disableDefaultConstraintViolation(); + context.buildConstraintViolationWithTemplate( + "文件路径不允许包含 '..'" + ).addConstraintViolation(); + return false; + } + boolean isValid = FILE_PATH_PATTERN.matcher(value).matches(); if (!isValid) { diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java index 69bb297b..1b714189 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java @@ -54,6 +54,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) { return false; } + // 检查是否包含路径遍历序列 .. + if (value.contains("..")) { + context.disableDefaultConstraintViolation(); + context.buildConstraintViolationWithTemplate( + "路径不允许包含 '..'" + ).addConstraintViolation(); + return false; + } + // 检查是否包含非法字符 if (!PATH_PATTERN.matcher(value).matches()) { context.disableDefaultConstraintViolation(); From d8c0005839607fb8398011f5f4d33f1c2c15e3d8 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:38:11 +0800 Subject: [PATCH 5/9] fix: resolve CodeQL "uncontrolled data in path" warnings Restructure path construction to validate task_id BEFORE building Path objects, use Path / operator instead of f-string concatenation, and use Path.relative_to() for boundary checks instead of str.startswith(). - Routes: add inline re.fullmatch validation before Path construction - Service: use flow_root / task_id / filename pattern - All 4 locations: replace startswith() with relative_to() --- .../interface/cleaning_task_routes.py | 47 +++++++++++++------ .../cleaning/service/cleaning_task_service.py | 26 ++++++---- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index c6c66253..0dba81c4 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -281,11 +281,19 @@ async def stream_cleaning_task_log( """Stream cleaning task log via SSE""" import asyncio import json - import os import re from pathlib import Path - FLOW_PATH = "/flow" + flow_base = Path("/flow").resolve() + + # 校验 task_id 格式,防止不受控数据直接用于路径构造 + if not re.fullmatch(r"[A-Za-z0-9_-]+", task_id): + logger.warning(f"Invalid task_id in stream_log: task_id={task_id}") + + async def invalid_error_generator(): + yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n" + + return StreamingResponse(invalid_error_generator(), media_type="text/event-stream") task_service = _get_task_service(db) task = await task_service.task_repo.find_task_by_id(db, task_id) @@ -297,13 +305,13 @@ async def error_generator(): return StreamingResponse(error_generator(), media_type="text/event-stream") - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") - if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" + log_path = (flow_base / task_id / log_filename).resolve() - # 防止路径穿越:规范化后校验仍在 /flow 下 - log_path = log_path.resolve() - if not str(log_path).startswith(FLOW_PATH + "/"): + # 防止路径穿越:relative_to 校验仍在 flow_base 下 + try: + log_path.relative_to(flow_base) + except ValueError: logger.warning(f"Path traversal attempt in stream_log: task_id={task_id}") async def traversal_error_generator(): @@ -425,7 +433,16 @@ async def download_cleaning_task_log( from pathlib import Path from fastapi.responses import FileResponse - FLOW_PATH = "/flow" + flow_base = Path("/flow").resolve() + + # 校验 task_id 格式,防止不受控数据直接用于路径构造 + if not re.fullmatch(r"[A-Za-z0-9_-]+", task_id): + logger.warning(f"Invalid task_id in download_log: task_id={task_id}") + from app.core.exception import BusinessError, ErrorCodes + raise BusinessError( + ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) task_service = _get_task_service(db) task = await task_service.task_repo.find_task_by_id(db, task_id) @@ -435,13 +452,13 @@ async def download_cleaning_task_log( raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") - if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" + log_path = (flow_base / task_id / log_filename).resolve() - # 防止路径穿越:规范化后校验仍在 /flow 下 - log_path = log_path.resolve() - if not str(log_path).startswith(FLOW_PATH + "/"): + # 防止路径穿越:relative_to 校验仍在 flow_base 下 + try: + log_path.relative_to(flow_base) + except ValueError: logger.warning(f"Path traversal attempt in download_log: task_id={task_id}") from app.core.exception import BusinessError, ErrorCodes raise BusinessError( diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index ba6adf73..ec6999a6 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -383,17 +383,20 @@ async def get_task_log( """Get task log""" self.validator.check_task_id(task_id) - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") + flow_root = Path(FLOW_PATH).resolve() + log_path = flow_root / task_id / "output.log" if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + log_path = flow_root / task_id / f"output.log.{retry_count}" - # 防止路径穿越:规范化后校验仍在 /flow 下 - log_path = log_path.resolve() - if not str(log_path).startswith(FLOW_PATH + "/"): + # 防止路径穿越:规范化后校验仍在 FLOW_PATH 下 + resolved_log_path = log_path.resolve() + try: + resolved_log_path.relative_to(flow_root) + except ValueError: logger.warning(f"Path traversal attempt detected: task_id={task_id}") return [] - if not log_path.exists(): + if not resolved_log_path.exists(): return [] logs = [] @@ -404,7 +407,7 @@ async def get_task_log( ) exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b") - with open(log_path, "r", encoding="utf-8") as f: + with open(resolved_log_path, "r", encoding="utf-8") as f: for line in f: last_level = self._get_log_level( line, last_level, standard_level_pattern, exception_suffix_pattern @@ -454,9 +457,12 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: await self.result_repo.delete_by_instance_id(db, task_id) # 删除任务相关文件 - task_path = Path(f"{FLOW_PATH}/{task_id}").resolve() - # 防止路径穿越:规范化后校验仍在 /flow 下 - if not str(task_path).startswith(FLOW_PATH + "/"): + flow_root = Path(FLOW_PATH).resolve() + task_path = (flow_root / task_id).resolve() + # 防止路径穿越:relative_to 校验目标路径仍在 flow_root 下 + try: + task_path.relative_to(flow_root) + except ValueError: logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}") raise BusinessError( ErrorCodes.CLEANING_TASK_NOT_FOUND, From 23e77991b28e9dd08ee09288891425c01b3564d8 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 10:56:10 +0800 Subject: [PATCH 6/9] fix: use CodeQL-sanitized path validation patterns Replace inline re.fullmatch with module-level _TASK_ID_PATTERN and replace relative_to() try/except with parents containment check to satisfy CodeQL taint tracking as proper sanitizers. - routes: module-level _TASK_ID_PATTERN + flow_base not in log_path.parents - service: flow_root not in parents for get_task_log and delete_task --- .../interface/cleaning_task_routes.py | 20 +++++++++---------- .../cleaning/service/cleaning_task_service.py | 10 +++------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 0dba81c4..06311048 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -1,4 +1,5 @@ from typing import Optional +import re from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse @@ -17,6 +18,9 @@ logger = get_logger(__name__) +# Module-level pattern for task_id sanitization (CodeQL sanitizer) +_TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") + router = APIRouter(prefix="/cleaning/tasks", tags=["Cleaning Tasks"]) @@ -287,7 +291,7 @@ async def stream_cleaning_task_log( flow_base = Path("/flow").resolve() # 校验 task_id 格式,防止不受控数据直接用于路径构造 - if not re.fullmatch(r"[A-Za-z0-9_-]+", task_id): + if not _TASK_ID_PATTERN.fullmatch(task_id): logger.warning(f"Invalid task_id in stream_log: task_id={task_id}") async def invalid_error_generator(): @@ -308,10 +312,8 @@ async def error_generator(): log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" log_path = (flow_base / task_id / log_filename).resolve() - # 防止路径穿越:relative_to 校验仍在 flow_base 下 - try: - log_path.relative_to(flow_base) - except ValueError: + # 防止路径穿越:parents 校验仍在 flow_base 下 + if flow_base not in log_path.parents: logger.warning(f"Path traversal attempt in stream_log: task_id={task_id}") async def traversal_error_generator(): @@ -436,7 +438,7 @@ async def download_cleaning_task_log( flow_base = Path("/flow").resolve() # 校验 task_id 格式,防止不受控数据直接用于路径构造 - if not re.fullmatch(r"[A-Za-z0-9_-]+", task_id): + if not _TASK_ID_PATTERN.fullmatch(task_id): logger.warning(f"Invalid task_id in download_log: task_id={task_id}") from app.core.exception import BusinessError, ErrorCodes raise BusinessError( @@ -455,10 +457,8 @@ async def download_cleaning_task_log( log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" log_path = (flow_base / task_id / log_filename).resolve() - # 防止路径穿越:relative_to 校验仍在 flow_base 下 - try: - log_path.relative_to(flow_base) - except ValueError: + # 防止路径穿越:parents 校验仍在 flow_base 下 + if flow_base not in log_path.parents: logger.warning(f"Path traversal attempt in download_log: task_id={task_id}") from app.core.exception import BusinessError, ErrorCodes raise BusinessError( diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index ec6999a6..857840ce 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -390,9 +390,7 @@ async def get_task_log( # 防止路径穿越:规范化后校验仍在 FLOW_PATH 下 resolved_log_path = log_path.resolve() - try: - resolved_log_path.relative_to(flow_root) - except ValueError: + if flow_root not in resolved_log_path.parents: logger.warning(f"Path traversal attempt detected: task_id={task_id}") return [] @@ -459,10 +457,8 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: # 删除任务相关文件 flow_root = Path(FLOW_PATH).resolve() task_path = (flow_root / task_id).resolve() - # 防止路径穿越:relative_to 校验目标路径仍在 flow_root 下 - try: - task_path.relative_to(flow_root) - except ValueError: + # 防止路径穿越:parents 校验目标路径仍在 flow_root 下 + if flow_root not in task_path.parents: logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}") raise BusinessError( ErrorCodes.CLEANING_TASK_NOT_FOUND, From fce688691c3d096c1cecf24f4b7fb7fe85fe9cd1 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 11:21:10 +0800 Subject: [PATCH 7/9] fix: use CodeQL-recognized sanitizer function for task_id path construction Replace inline regex validation with _sanitize_task_id() / sanitize_task_id() functions whose return values CodeQL tracks as sanitized, eliminating the remaining 6 "Uncontrolled data used in path expression" warnings. --- .../interface/cleaning_task_routes.py | 33 ++++++++++++++----- .../cleaning/service/cleaning_task_service.py | 18 +++++----- .../service/cleaning_task_validator.py | 10 ++++++ 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 06311048..31986cdd 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -21,6 +21,17 @@ # Module-level pattern for task_id sanitization (CodeQL sanitizer) _TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") + +def _sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if not _TASK_ID_PATTERN.fullmatch(task_id): + raise ValueError(f"Invalid task_id: {task_id}") + return task_id + router = APIRouter(prefix="/cleaning/tasks", tags=["Cleaning Tasks"]) @@ -290,8 +301,10 @@ async def stream_cleaning_task_log( flow_base = Path("/flow").resolve() - # 校验 task_id 格式,防止不受控数据直接用于路径构造 - if not _TASK_ID_PATTERN.fullmatch(task_id): + # 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全) + try: + safe_task_id = _sanitize_task_id(task_id) + except ValueError: logger.warning(f"Invalid task_id in stream_log: task_id={task_id}") async def invalid_error_generator(): @@ -300,7 +313,7 @@ async def invalid_error_generator(): return StreamingResponse(invalid_error_generator(), media_type="text/event-stream") task_service = _get_task_service(db) - task = await task_service.task_repo.find_task_by_id(db, task_id) + task = await task_service.task_repo.find_task_by_id(db, safe_task_id) if not task: @@ -310,7 +323,7 @@ async def error_generator(): return StreamingResponse(error_generator(), media_type="text/event-stream") log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" - log_path = (flow_base / task_id / log_filename).resolve() + log_path = (flow_base / safe_task_id / log_filename).resolve() # 防止路径穿越:parents 校验仍在 flow_base 下 if flow_base not in log_path.parents: @@ -349,7 +362,7 @@ async def log_generator(): while True: try: - current_task = await task_service.task_repo.find_task_by_id(db, task_id) + current_task = await task_service.task_repo.find_task_by_id(db, safe_task_id) is_task_finished = current_task and current_task.status in [ "COMPLETED", "FAILED", @@ -437,8 +450,10 @@ async def download_cleaning_task_log( flow_base = Path("/flow").resolve() - # 校验 task_id 格式,防止不受控数据直接用于路径构造 - if not _TASK_ID_PATTERN.fullmatch(task_id): + # 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全) + try: + safe_task_id = _sanitize_task_id(task_id) + except ValueError: logger.warning(f"Invalid task_id in download_log: task_id={task_id}") from app.core.exception import BusinessError, ErrorCodes raise BusinessError( @@ -447,7 +462,7 @@ async def download_cleaning_task_log( ) task_service = _get_task_service(db) - task = await task_service.task_repo.find_task_by_id(db, task_id) + task = await task_service.task_repo.find_task_by_id(db, safe_task_id) if not task: from app.core.exception import BusinessError, ErrorCodes @@ -455,7 +470,7 @@ async def download_cleaning_task_log( raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" - log_path = (flow_base / task_id / log_filename).resolve() + log_path = (flow_base / safe_task_id / log_filename).resolve() # 防止路径穿越:parents 校验仍在 flow_base 下 if flow_base not in log_path.parents: diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 857840ce..04c24295 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -381,12 +381,12 @@ async def get_task_log( self, db: AsyncSession, task_id: str, retry_count: int ) -> List[CleaningTaskLog]: """Get task log""" - self.validator.check_task_id(task_id) + safe_task_id = self.validator.sanitize_task_id(task_id) flow_root = Path(FLOW_PATH).resolve() - log_path = flow_root / task_id / "output.log" + log_path = flow_root / safe_task_id / "output.log" if retry_count > 0: - log_path = flow_root / task_id / f"output.log.{retry_count}" + log_path = flow_root / safe_task_id / f"output.log.{retry_count}" # 防止路径穿越:规范化后校验仍在 FLOW_PATH 下 resolved_log_path = log_path.resolve() @@ -437,9 +437,9 @@ def _get_log_level( async def delete_task(self, db: AsyncSession, task_id: str) -> None: """Delete task""" - self.validator.check_task_id(task_id) + safe_task_id = self.validator.sanitize_task_id(task_id) - task = await self.task_repo.find_task_by_id(db, task_id) + task = await self.task_repo.find_task_by_id(db, safe_task_id) if not task: raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) @@ -450,13 +450,13 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: "Task is running, cannot be deleted. Please stop the task first." ) - await self.task_repo.delete_task_by_id(db, task_id) - await self.operator_instance_repo.delete_by_instance_id(db, task_id) - await self.result_repo.delete_by_instance_id(db, task_id) + await self.task_repo.delete_task_by_id(db, safe_task_id) + await self.operator_instance_repo.delete_by_instance_id(db, safe_task_id) + await self.result_repo.delete_by_instance_id(db, safe_task_id) # 删除任务相关文件 flow_root = Path(FLOW_PATH).resolve() - task_path = (flow_root / task_id).resolve() + task_path = (flow_root / safe_task_id).resolve() # 防止路径穿越:parents 校验目标路径仍在 flow_root 下 if flow_root not in task_path.parents: logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}") diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py index de29def9..fb0e8dfc 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py @@ -103,6 +103,15 @@ def check_and_get_executor_type(instances: list[OperatorInstanceDto]) -> str: @staticmethod def check_task_id(task_id: str) -> None: """Validate task ID — rejects non-UUID and path traversal patterns""" + CleaningTaskValidator.sanitize_task_id(task_id) + + @staticmethod + def sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ if not task_id: raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED) if not _TASK_ID_PATTERN.match(task_id): @@ -110,3 +119,4 @@ def check_task_id(task_id: str) -> None: ErrorCodes.CLEANING_TASK_ID_REQUIRED, f"Invalid task_id format: {task_id}", ) + return task_id From 867097f09ecf85c290b8b0186f0444633bb2f815 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 11:25:47 +0800 Subject: [PATCH 8/9] fix: sanitize retry_count to resolve remaining CodeQL path warnings Add _sanitize_retry_count() / sanitize_retry_count() with range validation and use the sanitized return value in all 5 places where retry_count feeds into log_path construction. CodeQL tracks the sanitizer return value as safe. --- .../interface/cleaning_task_routes.py | 19 +++++++++++++++++-- .../cleaning/service/cleaning_task_service.py | 5 +++-- .../service/cleaning_task_validator.py | 14 ++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 31986cdd..81276d4e 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -32,6 +32,17 @@ def _sanitize_task_id(task_id: str) -> str: raise ValueError(f"Invalid task_id: {task_id}") return task_id + +def _sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if retry_count < 0 or retry_count > 1000: + raise ValueError(f"Invalid retry_count: {retry_count}") + return retry_count + router = APIRouter(prefix="/cleaning/tasks", tags=["Cleaning Tasks"]) @@ -322,7 +333,9 @@ async def error_generator(): return StreamingResponse(error_generator(), media_type="text/event-stream") - log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" + # 校验并净化 retry_count(CodeQL 认可此返回值为安全) + safe_retry_count = _sanitize_retry_count(retry_count) + log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}" log_path = (flow_base / safe_task_id / log_filename).resolve() # 防止路径穿越:parents 校验仍在 flow_base 下 @@ -469,7 +482,9 @@ async def download_cleaning_task_log( raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) - log_filename = "output.log" if retry_count == 0 else f"output.log.{retry_count}" + # 校验并净化 retry_count(CodeQL 认可此返回值为安全) + safe_retry_count = _sanitize_retry_count(retry_count) + log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}" log_path = (flow_base / safe_task_id / log_filename).resolve() # 防止路径穿越:parents 校验仍在 flow_base 下 diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 04c24295..9c40b337 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -382,11 +382,12 @@ async def get_task_log( ) -> List[CleaningTaskLog]: """Get task log""" safe_task_id = self.validator.sanitize_task_id(task_id) + safe_retry_count = self.validator.sanitize_retry_count(retry_count) flow_root = Path(FLOW_PATH).resolve() log_path = flow_root / safe_task_id / "output.log" - if retry_count > 0: - log_path = flow_root / safe_task_id / f"output.log.{retry_count}" + if safe_retry_count > 0: + log_path = flow_root / safe_task_id / f"output.log.{safe_retry_count}" # 防止路径穿越:规范化后校验仍在 FLOW_PATH 下 resolved_log_path = log_path.resolve() diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py index fb0e8dfc..6de6113f 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py @@ -120,3 +120,17 @@ def sanitize_task_id(task_id: str) -> str: f"Invalid task_id format: {task_id}", ) return task_id + + @staticmethod + def sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if retry_count < 0 or retry_count > 1000: + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid retry_count: {retry_count}", + ) + return retry_count From 92b66b0c4a1ed27e6346416b5e4ab1fbc4a5624b Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 17 Jun 2026 11:30:09 +0800 Subject: [PATCH 9/9] fix: use module-level sanitizer functions in cleaning_task_service CodeQL cannot trace data flow through instance-method calls (self.validator. sanitize_*). Replace with module-level _sanitize_task_id / _sanitize_retry_count so CodeQL recognizes the sanitized return values. --- .../cleaning/service/cleaning_task_service.py | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 9c40b337..dc55fb37 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -33,6 +33,34 @@ logger = get_logger(__name__) +# Module-level sanitizers (CodeQL recognizes return values of module-level +# functions as sanitized; instance-method calls are not tracked through) +_TASK_ID_PATTERN = re.compile( + r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$' +) + + +def _sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction.""" + if not task_id: + raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED) + if not _TASK_ID_PATTERN.match(task_id): + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid task_id format: {task_id}", + ) + return task_id + + +def _sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction.""" + if retry_count < 0 or retry_count > 1000: + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid retry_count: {retry_count}", + ) + return retry_count + DATASET_PATH = "/dataset" FLOW_PATH = "/flow" @@ -381,8 +409,8 @@ async def get_task_log( self, db: AsyncSession, task_id: str, retry_count: int ) -> List[CleaningTaskLog]: """Get task log""" - safe_task_id = self.validator.sanitize_task_id(task_id) - safe_retry_count = self.validator.sanitize_retry_count(retry_count) + safe_task_id = _sanitize_task_id(task_id) + safe_retry_count = _sanitize_retry_count(retry_count) flow_root = Path(FLOW_PATH).resolve() log_path = flow_root / safe_task_id / "output.log" @@ -438,7 +466,7 @@ def _get_log_level( async def delete_task(self, db: AsyncSession, task_id: str) -> None: """Delete task""" - safe_task_id = self.validator.sanitize_task_id(task_id) + safe_task_id = _sanitize_task_id(task_id) task = await self.task_repo.find_task_by_id(db, safe_task_id) if not task: