Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,16 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String dataset
if (!prefix.isEmpty()) {
uploadPath = uploadPath + File.separator + prefix.replace("/", File.separator);
}

// 防止路径穿越:规范化后校验 uploadPath 仍在 datasetBasePath 下
Path normalizedUploadPath = Paths.get(uploadPath).normalize().toAbsolutePath();
Path normalizedBase = Paths.get(datasetBasePath).normalize().toAbsolutePath();
if (!normalizedUploadPath.startsWith(normalizedBase)) {
log.warn("Path traversal attempt in preUpload: datasetId={}, prefix={}, uploadPath={}",
datasetId, prefix, normalizedUploadPath);
throw BusinessException.of(CommonErrorCode.PARAM_ERROR,
"Upload path outside allowed directory");
}
}

ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) {
return true; // 空值由 @NotBlank 等其他注解处理
}

// 检查是否包含路径遍历序列 ..
if (value.contains("..")) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate(
"文件路径不允许包含 '..'"
).addConstraintViolation();
return false;
}

boolean isValid = FILE_PATH_PATTERN.matcher(value).matches();

if (!isValid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) {
return false;
}

// 检查是否包含路径遍历序列 ..
if (value.contains("..")) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate(
"路径不允许包含 '..'"
).addConstraintViolation();
return false;
}

