Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions runtime/python-executor/datamate/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import os
import re
import importlib
import sys
import uuid
Expand Down Expand Up @@ -129,6 +130,10 @@ def load_ops_module(self, op_name):
:param op_name: 算子名称
:return: 算子对象
'''
# Validate op_name to prevent path traversal / code injection (CodeQL / FCE)
if not op_name or not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', op_name):
raise ValueError(f"Invalid operator name: {op_name}")

parent_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ops")
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
Expand All @@ -137,6 +142,9 @@ def load_ops_module(self, op_name):
from core.base_op import OPERATORS as RELATIVE_OPERATORS
registry_content = RELATIVE_OPERATORS.modules.get(op_name)
if isinstance(registry_content, str):
# Validate registry_content is a safe dotted module path (no path separators)
if not re.match(r'^[A-Za-z_][A-Za-z0-9_.]*$', registry_content):
raise ValueError(f"Invalid module path for operator {op_name}: {registry_content}")
# registry_content是module的路径
submodule = importlib.import_module(registry_content)
res = getattr(submodule, op_name, None)
Expand Down
7 changes: 5 additions & 2 deletions runtime/python-executor/datamate/ops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import importlib
import os
import re
import sys
from pathlib import Path

Expand All @@ -13,8 +14,10 @@
# 遍历子目录
for module_name in os.listdir(current_dir):
module_path = os.path.join(current_dir, module_name)
# 检查是否是目录且包含 __init__.py
if os.path.isdir(module_path) and '__init__.py' in os.listdir(module_path):
# 检查是否是目录且包含 __init__.py,且 module_name 为合法的 Python 标识符
if (os.path.isdir(module_path)
and '__init__.py' in os.listdir(module_path)
and re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', module_name)):
# 动态导入模块
try:
importlib.import_module(f".{module_name}", package=__name__)
Expand Down
26 changes: 17 additions & 9 deletions runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
class CommandTask(Task):
"""命令任务包装类"""

def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True,
timeout: Optional[int] = None, *args, **kwargs):
def __init__(self, task_id: str, command: str = None, log_path = None, shell: bool = True,
timeout: Optional[int] = None, cmd_args: List[str] = None, *args, **kwargs):
super().__init__(task_id, *args, **kwargs)
self.max_backups = 9
self.log_path = log_path
self.command = command
self.shell = shell
self.cmd_args = cmd_args
self.shell = shell if cmd_args is None else False
self.timeout = timeout
self.return_code = None
self._process = None
Expand All @@ -44,8 +45,15 @@ async def _execute(self):
current_log_path = f"{self.log_path}.{counter}"

with open(current_log_path, 'a') as f:
# 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec
if self.shell:
if self.cmd_args is not None:
# Safe: use argument list with create_subprocess_exec (no shell injection)
process = await asyncio.create_subprocess_exec(
*self.cmd_args,
stdout=f,
stderr=asyncio.subprocess.STDOUT,
**self.kwargs
)
elif self.shell:
process = await asyncio.create_subprocess_shell(
self.command,
stdout=f,
Expand Down Expand Up @@ -132,7 +140,7 @@ def cancel(self) -> bool:
def to_result(self) -> TaskResult:
"""转换为结果对象"""
self.result = {
"command": self.command,
"command": self.command or self.cmd_args,
"return_code": self.return_code,
}
return super().to_result()
Expand All @@ -144,13 +152,13 @@ class CommandScheduler(TaskScheduler):
def __init__(self, max_concurrent: int = 5):
super().__init__(max_concurrent)

async def submit(self, task_id, command: str, log_path = None, shell: bool = True,
timeout: Optional[int] = None, **kwargs) -> str:
async def submit(self, task_id, command: str = None, log_path = None, shell: bool = True,
timeout: Optional[int] = None, cmd_args: List[str] = None, **kwargs) -> str:
if log_path is None:
log_path = f"/flow/{task_id}/output.log"

"""提交命令任务"""
task = CommandTask(task_id, command, log_path, shell, timeout, **kwargs)
task = CommandTask(task_id, command, log_path, shell, timeout, cmd_args=cmd_args, **kwargs)
self.tasks[task_id] = task

# 使用信号量限制并发
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

async def submit(task_id, config_path):
current_dir = os.path.dirname(__file__)
executor_script = os.path.join(current_dir, 'data_juicer_executor.py')

await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} "
f"--config_path={config_path}")
# Use argument list to avoid shell injection (CodeQL / FCE)
await cmd_scheduler.submit(
task_id,
cmd_args=["python", executor_script, f"--config_path={config_path}"]
)

def cancel(task_id):
return cmd_scheduler.cancel_task(task_id)
8 changes: 6 additions & 2 deletions runtime/python-executor/datamate/wrappers/datamate_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

async def submit(task_id, config_path, retry_count: int = 0):
current_dir = os.path.dirname(__file__)
executor_script = os.path.join(current_dir, 'datamate_executor.py')

if not is_k8s():
await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} "
f"--config_path={config_path}")
# Use argument list to avoid shell injection (CodeQL / FCE)
await cmd_scheduler.submit(
task_id,
cmd_args=["python", executor_script, f"--config_path={config_path}"]
)
return

script_path = os.path.join(current_dir, "datamate_executor.py")
Expand Down
Loading