diff --git a/src/fosslight_util/_get_downloadable_url.py b/src/fosslight_util/_get_downloadable_url.py index 78c599a..7c65521 100755 --- a/src/fosslight_util/_get_downloadable_url.py +++ b/src/fosslight_util/_get_downloadable_url.py @@ -48,6 +48,10 @@ def _version_tokens_for_match(checkout_version: str) -> list[str]: ("testing", "(testing)"), ("unstable", "(unstable)"), ) +_DEBIAN_PACKAGE_HEADING_VERSION_RE = re.compile( + r"^Package:\s+.+?\s+\(([^()]+)\)\s*$", + re.IGNORECASE, +) def _collect_debian_suite_package_urls(search_soup) -> list[tuple[str, str]]: @@ -94,14 +98,24 @@ def _normalize_debian_pool_download_from_tarball_hrefs(source_links: list[str]) return "" +def _extract_debian_package_heading_version(package_soup) -> str: + for heading in package_soup.find_all("h1"): + heading_text = heading.get_text(" ", strip=True) + matched = _DEBIAN_PACKAGE_HEADING_VERSION_RE.match(heading_text) + if matched: + return matched.group(1).strip() + return "" + + def _resolve_debian_package_page_to_pool_tarball( package_url: str, checkout_version: str -) -> str: - """Fetch one packages.debian.org package page and return a pool tarball URL or ``""``.""" +) -> tuple[str, str]: + """Fetch one package page and return ``(pool_tarball_url, matched_version)``.""" r = requests.get(package_url, timeout=10) if r.status_code != 200: - return "" + return "", "" package_soup = BeautifulSoup(r.text, "html.parser") + package_version = _extract_debian_package_heading_version(package_soup) source_links = [] for a in package_soup.find_all("a", href=True): @@ -114,10 +128,18 @@ def _resolve_debian_package_page_to_pool_tarball( source_links.append(href) if not source_links: - return "" + return "", "" version_tokens = _version_tokens_for_match(checkout_version) if version_tokens: + if package_version and any( + token.lower() in package_version.lower() for token in version_tokens + ): + return ( + _normalize_debian_pool_download_from_tarball_hrefs(source_links), + package_version, + ) + version_matched = [] for href in source_links: low = href.lower() @@ -125,15 +147,18 @@ def _resolve_debian_package_page_to_pool_tarball( version_matched.append(href) source_links = version_matched if not source_links: - return "" + return "", "" - return _normalize_debian_pool_download_from_tarball_hrefs(source_links) + resolved_link = _normalize_debian_pool_download_from_tarball_hrefs(source_links) + if not resolved_link: + return "", "" + return resolved_link, package_version def _resolve_debian_search_to_source_tarball( search_url: str, checkout_version: str = "" -) -> str: - """Resolve Debian search URL to a pool tarball URL when possible. +) -> tuple[str, str]: + """Resolve Debian search URL to ``(pool_tarball_url, matched_version)``. Walks package pages for **oldoldstable**, **oldstable**, **stable**, **testing**, and **unstable** when those hits appear on the search results, so a binary @@ -145,7 +170,7 @@ def _resolve_debian_search_to_source_tarball( try: r = requests.get(search_url, timeout=10) if r.status_code != 200: - return "" + return "", "" search_soup = BeautifulSoup(r.text, "html.parser") pairs = _collect_debian_suite_package_urls(search_soup) @@ -160,7 +185,7 @@ def _resolve_debian_search_to_source_tarball( else: fb = _fallback_any_package_page_url(search_soup) if not fb: - return "" + return "", "" visit = [fb] seen: set[str] = set() @@ -168,14 +193,14 @@ def _resolve_debian_search_to_source_tarball( if not package_url or package_url in seen: continue seen.add(package_url) - got = _resolve_debian_package_page_to_pool_tarball( + got, matched_version = _resolve_debian_package_page_to_pool_tarball( package_url, checkout_version ) if got: - return got + return got, matched_version except Exception as e: logger.info(f"Failed to resolve Debian search URL {search_url}: {e}") - return "" + return "", "" def version_exists(pkg_type, origin_name, version): @@ -499,9 +524,11 @@ def get_downloadable_url(link, checkout_version): result_link = link if link.startswith("https://packages.debian.org/search?"): - resolved = _resolve_debian_search_to_source_tarball(link, checkout_version) + resolved, debian_version = _resolve_debian_search_to_source_tarball( + link, checkout_version + ) if resolved: - return True, resolved, "", "", "deb" + return True, resolved, "", debian_version, "deb" oss_name, oss_version, new_link, pkg_type = extract_name_version_from_link(link, checkout_version) new_link = new_link.replace('http://', '') diff --git a/src/fosslight_util/download.py b/src/fosslight_util/download.py index a645a89..1e8d892 100755 --- a/src/fosslight_util/download.py +++ b/src/fosslight_util/download.py @@ -177,6 +177,7 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_ msg_wget = "" oss_name = "" oss_version = "" + downloaded_link = "" log_file_name = "fosslight_download_" + \ datetime.now().strftime('%Y%m%d_%H-%M-%S')+".txt" logger, log_item = init_log(os.path.join(log_dir, log_file_name)) @@ -205,18 +206,24 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_ link, target_dir, checkout_to, tag, branch, ssh_key, id, git_token, called_cli) link = change_ssh_link_to_https(link) + if success_git: + downloaded_link = link if (not is_rubygems) and (not success_git): if os.path.isfile(target_dir): shutil.rmtree(target_dir) - success, downloaded_file, msg_wget, oss_name, oss_version = download_wget( + success, downloaded_file, msg_wget, oss_name, oss_version, resolved_link = download_wget( link, target_dir, compressed_only, checkout_to ) if success and downloaded_file: success = extract_compressed_file(downloaded_file, target_dir, True, compressed_only) + if success: + downloaded_link = resolved_link # Download from rubygems.org elif is_rubygems and shutil.which("gem"): success = gem_download(link, target_dir, checkout_to) + if success: + downloaded_link = link if msg: msg = f'git fail: {msg}' if is_rubygems: @@ -234,9 +241,11 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_ msg = str(error) clarified_version = clarified_version_from_oss_version(oss_version) + output_link = downloaded_link if success else "" output_result = { "success": success, "message": msg, + "link": output_link, "oss_name": oss_name, "oss_version": oss_version, "clarified_version": clarified_version, @@ -357,32 +366,38 @@ def get_remote_refs(git_url: str): ) +def _strip_debian_epoch_prefix(s: str) -> str: + if re.match(r'^\d+:', s): + return s.split(':', 1)[1] + return s + + def clarified_version_from_oss_version(oss_version: str) -> str: """Extract major, major.minor, or major.minor.patch from oss_version/ref string.""" s = (oss_version or "").strip() if not s: return "" - core = _strip_leading_v_prefix(s) + core = _strip_leading_v_prefix(_strip_debian_epoch_prefix(s)) if _PURE_DOT_NUMERIC_VERSION.match(core): return core - m = _BASE_SEMVER_FOR_CHECKOUT.match(s) + m = _BASE_SEMVER_FOR_CHECKOUT.match(core) if m: if m.group(3): return f"{m.group(1)}.{m.group(2)}.{m.group(3)}" return f"{m.group(1)}.{m.group(2)}" - m = _CLARIFIED_MAJOR_ONLY_FULL.match(s) + m = _CLARIFIED_MAJOR_ONLY_FULL.match(core) if m: return m.group(1) - m = _SEMVER_IN_REF.search(s) or _SEMVER_AT_REF_START.match(s) + m = _SEMVER_IN_REF.search(core) or _SEMVER_AT_REF_START.match(core) if m: return f"{m.group(1)}.{m.group(2)}.{m.group(3)}" - m = _SEMVER_DOT_QUALIFIER_IN_STR.search(s) + m = _SEMVER_DOT_QUALIFIER_IN_STR.search(core) if m: return f"{m.group(1)}.{m.group(2)}.{m.group(3)}" - m = _CLARIFIED_TWO_IN_STR.search(s) + m = _CLARIFIED_TWO_IN_STR.search(core) if m: return f"{m.group(1)}.{m.group(2)}" - m = _CLARIFIED_MAJOR_IN_STR.search(s) + m = _CLARIFIED_MAJOR_IN_STR.search(core) if m: return m.group(1) return "" @@ -736,6 +751,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to): oss_name = "" oss_version = "" downloaded_file = "" + resolved_link = "" try: if platform.system() != "Windows": @@ -750,6 +766,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to): ret, new_link, oss_name, oss_version, pkg_type = get_downloadable_url(link, checkout_to) if ret and new_link: link = new_link + resolved_link = link if compressed_only: # Check if link ends with known compression extensions @@ -806,7 +823,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to): msg = str(error) logger.warning(f"wget - failed: {error}") - return success, downloaded_file, msg, oss_name, oss_version + return success, downloaded_file, msg, oss_name, oss_version, resolved_link def _download_file_once(url, target_dir, request_headers=None): diff --git a/tests/test_download_version_hint.py b/tests/test_download_version_hint.py index 4c51000..10c0809 100644 --- a/tests/test_download_version_hint.py +++ b/tests/test_download_version_hint.py @@ -2,12 +2,18 @@ # SPDX-License-Identifier: Apache-2.0 """Tests for wget-path oss_version / clarified_version hints from URL and filename.""" +import json +import logging + import pytest +from fosslight_util import download as download_module from fosslight_util.download import ( + cli_download_and_extract, clarified_version_from_oss_version, _oss_version_hint_from_wget_link, ) +from fosslight_util import _get_downloadable_url as downloadable_url @pytest.mark.parametrize( @@ -90,6 +96,8 @@ def test_oss_version_hint_from_wget_link(link, downloaded_file, expected_hint): ("1.1.7.7", "1.1.7.7"), ("v1.1.7.7", "1.1.7.7"), ("v3.28.3", "3.28.3"), + ("4:10.2.1-1", "10.2.1"), + ("1:3.118+deb11u1", "3.118"), ], ) def test_clarified_follows_hint_for_semver(hint, expected_clarified): @@ -108,3 +116,197 @@ def test_mvnrepository_url_hint_then_clarified(): hint = _oss_version_hint_from_wget_link(link, "") assert hint == "1.1.7.7" assert clarified_version_from_oss_version(hint) == "1.1.7.7" + + +class _FakeResponse: + def __init__(self, text, status_code=200): + self.text = text + self.status_code = status_code + + +def test_debian_package_heading_version_matches_checkout(monkeypatch): + package_html = """ + +
+