Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions src/fosslight_util/_get_downloadable_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def _version_tokens_for_match(checkout_version: str) -> list[str]:
("testing", "(testing)"),
("unstable", "(unstable)"),
)
# Matches heading text shaped like "Package: <name> (<version>)" and captures
# the text inside the final pair of parentheses as group 1.  Matching is
# case-insensitive; trailing whitespace after the closing parenthesis is allowed.
_DEBIAN_PACKAGE_HEADING_VERSION_RE = re.compile(
    r"^Package:\s+.+?\s+\(([^()]+)\)\s*$",
    flags=re.IGNORECASE,
)


def _collect_debian_suite_package_urls(search_soup) -> list[tuple[str, str]]:
Expand Down Expand Up @@ -94,14 +98,24 @@ def _normalize_debian_pool_download_from_tarball_hrefs(source_links: list[str])
return ""


def _extract_debian_package_heading_version(package_soup) -> str:
    """Return the version found in a package page's ``<h1>`` heading, or ``""``.

    Every ``<h1>`` element of *package_soup* is inspected in document order.
    Its text is flattened with single-space separators and matched against
    ``_DEBIAN_PACKAGE_HEADING_VERSION_RE``; the first heading that matches
    yields its captured group, stripped of surrounding whitespace.

    Args:
        package_soup: Parsed page object exposing ``find_all`` (the caller
            passes a ``BeautifulSoup`` document).

    Returns:
        The captured version text, or an empty string when no heading matches.
    """
    for heading in package_soup.find_all("h1"):
        heading_text = heading.get_text(" ", strip=True)
        matched = _DEBIAN_PACKAGE_HEADING_VERSION_RE.match(heading_text)
        if matched:
            return matched.group(1).strip()
    return ""


def _resolve_debian_package_page_to_pool_tarball(
package_url: str, checkout_version: str
) -> str:
"""Fetch one packages.debian.org package page and return a pool tarball URL or ``""``."""
) -> tuple[str, str]:
"""Fetch one package page and return ``(pool_tarball_url, matched_version)``."""
r = requests.get(package_url, timeout=10)
if r.status_code != 200:
return ""
return "", ""
package_soup = BeautifulSoup(r.text, "html.parser")
package_version = _extract_debian_package_heading_version(package_soup)

