Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 116 additions & 16 deletions src/exploit_iq_commons/utils/dep_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,20 @@
ROOT_LEVEL_SENTINEL = 'root-top-level-agent-morpheus'

# Name of the virtual-environment directory created inside the repository
# (via ``uv venv``) to hold the project's resolved dependencies.
TRANSITIVE_ENV_NAME = 'transitive_env'
# File, written at the repository root, holding a freeze-format
# (``name==version``) snapshot of the packages installed in that environment.
INSTALLED_PACKAGES_FILE = 'installed_packages.txt'

# File names of the Python project manifests / lock files recognised when
# detecting the ecosystem and when choosing how to install dependencies.
PYPROJECT_TOML = 'pyproject.toml'
SETUP_PY = 'setup.py'
SETUP_CFG = 'setup.cfg'
PIPFILE = 'Pipfile'
UV_LOCK = 'uv.lock'
POETRY_LOCK = 'poetry.lock'
README_MD = 'README.md'

# Manifest file names in the priority order used when requirements.txt is absent:
# lock files first, then project manifests, then Pipfile.
# Each entry is only a file name; the install strategy for each format lives in
# _install_from_best_manifest.
# NOTE(review): _install_from_best_manifest appears to hard-code this same order
# rather than iterate over this list -- keep the two in sync, or confirm where
# this constant is consumed.
_PYTHON_MANIFEST_FALLBACK_ORDER = [UV_LOCK, POETRY_LOCK, PYPROJECT_TOML, SETUP_PY, SETUP_CFG, PIPFILE]

