diff --git a/src/fosslight_source/_help.py b/src/fosslight_source/_help.py index 4883923..cf20cbe 100644 --- a/src/fosslight_source/_help.py +++ b/src/fosslight_source/_help.py @@ -42,6 +42,8 @@ --no_correction Skip OSS information correction with sbom-info.yaml --correct_fpath Path to custom sbom-info.yaml file --hide_progress Hide the progress bar during scanning + --kb_url KB API URL (priority: parameter > KB_URL env > default) + --kb_token KB bearer token (priority: parameter > KB_TOKEN env) 💡 Examples ──────────────────────────────────────────────────────────────────── diff --git a/src/fosslight_source/_scan_item.py b/src/fosslight_source/_scan_item.py index b0a742d..b3d9cd9 100644 --- a/src/fosslight_source/_scan_item.py +++ b/src/fosslight_source/_scan_item.py @@ -32,7 +32,17 @@ MAX_LICENSE_LENGTH = 200 MAX_LICENSE_TOTAL_LENGTH = 600 SUBSTRING_LICENSE_COMMENT = "Maximum character limit (License)" -KB_URL = "http://fosslight-kb.lge.com/" +DEFAULT_KB_URL = "http://fosslight-kb.lge.com/" + + +def resolve_kb_config(kb_url: str = "", kb_token: str = "") -> tuple[str, str]: + url = (kb_url or os.environ.get("KB_URL", DEFAULT_KB_URL)).strip() or DEFAULT_KB_URL + + token = (kb_token or "").strip() + if not token: + token = (os.environ.get("KB_TOKEN") or "").strip() + + return f"{url.rstrip('/')}/", token class SourceItem(FileItem): @@ -114,15 +124,21 @@ def _get_hash(self, path_to_scan: str = "") -> tuple: logger.debug(f"Failed to compute MD5 for {self.source_name_or_path}: {e}") return md5_hex, wfp - def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str: + def _get_origin_url_from_md5_hash( + self, md5_hash: str, wfp: str = "", kb_url: str = DEFAULT_KB_URL, kb_token: str = "" + ) -> str: """Return origin_url from KB API.""" try: payload = {"file_hash": md5_hash} if wfp and wfp.strip(): payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii") - request = urllib.request.Request(f"{KB_URL}query", data=json.dumps(payload).encode('utf-8'), method='POST') + request = urllib.request.Request( + f"{kb_url}query", data=json.dumps(payload).encode('utf-8'), method='POST' + ) request.add_header('Accept', 'application/json') request.add_header('Content-Type', 'application/json') + if kb_token: + request.add_header('Authorization', f'Bearer {kb_token}') with urllib.request.urlopen(request, timeout=10) as response: data = json.loads(response.read().decode()) @@ -179,7 +195,9 @@ def _extract_oss_info_from_url(self, url: str) -> tuple: logger.debug(f"Failed to extract OSS info from URL {url}: {e}") return "", "", "" - def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None: + def set_oss_item( + self, path_to_scan: str = "", run_kb: bool = False, kb_url: str = DEFAULT_KB_URL, kb_token: str = "" + ) -> None: self.oss_items = [] if self.download_location: for url in self.download_location: @@ -192,7 +210,7 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None: if run_kb and not self.is_license_text: md5_hash, wfp = self._get_hash(path_to_scan) if md5_hash: - origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp) + origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp, kb_url, kb_token) if origin_url: self.kb_origin_url = origin_url self.kb_evidence = "exact_match" diff --git a/src/fosslight_source/cli.py b/src/fosslight_source/cli.py index 5318667..e88e18f 100755 --- a/src/fosslight_source/cli.py +++ b/src/fosslight_source/cli.py @@ -28,7 +28,7 @@ import argparse from .run_spdx_extractor import get_spdx_downloads from .run_manifest_extractor import get_manifest_licenses -from ._scan_item import SourceItem, KB_URL +from ._scan_item import SourceItem, resolve_kb_config from fosslight_util.oss_item import ScannerItem from typing import Tuple from ._scan_item import is_manifest_file @@ -84,6 +84,8 @@ def main() -> None: parser.add_argument('--no_correction', action='store_true', required=False) parser.add_argument('--correct_fpath', nargs=1, type=str, required=False) parser.add_argument('--hide_progress', action='store_true', required=False) + parser.add_argument('--kb_url', type=str, required=False, default="") + parser.add_argument('--kb_token', type=str, required=False, default="") args = parser.parse_args() @@ -112,6 +114,8 @@ def main() -> None: if args.correct_fpath: correct_filepath = ''.join(args.correct_fpath) hide_progress = args.hide_progress + kb_url = args.kb_url + kb_token = args.kb_token time_out = args.timeout core = args.cores @@ -120,7 +124,8 @@ def main() -> None: result = [] result = run_scanners(path_to_scan, output_file_name, write_json_file, core, True, print_matched_text, formats, time_out, correct_mode, correct_filepath, - selected_scanner, path_to_exclude, hide_progress=hide_progress) + selected_scanner, path_to_exclude, hide_progress=hide_progress, + kb_url=kb_url, kb_token=kb_token) _result_log["Scan Result"] = result[1] @@ -268,10 +273,12 @@ def create_report_file( return scan_item -def check_kb_server_reachable() -> bool: +def check_kb_server_reachable(kb_url: str, kb_token: str = "") -> bool: for attempt in range(3): try: - request = urllib.request.Request(f"{KB_URL}health", method='GET') + request = urllib.request.Request(f"{kb_url}health", method='GET') + if kb_token: + request.add_header('Authorization', f'Bearer {kb_token}') with urllib.request.urlopen(request, timeout=10) as response: logger.debug(f"KB server is reachable. Response status: {response.status}") return True @@ -326,7 +333,7 @@ def mark_oss_info_correction_files_as_excluded(scan_results: list) -> None: def merge_results( scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {}, path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {}, - excluded_files: set = None, hide_progress: bool = False + excluded_files: set = None, hide_progress: bool = False, kb_url: str = "", kb_token: str = "" ) -> list: """ @@ -337,6 +344,8 @@ def merge_results( :param path_to_scan: path to the scanned directory for constructing absolute file paths. :param run_kb: if True, load kb result. :param excluded_files: set of relative paths to exclude from KB-only file discovery. + :param kb_url: KB API base URL. + :param kb_token: KB API bearer token. :return merged_result: list of merged result in SourceItem. """ if excluded_files is None: @@ -373,7 +382,7 @@ def merge_results( scancode_result.append(new_result_item) for item in scancode_result: - item.set_oss_item(path_to_scan, run_kb) + item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token) # Add OSSItem for files in path_to_scan that are not in scancode_result # when KB returns an origin URL for their MD5 hash (skip excluded_files) @@ -392,7 +401,7 @@ def merge_results( if rel_path in scancode_paths or rel_path in excluded_files: continue extra_item = SourceItem(rel_path) - extra_item.set_oss_item(path_to_scan, run_kb) + extra_item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token) if extra_item.download_location: scancode_result.append(extra_item) scancode_paths.add(rel_path) @@ -407,7 +416,7 @@ def run_scanners( formats: list = [], time_out: int = 120, correct_mode: bool = True, correct_filepath: str = "", selected_scanner: str = ALL_MODE, path_to_exclude: list = [], - all_exclude_mode: tuple = (), hide_progress: bool = False + all_exclude_mode: tuple = (), hide_progress: bool = False, kb_url: str = "", kb_token: str = "" ) -> Tuple[bool, str, 'ScannerItem', list, list]: """ Run Scancode and scanoss.py for the given path. @@ -419,6 +428,8 @@ def run_scanners( :param called_by_cli: if not called by cli, initialize logger. :param print_matched_text: if requested, output matched text (only for scancode). :param format: output format (excel, csv, opossum). + :param kb_url: KB API base URL. If empty, read KB_URL environment variable, then use default. + :param kb_token: KB API bearer token. If empty, read KB_TOKEN environment variable. :return success: success or failure of scancode. :return result_log["Scan Result"]: :return merged_result: merged scan result of scancode and scanoss. @@ -435,6 +446,7 @@ def run_scanners( result_log = {} scan_item = [] api_limit_exceed = False + kb_url, kb_token = resolve_kb_config(kb_url, kb_token) success, msg, output_path, output_files, output_extensions, formats = check_output_formats_v2(output_file_name, formats) @@ -485,15 +497,16 @@ def run_scanners( if selected_scanner in SCANNER_TYPE: run_kb = True if selected_scanner in ['kb', ALL_MODE] else False if run_kb: - if not check_kb_server_reachable(): + if not check_kb_server_reachable(kb_url, kb_token): run_kb = False - run_kb_msg = "KB Unreachable" + run_kb_msg = f"KB({kb_url}) Unreachable" else: - run_kb_msg = "KB Enabled" + run_kb_msg = f"KB({kb_url}) Enabled" spdx_downloads, manifest_licenses = metadata_collector(path_to_scan, excluded_files) merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads, - path_to_scan, run_kb, manifest_licenses, excluded_files, hide_progress) + path_to_scan, run_kb, manifest_licenses, excluded_files, + hide_progress, kb_url, kb_token) mark_oss_info_correction_files_as_excluded(merged_result) scan_item = create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner, print_matched_text, output_path, output_files, output_extensions, correct_mode,