// 检查是否包含非法字符
if (!PATH_PATTERN.matcher(value).matches()) {
context.disableDefaultConstraintViolation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public static Optional<File> save(ChunkUploadRequest fileUploadRequest, ChunkUpl
}

File finalFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName());
// 防止路径穿越:规范化后校验仍在 uploadPath 下
Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath();
Path finalFilePath = finalFile.toPath().normalize().toAbsolutePath();
if (!finalFilePath.startsWith(uploadBasePath)) {
log.error("Path traversal attempt detected in chunk save: fileName={}, finalPath={}",
fileUploadRequest.getFileName(), finalFilePath);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
// 确保父目录存在(处理嵌套文件夹上传的情况)
File parentDir = finalFile.getParentFile();
if (parentDir != null && !parentDir.exists()) {
Expand Down Expand Up @@ -91,6 +99,14 @@ private static InputStream getFileInputStream(MultipartFile file) {
public static File saveFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preUploadReq) {
// 保存文件
File targetFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName());
// 防止路径穿越:规范化后校验仍在 uploadPath 下
Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath();
Path targetFilePath = targetFile.toPath().normalize().toAbsolutePath();
if (!targetFilePath.startsWith(uploadBasePath)) {
log.error("Path traversal attempt detected in saveFile: fileName={}, targetPath={}",
fileUploadRequest.getFileName(), targetFilePath);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
// 确保父目录存在(处理嵌套文件夹上传的情况)
File parentDir = targetFile.getParentFile();
if (parentDir != null && !parentDir.exists()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Optional
import re

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
Expand All @@ -17,6 +18,31 @@

logger = get_logger(__name__)

# Module-level pattern for task_id sanitization (CodeQL sanitizer)
_TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")


def _sanitize_task_id(task_id: str) -> str:
"""Validate and return sanitized task_id for safe path construction.

CodeQL recognizes the return value of this function as sanitized,
allowing safe use in path construction.
"""
if not _TASK_ID_PATTERN.fullmatch(task_id):
raise ValueError(f"Invalid task_id: {task_id}")
return task_id


def _sanitize_retry_count(retry_count: int) -> int:
"""Validate and return sanitized retry_count for safe path construction.

CodeQL recognizes the return value of this function as sanitized,
allowing safe use in path construction.
"""
if retry_count < 0 or retry_count > 1000:
raise ValueError(f"Invalid retry_count: {retry_count}")
return retry_count

router = APIRouter(prefix="/cleaning/tasks", tags=["Cleaning Tasks"])


Expand Down Expand Up @@ -284,10 +310,21 @@
import re
from pathlib import Path

FLOW_PATH = "/flow"
flow_base = Path("/flow").resolve()

# 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全)
try:
safe_task_id = _sanitize_task_id(task_id)
except ValueError:
logger.warning(f"Invalid task_id in stream_log: task_id={task_id}")

async def invalid_error_generator():
yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n"

return StreamingResponse(invalid_error_generator(), media_type="text/event-stream")

task_service = _get_task_service(db)
task = await task_service.task_repo.find_task_by_id(db, task_id)
task = await task_service.task_repo.find_task_by_id(db, safe_task_id)

if not task:

Expand All @@ -296,9 +333,19 @@

return StreamingResponse(error_generator(), media_type="text/event-stream")

log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
if retry_count > 0:
log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
# 校验并净化 retry_count(CodeQL 认可此返回值为安全)
safe_retry_count = _sanitize_retry_count(retry_count)
log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}"
log_path = (flow_base / safe_task_id / log_filename).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
Comment thread
MoeexT marked this conversation as resolved.
Dismissed

# 防止路径穿越:parents 校验仍在 flow_base 下
if flow_base not in log_path.parents:
logger.warning(f"Path traversal attempt in stream_log: task_id={task_id}")

async def traversal_error_generator():
yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n"

return StreamingResponse(traversal_error_generator(), media_type="text/event-stream")

standard_level_pattern = re.compile(
r"\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\b"
Expand Down Expand Up @@ -328,7 +375,7 @@

while True:
try:
current_task = await task_service.task_repo.find_task_by_id(db, task_id)
current_task = await task_service.task_repo.find_task_by_id(db, safe_task_id)
is_task_finished = current_task and current_task.status in [
"COMPLETED",
"FAILED",
Expand Down Expand Up @@ -414,19 +461,40 @@
from pathlib import Path
from fastapi.responses import FileResponse

FLOW_PATH = "/flow"
flow_base = Path("/flow").resolve()

# 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全)
try:
safe_task_id = _sanitize_task_id(task_id)
except ValueError:
logger.warning(f"Invalid task_id in download_log: task_id={task_id}")
from app.core.exception import BusinessError, ErrorCodes
raise BusinessError(
ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND,
f"Invalid task_id: {task_id}",
)

task_service = _get_task_service(db)
task = await task_service.task_repo.find_task_by_id(db, task_id)
task = await task_service.task_repo.find_task_by_id(db, safe_task_id)

if not task:
from app.core.exception import BusinessError, ErrorCodes

raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id)

log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
if retry_count > 0:
log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
# 校验并净化 retry_count(CodeQL 认可此返回值为安全)
safe_retry_count = _sanitize_retry_count(retry_count)
log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}"
log_path = (flow_base / safe_task_id / log_filename).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
Comment thread
MoeexT marked this conversation as resolved.
Dismissed

# 防止路径穿越:parents 校验仍在 flow_base 下
if flow_base not in log_path.parents:
logger.warning(f"Path traversal attempt in download_log: task_id={task_id}")
from app.core.exception import BusinessError, ErrorCodes
raise BusinessError(
ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND,
f"Invalid task_id: {task_id}",
)

if not log_path.exists():
from app.core.exception import BusinessError, ErrorCodes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
import re
import shutil
import uuid
Expand Down Expand Up @@ -32,6 +33,34 @@

logger = get_logger(__name__)

# Module-level sanitizers (CodeQL recognizes return values of module-level
# functions as sanitized; instance-method calls are not tracked through)
_TASK_ID_PATTERN = re.compile(
r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
)


def _sanitize_task_id(task_id: str) -> str:
"""Validate and return sanitized task_id for safe path construction."""
if not task_id:
raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED)
if not _TASK_ID_PATTERN.match(task_id):
raise BusinessError(
ErrorCodes.CLEANING_TASK_ID_REQUIRED,
f"Invalid task_id format: {task_id}",
)
return task_id


def _sanitize_retry_count(retry_count: int) -> int:
"""Validate and return sanitized retry_count for safe path construction."""
if retry_count < 0 or retry_count > 1000:
raise BusinessError(
ErrorCodes.CLEANING_TASK_ID_REQUIRED,
f"Invalid retry_count: {retry_count}",
)
return retry_count

DATASET_PATH = "/dataset"
FLOW_PATH = "/flow"

Expand Down Expand Up @@ -380,13 +409,21 @@
self, db: AsyncSession, task_id: str, retry_count: int
) -> List[CleaningTaskLog]:
"""Get task log"""
self.validator.check_task_id(task_id)

log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
if retry_count > 0:
log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
safe_task_id = _sanitize_task_id(task_id)
safe_retry_count = _sanitize_retry_count(retry_count)

flow_root = Path(FLOW_PATH).resolve()
log_path = flow_root / safe_task_id / "output.log"
if safe_retry_count > 0:
log_path = flow_root / safe_task_id / f"output.log.{safe_retry_count}"

# 防止路径穿越:规范化后校验仍在 FLOW_PATH 下
resolved_log_path = log_path.resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
MoeexT marked this conversation as resolved.
Dismissed
if flow_root not in resolved_log_path.parents:
logger.warning(f"Path traversal attempt detected: task_id={task_id}")
return []

if not log_path.exists():
if not resolved_log_path.exists():

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
Comment thread
MoeexT marked this conversation as resolved.
Dismissed
return []

logs = []
Expand All @@ -397,7 +434,7 @@
)
exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b")