_WALK_EXCLUDE_DIRS = frozenset({
".venv",
"venv",
Expand Down Expand Up @@ -125,10 +134,9 @@ def detect_ecosystem(git_repo_path: Path) -> Ecosystem | None:
"""
if os.path.isfile(git_repo_path / GOLANG_MANIFEST):
return MANIFESTS_TO_ECOSYSTEMS[GOLANG_MANIFEST]
if (
os.path.isfile(git_repo_path / PYTHON_MANIFEST)
or os.path.isfile(git_repo_path / PYPROJECT_TOML)
or os.path.isfile(git_repo_path / SETUP_PY)
if any(
os.path.isfile(git_repo_path / m)
for m in (PYTHON_MANIFEST, PYPROJECT_TOML, SETUP_PY, SETUP_CFG, UV_LOCK, POETRY_LOCK, PIPFILE)
):
return MANIFESTS_TO_ECOSYSTEMS[PYTHON_MANIFEST]
if os.path.isfile(git_repo_path / JS_MANIFEST):
Expand Down Expand Up @@ -1154,17 +1162,43 @@ def build_tree(self, manifest_path: Path) -> defaultdict[Any, list]:
pass
parent_stack.append(package)

installed_dependencies = []
with open(manifest_path / PYTHON_MANIFEST, 'r') as manifest:
for line in manifest:
if line.strip() and not PythonLanguageFunctionsParser.is_comment_line(line):
installed_dependencies.append(re.split(r"[=>< ]", line.strip())[0])
installed_dependencies = self._get_direct_dependencies(manifest_path, dependencies or "")
for dependency, parents in tree.items():
if dependency in installed_dependencies:
parents.add(ROOT_PROJECT)
tree[dependency] = list(parents)
return tree

def _get_direct_dependencies(self, manifest_path: Path, deptree_output: str) -> list[str]:
    """Return the names of direct (top-level) dependencies for the project.

    Prefers ``requirements.txt`` for an exact list. When ``requirements.txt``
    is absent, falls back to the packages at indent-level 0 in the ``deptree``
    output, which are the packages installed directly into the venv (i.e. the
    effective direct dependencies regardless of the manifest format used).

    Names are lower-cased and have ``-`` replaced by ``_`` so that they can be
    compared with a plain ``in`` test.

    Args:
        manifest_path: Directory that may contain ``requirements.txt``.
        deptree_output: Raw text of the dependency-tree listing; only used
            when ``requirements.txt`` does not exist.

    Returns:
        The normalized direct-dependency names, in file/output order. Empty
        when nothing could be determined.
    """

    def _normalized_name(requirement: str) -> str:
        # A distribution name ends at the first version operator (== >= <= != ~=),
        # extras bracket, environment-marker separator, direct-reference '@',
        # inline comment or whitespace -- none of those can occur inside a name.
        head = re.split(r"[=><!~;@\[ #]", requirement.strip(), maxsplit=1)[0]
        return head.strip().lower().replace('-', '_')

    req_txt = manifest_path / PYTHON_MANIFEST
    if req_txt.exists():
        deps: list[str] = []
        with open(req_txt, 'r') as f:
            for line in f:
                requirement = line.strip()
                if not requirement or PythonLanguageFunctionsParser.is_comment_line(line):
                    continue
                # pip option lines (-r, -c, -e, --index-url, ...) do not name a
                # package; parsing them would yield bogus entries such as "_r".
                if requirement.startswith('-'):
                    continue
                name = _normalized_name(requirement)
                if name:
                    deps.append(name)
        return deps

    # No requirements.txt: infer from deptree top-level entries (no leading spaces = level 0).
    # splitlines() copes with both "\n" and "\r\n" regardless of the host platform.
    deps = []
    for line in deptree_output.splitlines():
        if line and not line.startswith(' '):
            name = _normalized_name(line)
            if name and name != 'deptree':
                deps.append(name)
    if deps:
        logger.debug("No requirements.txt; inferred direct deps from deptree top-level: %s", deps)
    return deps

def extract_version_from_specifier(self, specifier_str: str) -> str | None:
"""Extract the most likely runtime Python version from a PEP 440 specifier string.

Expand Down Expand Up @@ -1474,14 +1508,22 @@ def _try_file(path: Path, extractor) -> str | None:
def install_dependencies(self, manifest_path: Path):
"""Install Python dependencies for the given repository into a virtual environment.

Calls :meth:`determine_python_version` to select the interpreter; when a
version is found ``uv venv`` is invoked with ``--python <version>``,
otherwise ``uv`` selects the default interpreter. Each line of
``requirements.txt`` is then installed via :meth:`install_dependency`.
Calls :meth:`determine_python_version` to select the interpreter. Creates
a ``transitive_env`` venv and installs packages from whichever manifest
format is present, trying formats in this priority order:

1. ``requirements.txt``: line-by-line install (original behaviour)
2. ``uv.lock`` or ``poetry.lock``: ``uv export | uv pip install -r -``
3. ``pyproject.toml``, ``setup.py``, or ``setup.cfg``: ``uv pip install .``
4. ``Pipfile``: ``pipenv requirements | uv pip install -r -``

After installation, writes ``installed_packages.txt`` containing a
freeze-format snapshot of every package in the venv so that Code
Keyword Search can answer "is package X installed?" without source
traversal.

Args:
manifest_path: Absolute path to the root of the cloned repository,
which is expected to contain a ``requirements.txt`` manifest.
manifest_path: Absolute path to the root of the cloned repository.
"""
python_version = self.determine_python_version(str(manifest_path))
if python_version:
Expand All @@ -1491,15 +1533,73 @@ def install_dependencies(self, manifest_path: Path):
logger.warning("Python version undetermined for %s; using uv default interpreter", manifest_path)
cmd = f"cd {manifest_path} && uv venv {TRANSITIVE_ENV_NAME}"
run_command(cmd)

venv_python = f"{manifest_path}/{TRANSITIVE_ENV_NAME}/bin/python"
site_packages = self._find_site_packages(manifest_path)
with open(manifest_path / PYTHON_MANIFEST, 'r') as manifest:

installed_via = self._install_from_best_manifest(manifest_path, venv_python, site_packages)
if installed_via:
logger.info("Installed Python dependencies via %s", installed_via)
else:
logger.warning("No supported Python manifest found in %s; transitive_env will be empty", manifest_path)

self._write_installed_packages(manifest_path)

def _install_from_best_manifest(self, manifest_path: Path, venv_python: str,
                                site_packages: Optional[Path]) -> Optional[str]:
    """Install dependencies from the first supported manifest found.

    Formats are tried in this order: ``requirements.txt``, lock files
    (``uv.lock``, ``poetry.lock``), project manifests (``pyproject.toml``,
    ``setup.py``, ``setup.cfg``) and finally ``Pipfile``.

    Args:
        manifest_path: Root directory of the repository being analysed.
        venv_python: Path of the interpreter inside the target virtual
            environment; passed to ``uv pip install --python``.
        site_packages: The venv's site-packages directory, forwarded to the
            ``requirements.txt`` installer; may be ``None``.

    Returns:
        The file name of the manifest that was used, or ``None`` when no
        manifest produced an installation.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    req_txt = manifest_path / PYTHON_MANIFEST
    if req_txt.exists():
        self._install_from_requirements_txt(req_txt, manifest_path, site_packages)
        return PYTHON_MANIFEST

    # The commands below are shell strings; quote the paths so that spaces or
    # shell metacharacters in the checkout location cannot break (or alter)
    # the command line.
    repo_dir = shlex.quote(str(manifest_path))
    interpreter = shlex.quote(venv_python)

    # Lock files: export to requirements format then pipe to uv pip install
    for lock_file in (UV_LOCK, POETRY_LOCK):
        if (manifest_path / lock_file).exists():
            res = run_command(
                f"cd {repo_dir} && uv export --format requirements-txt --no-dev 2>/dev/null"
                f" | uv pip install -r - --python {interpreter}"
            )
            # A None result is treated as failure; fall through to the next format.
            if res is not None:
                return lock_file

    # Project manifests: uv pip install . resolves and installs all declared deps
    for manifest_name in (PYPROJECT_TOML, SETUP_PY, SETUP_CFG):
        if (manifest_path / manifest_name).exists():
            # NOTE(review): unlike the lock-file and Pipfile branches, the result
            # of this command is not checked, so the manifest is reported as used
            # even if the install failed -- confirm whether that is intended.
            run_command(f"cd {repo_dir} && uv pip install . --python {interpreter}")
            return manifest_name

    # Pipfile: requires pipenv; skip silently if not available
    if (manifest_path / PIPFILE).exists():
        res = run_command(
            f"cd {repo_dir} && pipenv requirements 2>/dev/null"
            f" | uv pip install -r - --python {interpreter}"
        )
        if res is not None:
            return PIPFILE

    return None

def _install_from_requirements_txt(self, req_txt: Path, manifest_path: Path,
                                   site_packages: Optional[Path]) -> None:
    """Install dependencies line-by-line from requirements.txt (original behaviour).

    Blank lines and comment lines are skipped. Every other line is handed
    unchanged to :meth:`install_dependency`. When ``site_packages`` is given,
    the bare package name is then passed to ``_fallback_if_stub_only``.

    Args:
        req_txt: Path of the ``requirements.txt`` file to read.
        manifest_path: Repository root, forwarded to :meth:`install_dependency`.
        site_packages: The venv's site-packages directory, or ``None`` to
            skip the stub-only check.
    """
    with open(req_txt, 'r') as manifest:
        for line in tqdm(manifest):
            requirement = line.strip()
            if not requirement or PythonLanguageFunctionsParser.is_comment_line(line):
                continue
            self.install_dependency(line, manifest_path)
            if site_packages:
                # Cut at the first version operator (== >= <= != ~=), extras
                # bracket, environment marker or whitespace so that e.g.
                # "pkg!=1.0" or "pkg[extra]==1.0" yields "pkg" rather than
                # "pkg!" / "pkg[extra]".
                package_name = re.split(r'[=><!~;\[ \n]', requirement, maxsplit=1)[0]
                self._fallback_if_stub_only(package_name, site_packages)

def _write_installed_packages(self, manifest_path: Path) -> None:
    """Write a freeze-format snapshot of the venv to installed_packages.txt.

    The listing is produced with ``uv pip list`` pointed at the venv's
    interpreter rather than by running ``<venv>/bin/pip``: an environment
    created by ``uv venv`` without ``--seed`` does not contain pip, so the
    venv-local pip executable cannot be relied upon.

    Args:
        manifest_path: Repository root containing the ``transitive_env``
            virtual environment; the snapshot file is written here.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    # Quote the interpreter path: the command is a shell string and the
    # checkout location may contain spaces or metacharacters.
    venv_python = shlex.quote(f"{manifest_path}/{TRANSITIVE_ENV_NAME}/bin/python")
    pip_freeze = run_command(f"uv pip list --format=freeze --python {venv_python}")
    if pip_freeze:
        (manifest_path / INSTALLED_PACKAGES_FILE).write_text(pip_freeze)
        logger.info("Wrote installed packages snapshot to %s/%s", manifest_path, INSTALLED_PACKAGES_FILE)
    else:
        # Empty or None output: nothing to record (empty venv or command failure).
        logger.warning("Could not generate installed packages list for %s", manifest_path)

def install_dependency(self, dependency, repo_path):
dependency = dependency.strip()
valid_signs = ['==', '>=', '<=', '!=']
Expand Down
70 changes: 68 additions & 2 deletions src/exploit_iq_commons/utils/source_code_git_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,26 @@
_credential_id_ctx,
fetch_and_decrypt_credential,
)
from exploit_iq_commons.utils.dep_tree import INSTALLED_PACKAGES_FILE, TRANSITIVE_ENV_NAME
from exploit_iq_commons.utils.transitive_code_searcher_tool import (
TransitiveCodeSearcher,
)

# Maximum number of .py files a site-packages package may contain before it
# is excluded from automatic indexing. The limit is inclusive: a package with
# exactly this many files is still indexed.
_SITE_PKG_MAX_PY_FILES: int = 150

# Directory-name suffixes/names to skip when scanning site-packages.
# Suffixes identify packaging-metadata directories; they are matched against
# the end of each top-level directory name.
_SITE_PKG_SKIP_SUFFIXES: frozenset[str] = frozenset({".dist-info", ".egg-info"})
# Exact top-level directory names that are never indexed.
_SITE_PKG_SKIP_DIRS: frozenset[str] = frozenset({
    "__pycache__",
    "ansible_collections",  # excluded: exceeds file-count threshold by a large margin
    "tests",
    "test",
    "docs",
    "doc",
})

PathLike = typing.Union[str, os.PathLike]


Expand Down Expand Up @@ -426,8 +442,18 @@ def yield_blobs(self) -> typing.Iterator[Blob]:
for exc in self.exclude or {}:
exclude_files = exclude_files.union(set(str(x.relative_to(base_path)) for x in base_path.glob(exc)))

# Filter out files that are not in the repo
# include_files = include_files.intersection(all_files_in_repo)
# Always include installed_packages.txt when present so that Code
# Keyword Search can answer "is package X installed?" for transitive deps.
installed_pkg_file = base_path / INSTALLED_PACKAGES_FILE
if installed_pkg_file.is_file():
include_files.add(INSTALLED_PACKAGES_FILE)
logger.debug("Including %s in document index", INSTALLED_PACKAGES_FILE)

# Include Python source from site-packages so that CCA and Code Keyword
# Search can trace transitive call chains across package boundaries.
# Packages exceeding _SITE_PKG_MAX_PY_FILES .py files and known noisy
# directories are excluded to bound indexing cost.
self._add_site_packages_blobs(base_path, include_files)

# Take the include files and remove the exclude files.
final_files = include_files - exclude_files
Expand All @@ -449,3 +475,43 @@ def yield_blobs(self) -> typing.Iterator[Blob]:
logger.warning("Failed to read blob for '%s'. Ignoring this file. Error: %s", abs_file_path, e)
else:
logger.debug("Skipping path as it is a directory, not a file: '%s'", abs_file_path)

@staticmethod
def _add_site_packages_blobs(base_path: Path, include_files: set[str]) -> None:
    """Register site-packages Python sources from the transitive venv for indexing.

    Every top-level package directory under
    ``<base_path>/transitive_env/lib/*/site-packages`` is inspected. Metadata
    directories and the names listed in ``_SITE_PKG_SKIP_DIRS`` are ignored,
    and so is any package holding more than ``_SITE_PKG_MAX_PY_FILES`` ``.py``
    files. Files located under a ``__pycache__`` directory never count and
    are never added.

    Args:
        base_path: Repository root that contains the virtual environment.
        include_files: Set of paths relative to ``base_path``; updated in
            place with the selected source files.
    """
    indexed: list[str] = []
    oversized: list[str] = []
    metadata_suffixes = tuple(_SITE_PKG_SKIP_SUFFIXES)

    for site_dir in base_path.glob(f"{TRANSITIVE_ENV_NAME}/lib/*/site-packages"):
        if not site_dir.is_dir():
            continue
        for candidate in site_dir.iterdir():
            if not candidate.is_dir():
                continue
            pkg_name = candidate.name
            # Ignore packaging metadata and the known noisy directories.
            if pkg_name.endswith(metadata_suffixes) or pkg_name in _SITE_PKG_SKIP_DIRS:
                continue
            sources = [
                src for src in candidate.rglob("*.py")
                if "__pycache__" not in src.parts
            ]
            if len(sources) > _SITE_PKG_MAX_PY_FILES:
                oversized.append(f"{pkg_name}({len(sources)} files)")
                continue
            include_files.update(str(src.relative_to(base_path)) for src in sources)
            indexed.append(pkg_name)

    if indexed:
        logger.info("Indexed %d site-packages package(s) for transitive analysis: %s",
                    len(indexed), ", ".join(indexed))
    if oversized:
        logger.info("Skipped %d oversized site-packages package(s): %s",
                    len(oversized), ", ".join(oversized))
Loading