diff --git a/runtime/python-executor/datamate/core/dataset.py b/runtime/python-executor/datamate/core/dataset.py
index 16d45cd6b..2226886f1 100644
--- a/runtime/python-executor/datamate/core/dataset.py
+++ b/runtime/python-executor/datamate/core/dataset.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import os
+import re
 import importlib
 import sys
 import uuid
@@ -129,6 +130,10 @@ def load_ops_module(self, op_name):
         :param op_name: 算子名称
         :return: 算子对象
         '''
+        # Validate op_name to prevent path traversal / code injection (CodeQL / FCE); fullmatch also rejects a trailing newline
+        if not isinstance(op_name, str) or not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', op_name):
+            raise ValueError(f"Invalid operator name: {op_name}")
+
         parent_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ops")
         if parent_dir not in sys.path:
             sys.path.insert(0, parent_dir)
@@ -137,6 +142,9 @@ def load_ops_module(self, op_name):
             from core.base_op import OPERATORS as RELATIVE_OPERATORS
             registry_content = RELATIVE_OPERATORS.modules.get(op_name)
         if isinstance(registry_content, str):
+            # Validate registry_content is a dotted module path: identifiers joined by single dots, no path separators
+            if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)*', registry_content):
+                raise ValueError(f"Invalid module path for operator {op_name}: {registry_content}")
             # registry_content是module的路径
             submodule = importlib.import_module(registry_content)
             res = getattr(submodule, op_name, None)
diff --git a/runtime/python-executor/datamate/ops/__init__.py b/runtime/python-executor/datamate/ops/__init__.py
index efa830d6a..1f884a9d6 100644
--- a/runtime/python-executor/datamate/ops/__init__.py
+++ b/runtime/python-executor/datamate/ops/__init__.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import importlib
 import os
+import re
 import sys
 from pathlib import Path
 
@@ -13,8 +14,10 @@
 # 遍历子目录
 for module_name in os.listdir(current_dir):
     module_path = os.path.join(current_dir, module_name)
-    # 检查是否是目录且包含 __init__.py
-    if os.path.isdir(module_path) and '__init__.py' in os.listdir(module_path):
+    # Only import directories that contain __init__.py and whose name is a plain Python identifier
+    if (os.path.isdir(module_path)
+            and '__init__.py' in os.listdir(module_path)
+            and re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', module_name)):
         # 动态导入模块
         try:
             importlib.import_module(f".{module_name}", package=__name__)
diff --git a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py
index 87701d1b7..dd0da71ff 100644
--- a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py
+++ b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py
@@ -11,13 +11,14 @@
 class CommandTask(Task):
     """命令任务包装类"""
 
-    def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True,
-                 timeout: Optional[int] = None, *args, **kwargs):
+    def __init__(self, task_id: str, command: Optional[str] = None, log_path = None, shell: bool = True,
+                 timeout: Optional[int] = None, *args, cmd_args: Optional[List[str]] = None, **kwargs):
         super().__init__(task_id, *args, **kwargs)
         self.max_backups = 9
         self.log_path = log_path
         self.command = command
-        self.shell = shell
+        self.cmd_args = cmd_args
+        self.shell = shell if cmd_args is None else False
         self.timeout = timeout
         self.return_code = None
         self._process = None
@@ -44,8 +45,15 @@ async def _execute(self):
                 current_log_path = f"{self.log_path}.{counter}"
 
             with open(current_log_path, 'a') as f:
-                # 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec
-                if self.shell:
+                if self.cmd_args is not None:
+                    # Safe: the argument list goes straight to create_subprocess_exec, so no shell parses it
+                    process = await asyncio.create_subprocess_exec(
+                        *self.cmd_args,
+                        stdout=f,
+                        stderr=asyncio.subprocess.STDOUT,
+                        **self.kwargs
+                    )
+                elif self.shell:
                     process = await asyncio.create_subprocess_shell(
                         self.command,
                         stdout=f,
@@ -132,7 +140,7 @@ def cancel(self) -> bool:
     def to_result(self) -> TaskResult:
         """转换为结果对象"""
         self.result = {
-            "command": self.command,
+            "command": self.command or self.cmd_args,
             "return_code": self.return_code,
         }
         return super().to_result()
@@ -144,13 +152,13 @@ class CommandScheduler(TaskScheduler):
     def __init__(self, max_concurrent: int = 5):
         super().__init__(max_concurrent)
 
-    async def submit(self, task_id, command: str, log_path = None, shell: bool = True,
-                     timeout: Optional[int] = None, **kwargs) -> str:
+    async def submit(self, task_id, command: Optional[str] = None, log_path = None, shell: bool = True,
+                     timeout: Optional[int] = None, cmd_args: Optional[List[str]] = None, **kwargs) -> str:
         if log_path is None:
             log_path = f"/flow/{task_id}/output.log"
 
         """提交命令任务"""
-        task = CommandTask(task_id, command, log_path, shell, timeout, **kwargs)
+        task = CommandTask(task_id, command, log_path, shell, timeout, cmd_args=cmd_args, **kwargs)
         self.tasks[task_id] = task
 
         # 使用信号量限制并发
diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py
index 06b3b40a3..1a87d0744 100644
--- a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py
+++ b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py
@@ -6,9 +6,13 @@
 
 async def submit(task_id, config_path):
     current_dir = os.path.dirname(__file__)
+    executor_script = os.path.join(current_dir, 'data_juicer_executor.py')
 
-    await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} "
-                                        f"--config_path={config_path}")
+    # Use argument list to avoid shell injection (CodeQL / FCE)
+    await cmd_scheduler.submit(
+        task_id,
+        cmd_args=["python", executor_script, f"--config_path={config_path}"]
+    )
 
 def cancel(task_id):
     return cmd_scheduler.cancel_task(task_id)
diff --git a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py
index ff5a3804b..c35c1fc51 100644
--- a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py
+++ b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py
@@ -8,10 +8,14 @@
 
 async def submit(task_id, config_path, retry_count: int = 0):
     current_dir = os.path.dirname(__file__)
+    executor_script = os.path.join(current_dir, 'datamate_executor.py')
 
     if not is_k8s():
-        await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} "
-                                            f"--config_path={config_path}")
+        # Use argument list to avoid shell injection (CodeQL / FCE)
+        await cmd_scheduler.submit(
+            task_id,
+            cmd_args=["python", executor_script, f"--config_path={config_path}"]
+        )
         return
 
     script_path = os.path.join(current_dir, "datamate_executor.py")