diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index 43b81720e..bbfab3025 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -493,6 +493,16 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String dataset if (!prefix.isEmpty()) { uploadPath = uploadPath + File.separator + prefix.replace("/", File.separator); } + + // 防止路径穿越:规范化后校验 uploadPath 仍在 datasetBasePath 下 + Path normalizedUploadPath = Paths.get(uploadPath).normalize().toAbsolutePath(); + Path normalizedBase = Paths.get(datasetBasePath).normalize().toAbsolutePath(); + if (!normalizedUploadPath.startsWith(normalizedBase)) { + log.warn("Path traversal attempt in preUpload: datasetId={}, prefix={}, uploadPath={}", + datasetId, prefix, normalizedUploadPath); + throw BusinessException.of(CommonErrorCode.PARAM_ERROR, + "Upload path outside allowed directory"); + } } ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java index b45a29f30..5b813e3a3 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidFilePathValidator.java @@ -31,6 +31,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) { return true; // 空值由 @NotBlank 等其他注解处理 } + // 检查是否包含路径遍历序列 .. + if (value.contains("..")) { + context.disableDefaultConstraintViolation(); + context.buildConstraintViolationWithTemplate( + "文件路径不允许包含 '..'" + ).addConstraintViolation(); + return false; + } + boolean isValid = FILE_PATH_PATTERN.matcher(value).matches(); if (!isValid) { diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java index 69bb297b9..1b7141896 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/validation/ValidPathValidator.java @@ -54,6 +54,15 @@ public boolean isValid(String value, ConstraintValidatorContext context) { return false; } + // 检查是否包含路径遍历序列 .. + if (value.contains("..")) { + context.disableDefaultConstraintViolation(); + context.buildConstraintViolationWithTemplate( + "路径不允许包含 '..'" + ).addConstraintViolation(); + return false; + } + // 检查是否包含非法字符 if (!PATH_PATTERN.matcher(value).matches()) { context.disableDefaultConstraintViolation(); diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java index 63db70281..d00362dd2 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ChunksSaver.java @@ -49,6 +49,14 @@ public static Optional save(ChunkUploadRequest fileUploadRequest, ChunkUpl } File finalFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName()); + // 防止路径穿越:规范化后校验仍在 uploadPath 下 + Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath(); + Path finalFilePath = finalFile.toPath().normalize().toAbsolutePath(); + if (!finalFilePath.startsWith(uploadBasePath)) { + log.error("Path traversal attempt detected in chunk save: fileName={}, finalPath={}", + fileUploadRequest.getFileName(), finalFilePath); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } // 确保父目录存在(处理嵌套文件夹上传的情况) File parentDir = finalFile.getParentFile(); if (parentDir != null && !parentDir.exists()) { @@ -91,6 +99,14 @@ private static InputStream getFileInputStream(MultipartFile file) { public static File saveFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preUploadReq) { // 保存文件 File targetFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName()); + // 防止路径穿越:规范化后校验仍在 uploadPath 下 + Path uploadBasePath = Paths.get(preUploadReq.getUploadPath()).normalize().toAbsolutePath(); + Path targetFilePath = targetFile.toPath().normalize().toAbsolutePath(); + if (!targetFilePath.startsWith(uploadBasePath)) { + log.error("Path traversal attempt detected in saveFile: fileName={}, targetPath={}", + fileUploadRequest.getFileName(), targetFilePath); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } // 确保父目录存在(处理嵌套文件夹上传的情况) File parentDir = targetFile.getParentFile(); if (parentDir != null && !parentDir.exists()) { diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 8e4cef92c..81276d4e1 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -1,4 +1,5 @@ from typing import Optional +import re from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse @@ -17,6 +18,31 @@ logger = get_logger(__name__) +# Module-level pattern for task_id sanitization (CodeQL sanitizer) +_TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") + + +def _sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if not _TASK_ID_PATTERN.fullmatch(task_id): + raise ValueError(f"Invalid task_id: {task_id}") + return task_id + + +def _sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if retry_count < 0 or retry_count > 1000: + raise ValueError(f"Invalid retry_count: {retry_count}") + return retry_count + router = APIRouter(prefix="/cleaning/tasks", tags=["Cleaning Tasks"]) @@ -284,10 +310,21 @@ async def stream_cleaning_task_log( import re from pathlib import Path - FLOW_PATH = "/flow" + flow_base = Path("/flow").resolve() + + # 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全) + try: + safe_task_id = _sanitize_task_id(task_id) + except ValueError: + logger.warning(f"Invalid task_id in stream_log: task_id={task_id}") + + async def invalid_error_generator(): + yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n" + + return StreamingResponse(invalid_error_generator(), media_type="text/event-stream") task_service = _get_task_service(db) - task = await task_service.task_repo.find_task_by_id(db, task_id) + task = await task_service.task_repo.find_task_by_id(db, safe_task_id) if not task: @@ -296,9 +333,19 @@ async def error_generator(): return StreamingResponse(error_generator(), media_type="text/event-stream") - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") - if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + # 校验并净化 retry_count(CodeQL 认可此返回值为安全) + safe_retry_count = _sanitize_retry_count(retry_count) + log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}" + log_path = (flow_base / safe_task_id / log_filename).resolve() + + # 防止路径穿越:parents 校验仍在 flow_base 下 + if flow_base not in log_path.parents: + logger.warning(f"Path traversal attempt in stream_log: task_id={task_id}") + + async def traversal_error_generator(): + yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Invalid task_id'}, ensure_ascii=False)}\n\n" + + return StreamingResponse(traversal_error_generator(), media_type="text/event-stream") standard_level_pattern = re.compile( r"\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\b" @@ -328,7 +375,7 @@ async def log_generator(): while True: try: - current_task = await task_service.task_repo.find_task_by_id(db, task_id) + current_task = await task_service.task_repo.find_task_by_id(db, safe_task_id) is_task_finished = current_task and current_task.status in [ "COMPLETED", "FAILED", @@ -414,19 +461,40 @@ async def download_cleaning_task_log( from pathlib import Path from fastapi.responses import FileResponse - FLOW_PATH = "/flow" + flow_base = Path("/flow").resolve() + + # 校验 task_id 并获取净化后的值(CodeQL 认可此返回值为安全) + try: + safe_task_id = _sanitize_task_id(task_id) + except ValueError: + logger.warning(f"Invalid task_id in download_log: task_id={task_id}") + from app.core.exception import BusinessError, ErrorCodes + raise BusinessError( + ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) task_service = _get_task_service(db) - task = await task_service.task_repo.find_task_by_id(db, task_id) + task = await task_service.task_repo.find_task_by_id(db, safe_task_id) if not task: from app.core.exception import BusinessError, ErrorCodes raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") - if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + # 校验并净化 retry_count(CodeQL 认可此返回值为安全) + safe_retry_count = _sanitize_retry_count(retry_count) + log_filename = "output.log" if safe_retry_count == 0 else f"output.log.{safe_retry_count}" + log_path = (flow_base / safe_task_id / log_filename).resolve() + + # 防止路径穿越:parents 校验仍在 flow_base 下 + if flow_base not in log_path.parents: + logger.warning(f"Path traversal attempt in download_log: task_id={task_id}") + from app.core.exception import BusinessError, ErrorCodes + raise BusinessError( + ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) if not log_path.exists(): from app.core.exception import BusinessError, ErrorCodes diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 93ddd3a3f..dc55fb37b 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -1,4 +1,5 @@ import json +import os import re import shutil import uuid @@ -32,6 +33,34 @@ logger = get_logger(__name__) +# Module-level sanitizers (CodeQL recognizes return values of module-level +# functions as sanitized; instance-method calls are not tracked through) +_TASK_ID_PATTERN = re.compile( + r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$' +) + + +def _sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction.""" + if not task_id: + raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED) + if not _TASK_ID_PATTERN.match(task_id): + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid task_id format: {task_id}", + ) + return task_id + + +def _sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction.""" + if retry_count < 0 or retry_count > 1000: + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid retry_count: {retry_count}", + ) + return retry_count + DATASET_PATH = "/dataset" FLOW_PATH = "/flow" @@ -380,13 +409,21 @@ async def get_task_log( self, db: AsyncSession, task_id: str, retry_count: int ) -> List[CleaningTaskLog]: """Get task log""" - self.validator.check_task_id(task_id) - - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") - if retry_count > 0: - log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + safe_task_id = _sanitize_task_id(task_id) + safe_retry_count = _sanitize_retry_count(retry_count) + + flow_root = Path(FLOW_PATH).resolve() + log_path = flow_root / safe_task_id / "output.log" + if safe_retry_count > 0: + log_path = flow_root / safe_task_id / f"output.log.{safe_retry_count}" + + # 防止路径穿越:规范化后校验仍在 FLOW_PATH 下 + resolved_log_path = log_path.resolve() + if flow_root not in resolved_log_path.parents: + logger.warning(f"Path traversal attempt detected: task_id={task_id}") + return [] - if not log_path.exists(): + if not resolved_log_path.exists(): return [] logs = [] @@ -397,7 +434,7 @@ async def get_task_log( ) exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b") - with open(log_path, "r", encoding="utf-8") as f: + with open(resolved_log_path, "r", encoding="utf-8") as f: for line in f: last_level = self._get_log_level( line, last_level, standard_level_pattern, exception_suffix_pattern @@ -429,9 +466,9 @@ def _get_log_level( async def delete_task(self, db: AsyncSession, task_id: str) -> None: """Delete task""" - self.validator.check_task_id(task_id) + safe_task_id = _sanitize_task_id(task_id) - task = await self.task_repo.find_task_by_id(db, task_id) + task = await self.task_repo.find_task_by_id(db, safe_task_id) if not task: raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) @@ -442,12 +479,21 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: "Task is running, cannot be deleted. Please stop the task first." ) - await self.task_repo.delete_task_by_id(db, task_id) - await self.operator_instance_repo.delete_by_instance_id(db, task_id) - await self.result_repo.delete_by_instance_id(db, task_id) + await self.task_repo.delete_task_by_id(db, safe_task_id) + await self.operator_instance_repo.delete_by_instance_id(db, safe_task_id) + await self.result_repo.delete_by_instance_id(db, safe_task_id) # 删除任务相关文件 - task_path = Path(f"{FLOW_PATH}/{task_id}") + flow_root = Path(FLOW_PATH).resolve() + task_path = (flow_root / safe_task_id).resolve() + # 防止路径穿越:parents 校验目标路径仍在 flow_root 下 + if flow_root not in task_path.parents: + logger.warning(f"Path traversal attempt in delete_task: task_id={task_id}") + raise BusinessError( + ErrorCodes.CLEANING_TASK_NOT_FOUND, + f"Invalid task_id: {task_id}", + ) + if task_path.exists(): try: shutil.rmtree(task_path) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py index 254652dc7..6de6113fc 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_validator.py @@ -1,9 +1,16 @@ +import re + from sqlalchemy.ext.asyncio import AsyncSession from app.core.exception import BusinessError, ErrorCodes from app.module.cleaning.schema import OperatorInstanceDto from app.module.operator.constants import CATEGORY_DATA_JUICER_ID, CATEGORY_DATAMATE_ID +# UUID pattern for task_id validation (prevents path traversal) +_TASK_ID_PATTERN = re.compile( + r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$' +) + class CleaningTaskValidator: """Validator for cleaning tasks and templates""" @@ -95,6 +102,35 @@ def check_and_get_executor_type(instances: list[OperatorInstanceDto]) -> str: @staticmethod def check_task_id(task_id: str) -> None: - """Validate task ID""" + """Validate task ID — rejects non-UUID and path traversal patterns""" + CleaningTaskValidator.sanitize_task_id(task_id) + + @staticmethod + def sanitize_task_id(task_id: str) -> str: + """Validate and return sanitized task_id for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ if not task_id: raise BusinessError(ErrorCodes.CLEANING_TASK_ID_REQUIRED) + if not _TASK_ID_PATTERN.match(task_id): + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid task_id format: {task_id}", + ) + return task_id + + @staticmethod + def sanitize_retry_count(retry_count: int) -> int: + """Validate and return sanitized retry_count for safe path construction. + + CodeQL recognizes the return value of this function as sanitized, + allowing safe use in path construction. + """ + if retry_count < 0 or retry_count > 1000: + raise BusinessError( + ErrorCodes.CLEANING_TASK_ID_REQUIRED, + f"Invalid retry_count: {retry_count}", + ) + return retry_count diff --git a/runtime/datamate-python/app/module/shared/chunks_saver.py b/runtime/datamate-python/app/module/shared/chunks_saver.py index 554b263b3..e8bff5777 100644 --- a/runtime/datamate-python/app/module/shared/chunks_saver.py +++ b/runtime/datamate-python/app/module/shared/chunks_saver.py @@ -57,6 +57,16 @@ def save( final_file = Path(upload_path) / file_upload_request.file_name + # 防止路径穿越:规范化后校验仍在 upload_path 下 + resolved_final = final_file.resolve() + resolved_base = Path(upload_path).resolve() + if not str(resolved_final).startswith(str(resolved_base) + os.sep): + logger.error( + f"Path traversal attempt in chunk save: file_name={file_upload_request.file_name}, " + f"resolved_path={resolved_final}" + ) + raise ValueError("Path traversal detected in file name") + try: temp_file.rename(final_file) except OSError as e: @@ -89,6 +99,16 @@ def save_file( """ target_file = Path(upload_path) / file_upload_request.file_name + # 防止路径穿越:规范化后校验仍在 upload_path 下 + resolved_target = target_file.resolve() + resolved_base = Path(upload_path).resolve() + if not str(resolved_target).startswith(str(resolved_base) + os.sep): + logger.error( + f"Path traversal attempt in save_file: file_name={file_upload_request.file_name}, " + f"resolved_path={resolved_target}" + ) + raise ValueError("Path traversal detected in file name") + logger.info(f"file path {target_file}, file size {len(file_content)}") try: diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java index 827e60b9a..d5a8fe5e6 100644 --- a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java +++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java @@ -98,7 +98,16 @@ public void startRead(RecordSender recordSender) { readPath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); } - try (Stream stream = Files.list(Paths.get(readPath))) { + // 防止路径穿越:规范化后校验读取路径仍在 mount 点下 + Path normalizedReadPath = Paths.get(readPath).normalize().toAbsolutePath(); + Path normalizedMount = Paths.get(this.mountPoint).normalize().toAbsolutePath(); + if (!normalizedReadPath.startsWith(normalizedMount)) { + LOG.error("Path traversal detected in reader: readPath={} outside mountPoint={}", + normalizedReadPath, normalizedMount); + throw new RuntimeException("Read path outside mount point: " + readPath); + } + + try (Stream stream = Files.list(normalizedReadPath)) { List fileList = stream.filter(Files::isRegularFile) .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) .map(path -> path.getFileName().toString()) diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java index 61eba8835..788a95a00 100644 --- a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java +++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java @@ -13,6 +13,8 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -26,6 +28,9 @@ public class GlusterfsWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(GlusterfsWriter.class); + /** 允许的目标路径基础目录 */ + private static final String ALLOWED_DEST_BASE = "/dataset"; + public static class Job extends Writer.Job { private Configuration jobConfig; private String mountPoint; @@ -49,7 +54,13 @@ public void prepare() { GlusterfsMountUtil.mount(remote, mountPoint); String destPath = this.jobConfig.getString("destPath"); - new File(destPath).mkdirs(); + // 防止路径穿越:规范化后校验目标路径在允许目录内 + Path normalizedDest = Paths.get(destPath).normalize().toAbsolutePath(); + if (!normalizedDest.startsWith(ALLOWED_DEST_BASE)) { + throw new IllegalArgumentException( + "destPath is outside allowed directory: " + destPath); + } + new File(normalizedDest.toString()).mkdirs(); } @Override @@ -86,6 +97,12 @@ public void init() { this.mountPoint = this.jobConfig.getString("mountPoint"); this.subPath = this.jobConfig.getString("path", ""); this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + + // 防止路径穿越:subPath 不能包含 .. + if (StringUtils.isNotBlank(this.subPath) && this.subPath.contains("..")) { + throw new IllegalArgumentException( + "Invalid subPath (path traversal): " + this.subPath); + } } @Override @@ -95,6 +112,23 @@ public void startWrite(RecordReceiver lineReceiver) { sourcePath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); } + // 防止路径穿越:规范化后校验源路径仍在 mount 点下 + Path normalizedSourcePath = Paths.get(sourcePath).normalize().toAbsolutePath(); + Path normalizedMount = Paths.get(this.mountPoint).normalize().toAbsolutePath(); + if (!normalizedSourcePath.startsWith(normalizedMount)) { + LOG.error("Path traversal detected: sourcePath={} outside mountPoint={}", + normalizedSourcePath, normalizedMount); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, + "Source path outside mount point: " + sourcePath); + } + + // 防止路径穿越:校验目标路径在允许目录内 + Path normalizedDest = Paths.get(this.destPath).normalize().toAbsolutePath(); + if (!normalizedDest.startsWith(ALLOWED_DEST_BASE)) { + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, + "destPath outside allowed directory: " + this.destPath); + } + try { Record record; while ((record = lineReceiver.getFromReader()) != null) { @@ -106,9 +140,15 @@ public void startWrite(RecordReceiver lineReceiver) { continue; } - String filePath = sourcePath + "/" + fileName; + // 防止路径穿越:fileName 不能包含路径分隔符或 .. + if (fileName.contains("..") || fileName.contains("/") || fileName.contains("\\")) { + LOG.warn("Skipping file with suspicious name: {}", fileName); + continue; + } + + String filePath = normalizedSourcePath + "/" + fileName; ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath, - this.destPath + "/" + fileName)); + normalizedDest + "/" + fileName)); } } catch (Exception e) { LOG.error("Error writing files from GlusterFS: {}", e.getMessage(), e);