with open(log_path, "r", encoding="utf-8") as f:
with open(resolved_log_path, "r", encoding="utf-8") as f:

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
Comment thread
MoeexT marked this conversation as resolved.
Dismissed
for line in f:
last_level = self._get_log_level(
line, last_level, standard_level_pattern, exception_suffix_pattern
Expand Down Expand Up @@ -429,9 +466,9 @@

async def delete_task(self, db: AsyncSession, task_id: str) -> None:
"""Delete task"""
self.validator.check_task_id(task_id)
safe_task_id = _sanitize_task_id(task_id)

task = await self.task_repo.find_task_by_id(db, task_id)
task = await self.task_repo.find_task_by_id(db, safe_task_id)
if not task:
raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id)

Expand All @@ -442,12 +479,21 @@
"Task is running, cannot be deleted. Please stop the task first."
)

await self.task_repo.delete_task_by_id(db, task_id)
await self.operator_instance_repo.delete_by_instance_id(db, task_id)
await self.result_repo.delete_by_instance_id(db, task_id)
await self.task_repo.delete_task_by_id(db, safe_task_id)
await self.operator_instance_repo.delete_by_instance_id(db, safe_task_id)
await self.result_repo.delete_by_instance_id(db, safe_task_id)

# 删除任务相关文件
task_path = Path(f"{FLOW_PATH}/{task_id}")
flow_root = Path(FLOW_PATH).resolve()
task_path = (flow_root / safe_task_id).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
Comment thread
MoeexT marked this conversation as resolved.
Dismissed
# 防止路径穿越:parents 校验目标路径仍在 flow_root 下
if flow_root not in task_path.parents:
logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}")
raise BusinessError(
ErrorCodes.CLEANING_TASK_NOT_FOUND,
f"Invalid task_id: {task_id}",
)

if task_path.exists():
try:
shutil.rmtree(task_path)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import re

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.exception import BusinessError, ErrorCodes
from app.module.cleaning.schema import OperatorInstanceDto
from app.module.operator.constants import CATEGORY_DATA_JUICER_ID, CATEGORY_DATAMATE_ID

# UUID pattern for task_id validation (prevents path traversal)
_TASK_ID_PATTERN = re.compile(
r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
)


class CleaningTaskValidator:
"""Validator for cleaning tasks and templates"""
Expand Down Expand Up @@ -95,6 +102,35 @@ def check_and_get_executor_type(instances: list[OperatorInstanceDto]) -> str:

@staticmethod
def check_task_id(task_id: str) -> None:
"""Validate task ID"""
"""Validate task ID — rejects non-UUID and path traversal patterns"""
CleaningTaskValidator.sanitize_task_id(task_id)

@staticmethod
def sanitize_task_id(task_id: str) -> str:
"""Validate and return sanitized task_id for safe path construction.

CodeQL recognizes the return value of this function as sanitized,
allowing safe use in path construction.
"""
if not task_id:
raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED)
if not _TASK_ID_PATTERN.match(task_id):
raise BusinessError(
ErrorCodes.CLEANING_TASK_ID_REQUIRED,
f"Invalid task_id format: {task_id}",
)
return task_id

@staticmethod
def sanitize_retry_count(retry_count: int) -> int:
"""Validate and return sanitized retry_count for safe path construction.

CodeQL recognizes the return value of this function as sanitized,
allowing safe use in path construction.
"""
if retry_count < 0 or retry_count > 1000:
raise BusinessError(
ErrorCodes.CLEANING_TASK_ID_REQUIRED,
f"Invalid retry_count: {retry_count}",
)
return retry_count
Loading
Loading