diff --git a/src/skillspector/nodes/deduplicate.py b/src/skillspector/nodes/deduplicate.py new file mode 100644 index 0000000..abda513 --- /dev/null +++ b/src/skillspector/nodes/deduplicate.py @@ -0,0 +1,104 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Cross-analyzer finding deduplication. + +Merges findings that represent the same conceptual issue observed multiple +times — either within the same file or across files with identical patterns. + +Deduplication strategy: +1. Same-file dedup: Same rule_id + same file + same matched_text + → keep highest confidence instance +2. Cross-file consolidation: Same rule_id + same matched_text across files + → keep highest confidence instance +""" + +from __future__ import annotations + +from skillspector.logging_config import get_logger +from skillspector.models import Finding + +logger = get_logger(__name__) + + +def _same_file_key(finding: Finding) -> tuple[str, str, str]: + """Build a deduplication key for same-file matches.""" + matched = (finding.matched_text or "").strip()[:100] + return (finding.rule_id, finding.file, matched) + + +def _cross_file_key(finding: Finding) -> tuple[str, str]: + """Build a cross-file deduplication key from rule_id and normalized matched_text.""" + matched = (finding.matched_text or "").strip()[:100] + return (finding.rule_id, matched) + + +def deduplicate(findings: list[Finding]) -> list[Finding]: + """Deduplicate a list of findings, returning a reduced list. + + Two-pass deduplication: + 1. Same-file: identical (rule_id, file, matched_text) → keep highest confidence + 2. Cross-file: identical (rule_id, matched_text) across different files + → keep highest confidence representative + + Findings without matched_text are never cross-file deduplicated (they lack + a reliable identity signal). + """ + if not findings: + return [] + + original_count = len(findings) + + # Pass 1: Same-file deduplication + same_file_best: dict[tuple[str, str, str], Finding] = {} + for f in findings: + key = _same_file_key(f) + existing = same_file_best.get(key) + if existing is None or f.confidence > existing.confidence: + same_file_best[key] = f + + after_same_file = list(same_file_best.values()) + + # Pass 2: Cross-file deduplication (only for findings WITH matched_text) + cross_file_best: dict[tuple[str, str], Finding] = {} + no_text_findings: list[Finding] = [] + + for f in after_same_file: + matched = (f.matched_text or "").strip() + if not matched: + no_text_findings.append(f) + continue + key = _cross_file_key(f) + existing = cross_file_best.get(key) + if existing is None or f.confidence > existing.confidence: + cross_file_best[key] = f + + deduplicated = list(cross_file_best.values()) + no_text_findings + + removed = original_count - len(deduplicated) + if removed > 0: + logger.info( + "Deduplication: %d → %d findings (%d duplicates removed)", + original_count, + len(deduplicated), + removed, + ) + + severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3} + deduplicated.sort( + key=lambda f: (severity_order.get(f.severity.upper(), 4), f.file, f.start_line) + ) + + return deduplicated diff --git a/src/skillspector/nodes/report.py b/src/skillspector/nodes/report.py index 1b76cbf..21aa5c7 100644 --- a/src/skillspector/nodes/report.py +++ b/src/skillspector/nodes/report.py @@ -34,6 +34,7 @@ from skillspector.llm_utils import is_llm_available from skillspector.logging_config import get_logger from skillspector.models import Finding +from skillspector.nodes.deduplicate import deduplicate from skillspector.sarif_models import ( SARIF_SCHEMA_URI, SarifArtifactLocation, @@ -376,12 +377,11 @@ def _format_markdown( def report(state: SkillspectorState) -> dict[str, object]: """Generate SARIF, compute risk score, and set report_body from output_format.""" - findings = state.get("filtered_findings", state.get("findings", [])) - # When use_llm is False, meta_analyzer is skipped; ensure final state has filtered_findings + raw_findings = state.get("filtered_findings", state.get("findings", [])) if "filtered_findings" not in state: - filtered_findings = state.get("findings", []) - else: - filtered_findings = findings + raw_findings = state.get("findings", []) + findings_for_scoring = deduplicate(raw_findings) + filtered_findings = raw_findings component_metadata = state.get("component_metadata") or [] has_executable_scripts = state.get("has_executable_scripts", False) manifest = state.get("manifest") or {} @@ -390,13 +390,13 @@ def report(state: SkillspectorState) -> dict[str, object]: use_llm = state.get("use_llm", True) risk_score, risk_severity, risk_recommendation = _compute_risk_score( - findings, has_executable_scripts + findings_for_scoring, has_executable_scripts ) - sarif_report = _build_sarif(findings) + sarif_report = _build_sarif(filtered_findings) if output_format == "terminal": report_body = _format_terminal( - findings, + filtered_findings, component_metadata, manifest, skill_path, @@ -407,7 +407,7 @@ def report(state: SkillspectorState) -> dict[str, object]: ) elif output_format == "json": report_body = _format_json( - findings, + filtered_findings, component_metadata, manifest, skill_path, @@ -419,7 +419,7 @@ def report(state: SkillspectorState) -> dict[str, object]: ) elif output_format == "markdown": report_body = _format_markdown( - findings, + filtered_findings, component_metadata, manifest, skill_path, diff --git a/tests/nodes/test_deduplicate.py b/tests/nodes/test_deduplicate.py new file mode 100644 index 0000000..16e913c --- /dev/null +++ b/tests/nodes/test_deduplicate.py @@ -0,0 +1,224 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for cross-analyzer finding deduplication.""" + +from __future__ import annotations + +from skillspector.models import Finding +from skillspector.nodes.deduplicate import deduplicate + + +def _finding( + rule_id: str = "TM1", + file: str = "tool.py", + matched_text: str = "subprocess.run(cmd, shell=True)", + confidence: float = 0.8, + severity: str = "HIGH", + start_line: int = 1, +) -> Finding: + return Finding( + rule_id=rule_id, + message=f"Test finding {rule_id}", + severity=severity, + confidence=confidence, + file=file, + start_line=start_line, + matched_text=matched_text, + ) + + +class TestSameFileDedup: + """Same rule_id + same file + same matched_text → keep highest confidence.""" + + def test_exact_duplicates_reduced_to_one(self) -> None: + """Two identical findings in same file → one output.""" + findings = [ + _finding(file="a.py", start_line=1), + _finding(file="a.py", start_line=5), + ] + result = deduplicate(findings) + assert len(result) == 1 + + def test_keeps_highest_confidence(self) -> None: + """When duplicates exist, the highest confidence one is kept.""" + findings = [ + _finding(file="a.py", confidence=0.6), + _finding(file="a.py", confidence=0.9), + _finding(file="a.py", confidence=0.3), + ] + result = deduplicate(findings) + assert len(result) == 1 + assert result[0].confidence == 0.9 + + def test_different_rules_same_file_not_deduped(self) -> None: + """Different rule_ids in same file are independent findings.""" + findings = [ + _finding(rule_id="TM1", file="a.py"), + _finding(rule_id="TM2", file="a.py"), + ] + result = deduplicate(findings) + assert len(result) == 2 + + def test_different_matched_text_same_file_not_deduped(self) -> None: + """Same rule but different matched text in same file → separate findings.""" + findings = [ + _finding(file="a.py", matched_text="subprocess.run(cmd, shell=True)"), + _finding(file="a.py", matched_text="subprocess.Popen(cmd, shell=True)"), + ] + result = deduplicate(findings) + assert len(result) == 2 + + +class TestCrossFileDedup: + """Same rule_id + same matched_text across files → keep best.""" + + def test_same_pattern_across_files_deduplicated(self) -> None: + """Same rule + same matched text in different files → one output.""" + findings = [ + _finding(file="step1.py"), + _finding(file="step2.py"), + _finding(file="step3.py"), + _finding(file="step4.py"), + ] + result = deduplicate(findings) + assert len(result) == 1 + + def test_cross_file_keeps_highest_confidence(self) -> None: + """Cross-file dedup keeps the highest confidence finding.""" + findings = [ + _finding(file="a.py", confidence=0.5), + _finding(file="b.py", confidence=0.9), + _finding(file="c.py", confidence=0.7), + ] + result = deduplicate(findings) + assert len(result) == 1 + assert result[0].confidence == 0.9 + assert result[0].file == "b.py" + + def test_different_patterns_across_files_not_deduped(self) -> None: + """Different matched texts are independent even with same rule_id.""" + findings = [ + _finding(file="a.py", matched_text="curl -k"), + _finding(file="b.py", matched_text="wget --no-check-certificate"), + ] + result = deduplicate(findings) + assert len(result) == 2 + + def test_different_rules_same_pattern_not_deduped(self) -> None: + """Different rules with same matched text are independent.""" + findings = [ + _finding(rule_id="TM1", file="a.py", matched_text="curl -k"), + _finding(rule_id="SC1", file="b.py", matched_text="curl -k"), + ] + result = deduplicate(findings) + assert len(result) == 2 + + +class TestNoMatchedText: + """Findings without matched_text are never cross-file deduplicated.""" + + def test_no_matched_text_kept_independently(self) -> None: + """Findings with empty/None matched_text are all kept.""" + findings = [ + _finding(file="a.py", matched_text=""), + _finding(file="b.py", matched_text=""), + ] + result = deduplicate(findings) + assert len(result) == 2 + + def test_none_matched_text_kept(self) -> None: + """Findings with None matched_text are preserved.""" + f1 = Finding(rule_id="TM1", message="Test", file="a.py", start_line=1, matched_text=None) + f2 = Finding(rule_id="TM1", message="Test", file="b.py", start_line=1, matched_text=None) + result = deduplicate([f1, f2]) + assert len(result) == 2 + + +class TestEdgeCases: + """Edge cases and ordering.""" + + def test_empty_list(self) -> None: + """Empty input returns empty output.""" + assert deduplicate([]) == [] + + def test_single_finding_unchanged(self) -> None: + """A single finding passes through unchanged.""" + findings = [_finding()] + result = deduplicate(findings) + assert len(result) == 1 + assert result[0].rule_id == "TM1" + + def test_output_sorted_by_severity_then_file(self) -> None: + """Output is sorted: CRITICAL > HIGH > MEDIUM > LOW, then by file.""" + findings = [ + _finding(rule_id="A", severity="LOW", file="z.py", matched_text="low"), + _finding(rule_id="B", severity="CRITICAL", file="a.py", matched_text="crit"), + _finding(rule_id="C", severity="HIGH", file="m.py", matched_text="high"), + _finding(rule_id="D", severity="MEDIUM", file="b.py", matched_text="med"), + ] + result = deduplicate(findings) + assert len(result) == 4 + assert [r.severity for r in result] == ["CRITICAL", "HIGH", "MEDIUM", "LOW"] + + def test_real_world_repetitive_skill(self) -> None: + """Simulates a skill with subprocess in 5 files — should deduplicate to 1.""" + findings = [ + _finding( + rule_id="TM1", + file=f"step{i}.py", + matched_text="subprocess.run(cmd, shell=True)", + confidence=0.8, + ) + for i in range(5) + ] + result = deduplicate(findings) + assert len(result) == 1 + + def test_mixed_dedup_scenario(self) -> None: + """Mix of same-file, cross-file, and unique findings.""" + findings = [ + # Same pattern in 3 files → should become 1 + _finding(rule_id="TM1", file="a.py", matched_text="shell=True"), + _finding(rule_id="TM1", file="b.py", matched_text="shell=True"), + _finding(rule_id="TM1", file="c.py", matched_text="shell=True"), + # Different pattern, unique + _finding(rule_id="E1", file="a.py", matched_text="requests.post(url)"), + # Same rule different pattern + _finding(rule_id="TM1", file="d.py", matched_text="--force delete"), + ] + result = deduplicate(findings) + # TM1 shell=True (1) + E1 requests.post (1) + TM1 --force (1) = 3 + assert len(result) == 3 + + def test_whitespace_normalization(self) -> None: + """Leading/trailing whitespace in matched_text is trimmed for key.""" + findings = [ + _finding(file="a.py", matched_text=" curl -k "), + _finding(file="b.py", matched_text="curl -k"), + ] + result = deduplicate(findings) + assert len(result) == 1 + + def test_long_matched_text_truncated_for_key(self) -> None: + """Only first 100 chars of matched_text are used for dedup key.""" + base = "x" * 100 + findings = [ + _finding(file="a.py", matched_text=base + "AAAA"), + _finding(file="b.py", matched_text=base + "BBBB"), + ] + result = deduplicate(findings) + # First 100 chars are identical → deduplicated + assert len(result) == 1 diff --git a/tests/nodes/test_report.py b/tests/nodes/test_report.py index 8e2e994..4f45443 100644 --- a/tests/nodes/test_report.py +++ b/tests/nodes/test_report.py @@ -501,3 +501,32 @@ def test_report_default_output_format_is_sarif(self) -> None: body = result["report_body"] json.loads(body) assert "sarif_report" in result + + def test_report_dedup_affects_score_only_not_report_output(self) -> None: + """Deduplication reduces score but all affected files appear in the report.""" + duplicated = [ + Finding( + rule_id="TM1", + message="shell injection", + severity="HIGH", + confidence=0.8, + file=f"step{i}.py", + start_line=10, + matched_text="subprocess.run(cmd, shell=True)", + ) + for i in range(4) + ] + state: SkillspectorState = { + "filtered_findings": duplicated, + "component_metadata": [], + "has_executable_scripts": False, + "manifest": {"name": "multi-file"}, + "skill_path": "/tmp/skill", + "output_format": "json", + } + result = report(state) + body = json.loads(result["report_body"]) + reported_files = {issue["location"]["file"] for issue in body["issues"]} + assert reported_files == {"step0.py", "step1.py", "step2.py", "step3.py"} + assert len(body["issues"]) == 4 + assert result["risk_score"] < 4 * 25