source_links = []
for a in package_soup.find_all("a", href=True):
Expand All @@ -114,26 +128,37 @@ def _resolve_debian_package_page_to_pool_tarball(
source_links.append(href)

if not source_links:
return ""
return "", ""

version_tokens = _version_tokens_for_match(checkout_version)
if version_tokens:
if package_version and any(
token.lower() in package_version.lower() for token in version_tokens
):
return (
_normalize_debian_pool_download_from_tarball_hrefs(source_links),
package_version,
)

version_matched = []
for href in source_links:
low = href.lower()
if any(token.lower() in low for token in version_tokens):
version_matched.append(href)
source_links = version_matched
if not source_links:
return ""
return "", ""

return _normalize_debian_pool_download_from_tarball_hrefs(source_links)
resolved_link = _normalize_debian_pool_download_from_tarball_hrefs(source_links)
if not resolved_link:
return "", ""
return resolved_link, package_version


def _resolve_debian_search_to_source_tarball(
search_url: str, checkout_version: str = ""
) -> str:
"""Resolve Debian search URL to a pool tarball URL when possible.
) -> tuple[str, str]:
"""Resolve Debian search URL to ``(pool_tarball_url, matched_version)``.

Walks package pages for **oldoldstable**, **oldstable**, **stable**, **testing**,
and **unstable** when those hits appear on the search results, so a binary
Expand All @@ -145,7 +170,7 @@ def _resolve_debian_search_to_source_tarball(
try:
r = requests.get(search_url, timeout=10)
if r.status_code != 200:
return ""
return "", ""
search_soup = BeautifulSoup(r.text, "html.parser")

pairs = _collect_debian_suite_package_urls(search_soup)
Expand All @@ -160,22 +185,22 @@ def _resolve_debian_search_to_source_tarball(
else:
fb = _fallback_any_package_page_url(search_soup)
if not fb:
return ""
return "", ""
visit = [fb]

seen: set[str] = set()
for package_url in visit:
if not package_url or package_url in seen:
continue
seen.add(package_url)
got = _resolve_debian_package_page_to_pool_tarball(
got, matched_version = _resolve_debian_package_page_to_pool_tarball(
package_url, checkout_version
)
if got:
return got
return got, matched_version
except Exception as e:
logger.info(f"Failed to resolve Debian search URL {search_url}: {e}")
return ""
return "", ""


def version_exists(pkg_type, origin_name, version):
Expand Down Expand Up @@ -499,9 +524,11 @@ def get_downloadable_url(link, checkout_version):
result_link = link

if link.startswith("https://packages.debian.org/search?"):
resolved = _resolve_debian_search_to_source_tarball(link, checkout_version)
resolved, debian_version = _resolve_debian_search_to_source_tarball(
link, checkout_version
)
if resolved:
return True, resolved, "", "", "deb"
return True, resolved, "", debian_version, "deb"

oss_name, oss_version, new_link, pkg_type = extract_name_version_from_link(link, checkout_version)
new_link = new_link.replace('http://', '')
Expand Down
35 changes: 26 additions & 9 deletions src/fosslight_util/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_
msg_wget = ""
oss_name = ""
oss_version = ""
downloaded_link = ""
log_file_name = "fosslight_download_" + \
datetime.now().strftime('%Y%m%d_%H-%M-%S')+".txt"
logger, log_item = init_log(os.path.join(log_dir, log_file_name))
Expand Down Expand Up @@ -205,18 +206,24 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_
link, target_dir, checkout_to, tag, branch,
ssh_key, id, git_token, called_cli)
link = change_ssh_link_to_https(link)
if success_git:
downloaded_link = link
if (not is_rubygems) and (not success_git):
Comment thread
soimkim marked this conversation as resolved.
if os.path.isfile(target_dir):
shutil.rmtree(target_dir)

success, downloaded_file, msg_wget, oss_name, oss_version = download_wget(
success, downloaded_file, msg_wget, oss_name, oss_version, resolved_link = download_wget(
link, target_dir, compressed_only, checkout_to
)
if success and downloaded_file:
success = extract_compressed_file(downloaded_file, target_dir, True, compressed_only)
if success:
downloaded_link = resolved_link
# Download from rubygems.org
elif is_rubygems and shutil.which("gem"):
success = gem_download(link, target_dir, checkout_to)
if success:
downloaded_link = link
if msg:
msg = f'git fail: {msg}'
if is_rubygems:
Expand All @@ -234,9 +241,11 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_
msg = str(error)

clarified_version = clarified_version_from_oss_version(oss_version)
output_link = downloaded_link if success else ""
output_result = {
"success": success,
"message": msg,
"link": output_link,
"oss_name": oss_name,
"oss_version": oss_version,
"clarified_version": clarified_version,
Expand Down Expand Up @@ -357,32 +366,38 @@ def get_remote_refs(git_url: str):
)


def _strip_debian_epoch_prefix(s: str) -> str:
    """Drop a leading ``<digits>:`` epoch prefix from *s*, if present.

    Only a prefix made of one or more digits immediately followed by a colon
    at the very start of the string is removed; any later colons are kept.
    Strings without such a prefix are returned unchanged.
    """
    epoch = re.match(r'^\d+:', s)
    if epoch:
        # Everything after the matched "<digits>:" is the version proper.
        return s[epoch.end():]
    return s


def clarified_version_from_oss_version(oss_version: str) -> str:
    """Extract major, major.minor, or major.minor.patch from oss_version/ref string.

    The input is stripped of surrounding whitespace, then of a Debian epoch
    prefix (``<digits>:``), and finally passed through
    ``_strip_leading_v_prefix``.  Every pattern below is matched against that
    normalized ``core`` string; patterns are tried in order and the first hit
    decides the result.

    Returns:
        The extracted version string, or ``""`` when the input is empty/``None``
        or no pattern matches.
    """
    s = (oss_version or "").strip()
    if not s:
        return ""
    core = _strip_leading_v_prefix(_strip_debian_epoch_prefix(s))
    # A string that is already purely dot-separated numbers is returned as is.
    if _PURE_DOT_NUMERIC_VERSION.match(core):
        return core
    m = _BASE_SEMVER_FOR_CHECKOUT.match(core)
    if m:
        # Group 3 (patch) is optional for this pattern.
        if m.group(3):
            return f"{m.group(1)}.{m.group(2)}.{m.group(3)}"
        return f"{m.group(1)}.{m.group(2)}"
    m = _CLARIFIED_MAJOR_ONLY_FULL.match(core)
    if m:
        return m.group(1)
    m = _SEMVER_IN_REF.search(core) or _SEMVER_AT_REF_START.match(core)
    if m:
        return f"{m.group(1)}.{m.group(2)}.{m.group(3)}"
    m = _SEMVER_DOT_QUALIFIER_IN_STR.search(core)
    if m:
        return f"{m.group(1)}.{m.group(2)}.{m.group(3)}"
    m = _CLARIFIED_TWO_IN_STR.search(core)
    if m:
        return f"{m.group(1)}.{m.group(2)}"
    m = _CLARIFIED_MAJOR_IN_STR.search(core)
    if m:
        return m.group(1)
    return ""
Expand Down Expand Up @@ -736,6 +751,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to):
oss_name = ""
oss_version = ""
downloaded_file = ""
resolved_link = ""

try:
if platform.system() != "Windows":
Expand All @@ -750,6 +766,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to):
ret, new_link, oss_name, oss_version, pkg_type = get_downloadable_url(link, checkout_to)
if ret and new_link:
link = new_link
resolved_link = link

if compressed_only:
# Check if link ends with known compression extensions
Expand Down Expand Up @@ -806,7 +823,7 @@ def download_wget(link, target_dir, compressed_only, checkout_to):
msg = str(error)
logger.warning(f"wget - failed: {error}")

return success, downloaded_file, msg, oss_name, oss_version
return success, downloaded_file, msg, oss_name, oss_version, resolved_link


def _download_file_once(url, target_dir, request_headers=None):
Expand Down
Loading
Loading