diff --git a/src/pyspector/reporting.py b/src/pyspector/reporting.py index 56b50fa..fb355ee 100644 --- a/src/pyspector/reporting.py +++ b/src/pyspector/reporting.py @@ -1,9 +1,70 @@ import json import html as html_module -# Added 'Region' to imports for better SARIF compliance -from sarif_om import SarifLog, Tool, Run, ReportingDescriptor, Result, ArtifactLocation, Location, PhysicalLocation, Region -# Removed 'asdict' from imports as it is not needed for sarif_om -from dataclasses import asdict, is_dataclass +import importlib.metadata + +from sarif_om import ( + SarifLog, + Tool, + ToolComponent, + Run, + ReportingDescriptor, + ReportingConfiguration, + MultiformatMessageString, + Result, + ArtifactLocation, + Location, + PhysicalLocation, + Region, + Message, +) + + +# Maps internal severity levels to SARIF-compliant level strings. +_SEVERITY_TO_SARIF_LEVEL = { + "CRITICAL": "error", + "HIGH": "error", + "MEDIUM": "warning", + "LOW": "note", +} + + +def _get_version(): + """Return installed PySpector version dynamically.""" + try: + return importlib.metadata.version("pyspector") + except importlib.metadata.PackageNotFoundError: + return "dev" + + +_PYSPECTOR_VERSION = _get_version() + + +def _severity_key(issue) -> str: + """Normalize enum-like severity values.""" + return str(issue.severity).split(".")[-1].upper() + + +def _clean(obj): + + if isinstance(obj, list): + return [_clean(item) for item in obj] + + if isinstance(obj, dict): + return { + k: _clean(v) + for k, v in obj.items() + if v is not None + } + + if hasattr(obj, "__dict__"): + return { + k: _clean(v) + for k, v in obj.__dict__.items() + if v is not None + } + + return obj + class Reporter: def __init__(self, issues: list, report_format: str): @@ -11,43 +72,40 @@ def __init__(self, issues: list, report_format: str): self.format = report_format def generate(self) -> str: - if self.format == 'json': + if self.format == "json": return self.to_json() - if self.format == 'sarif': + if self.format == "sarif": return self.to_sarif() - if self.format == 'html': + if self.format == "html": return self.to_html() return self.to_console() + def to_console(self) -> str: if not self.issues: return "\nNo issues found." output = [] + severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW"] - # Define severity order (highest to lowest priority) - severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'] - - # Group issues by severity - issues_by_severity = {} + issues_by_severity: dict[str, list] = {} for issue in self.issues: - severity = str(issue.severity).split('.')[-1].upper() - if severity not in issues_by_severity: - issues_by_severity[severity] = [] - issues_by_severity[severity].append(issue) + severity = _severity_key(issue) + issues_by_severity.setdefault(severity, []).append(issue) - # Output grouped by severity (in priority order) for severity in severity_order: if severity not in issues_by_severity: continue - issues = issues_by_severity[severity] - # Sort issues within each severity group by file path and line number - sorted_issues = sorted(issues, key=lambda i: (i.file_path, i.line_number)) + sorted_issues = sorted( + issues_by_severity[severity], + key=lambda i: (i.file_path, i.line_number), + ) - # Add severity header output.append(f"\n{'='*60}") - output.append(f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})") + output.append( + f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})" + ) output.append(f"{'='*60}") for issue in sorted_issues: @@ -60,6 +118,10 @@ def to_console(self) -> str: return "\n".join(output) + # ------------------------------------------------------------------ # + # JSON # + # ------------------------------------------------------------------ # + def to_json(self) -> str: report = { "summary": {"issue_count": len(self.issues)}, @@ -70,47 +132,120 @@ def to_json(self) -> str: "file_path": issue.file_path, "line_number": issue.line_number, "code": issue.code, - "severity": str(issue.severity).split('.')[-1], + "severity": _severity_key(issue), "remediation": issue.remediation, - } for issue in self.issues - ] + } + for issue in self.issues + ], } + return json.dumps(report, indent=2) + # ------------------------------------------------------------------ # + # SARIF # + # ------------------------------------------------------------------ # + def to_sarif(self) -> str: - tool = Tool(driver=ReportingDescriptor(id="pyspector", name="PySpector")) - rules = [] - results = [] - - # Create a unique list of rules for the SARIF report - rule_map = {} + + rule_index_map: dict[str, int] = {} + rules: list[ReportingDescriptor] = [] + for issue in self.issues: - if issue.rule_id not in rule_map: - rule_map[issue.rule_id] = ReportingDescriptor(id=issue.rule_id, name=issue.description) - - # sarif_om expects lists, not values view - tool.driver.rules = list(rule_map.values()) + + if issue.rule_id in rule_index_map: + continue + + severity_key = _severity_key(issue) + + rule = ReportingDescriptor( + id=issue.rule_id, + name=issue.rule_id, + short_description=MultiformatMessageString( + text=issue.description + ), + help=MultiformatMessageString( + text=issue.remediation or issue.description, + markdown=( + f"**Remediation:** {issue.remediation}" + if issue.remediation + else None + ), + ), + default_configuration=ReportingConfiguration( + level=_SEVERITY_TO_SARIF_LEVEL.get( + severity_key, + "warning", + ) + ), + ) + + rule_index_map[issue.rule_id] = len(rules) + rules.append(rule) + + driver = ToolComponent( + name="PySpector", + version=_PYSPECTOR_VERSION, + information_uri="https://github.com/your-org/pyspector", + rules=rules, + ) + + tool = Tool(driver=driver) + + results: list[Result] = [] for issue in self.issues: - # FIX: Use the Region object from sarif_om instead of a raw dict - region = Region(start_line=issue.line_number) - + + severity_key = _severity_key(issue) + level = _SEVERITY_TO_SARIF_LEVEL.get( + severity_key, + "warning", + ) + + region = Region( + start_line=issue.line_number, + snippet=MultiformatMessageString( + text=issue.code.strip() + ), + ) + location = Location( physical_location=PhysicalLocation( - artifact_location=ArtifactLocation(uri=issue.file_path), - region=region + artifact_location=ArtifactLocation( + uri=issue.file_path, + uri_base_id="%SRCROOT%", + ), + region=region, ) ) - results.append(Result(rule_id=issue.rule_id, message={"text": issue.description}, locations=[location])) - + + result = Result( + rule_id=issue.rule_id, + rule_index=rule_index_map[issue.rule_id], + level=level, + message=Message(text=issue.description), + locations=[location], + ) + + results.append(result) + run = Run(tool=tool, results=results) - log = SarifLog(version="2.1.0", schema_uri="https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0-rtm.5.json", runs=[run]) - - # FIX: Remove asdict(). Use default lambda to serialize non-dataclass objects. - return json.dumps(log, default=lambda o: o.__dict__, indent=2) - + + log = SarifLog( + version="2.1.0", + schema_uri=( + "https://raw.githubusercontent.com/oasis-tcs/" + "sarif-spec/master/Schemata/sarif-schema-2.1.0.json" + ), + runs=[run], + ) + + return json.dumps(_clean(log), indent=2) + + # ------------------------------------------------------------------ # + # HTML # + # ------------------------------------------------------------------ # + def to_html(self) -> str: - # A simple HTML report html = f""" PySpector Scan Report @@ -119,13 +254,14 @@ def to_html(self) -> str:

Found {len(self.issues)} issues.

- - - - - + + + + + """ + for issue in self.issues: html += f""" @@ -136,5 +272,7 @@ def to_html(self) -> str: """ + html += "
FileLineSeverityDescriptionCodeFileLineSeverityDescriptionCode
{html_module.escape(issue.code)}
" + return html