Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/skillspector/nodes/deduplicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cross-analyzer finding deduplication.

Merges findings that represent the same conceptual issue observed multiple
times — either within the same file or across files with identical patterns.

Deduplication strategy:
1. Same-file dedup: Same rule_id + same file + same matched_text
→ keep highest confidence instance
2. Cross-file consolidation: Same rule_id + same matched_text across files
→ keep highest confidence instance
"""

from __future__ import annotations

from skillspector.logging_config import get_logger
from skillspector.models import Finding

logger = get_logger(__name__)


def _same_file_key(finding: Finding) -> tuple[str, str, str]:
"""Build a deduplication key for same-file matches."""
matched = (finding.matched_text or "").strip()[:100]
return (finding.rule_id, finding.file, matched)


def _cross_file_key(finding: Finding) -> tuple[str, str]:
"""Build a cross-file deduplication key from rule_id and normalized matched_text."""
matched = (finding.matched_text or "").strip()[:100]
return (finding.rule_id, matched)


def deduplicate(findings: list[Finding]) -> list[Finding]:
"""Deduplicate a list of findings, returning a reduced list.

Two-pass deduplication:
1. Same-file: identical (rule_id, file, matched_text) → keep highest confidence
2. Cross-file: identical (rule_id, matched_text) across different files
→ keep highest confidence representative

Findings without matched_text are never cross-file deduplicated (they lack
a reliable identity signal).
"""
if not findings:
return []

original_count = len(findings)

# Pass 1: Same-file deduplication
same_file_best: dict[tuple[str, str, str], Finding] = {}
for f in findings:
key = _same_file_key(f)
existing = same_file_best.get(key)
if existing is None or f.confidence > existing.confidence:
same_file_best[key] = f

after_same_file = list(same_file_best.values())

# Pass 2: Cross-file deduplication (only for findings WITH matched_text)
cross_file_best: dict[tuple[str, str], Finding] = {}
no_text_findings: list[Finding] = []

for f in after_same_file:
matched = (f.matched_text or "").strip()
if not matched:
no_text_findings.append(f)
continue
key = _cross_file_key(f)
existing = cross_file_best.get(key)
if existing is None or f.confidence > existing.confidence:
cross_file_best[key] = f

deduplicated = list(cross_file_best.values()) + no_text_findings

removed = original_count - len(deduplicated)
if removed > 0:
logger.info(
"Deduplication: %d → %d findings (%d duplicates removed)",
original_count,
len(deduplicated),
removed,
)

severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
deduplicated.sort(
key=lambda f: (severity_order.get(f.severity.upper(), 4), f.file, f.start_line)
)

return deduplicated
20 changes: 10 additions & 10 deletions src/skillspector/nodes/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from skillspector.llm_utils import is_llm_available
from skillspector.logging_config import get_logger
from skillspector.models import Finding
from skillspector.nodes.deduplicate import deduplicate
from skillspector.sarif_models import (
SARIF_SCHEMA_URI,
SarifArtifactLocation,
Expand Down Expand Up @@ -376,12 +377,11 @@ def _format_markdown(

def report(state: SkillspectorState) -> dict[str, object]:
"""Generate SARIF, compute risk score, and set report_body from output_format."""
findings = state.get("filtered_findings", state.get("findings", []))
# When use_llm is False, meta_analyzer is skipped; ensure final state has filtered_findings
raw_findings = state.get("filtered_findings", state.get("findings", []))
if "filtered_findings" not in state:
filtered_findings = state.get("findings", [])
else:
filtered_findings = findings
raw_findings = state.get("findings", [])
findings_for_scoring = deduplicate(raw_findings)
filtered_findings = raw_findings
component_metadata = state.get("component_metadata") or []
has_executable_scripts = state.get("has_executable_scripts", False)
manifest = state.get("manifest") or {}
Expand All @@ -390,13 +390,13 @@ def report(state: SkillspectorState) -> dict[str, object]:
use_llm = state.get("use_llm", True)

risk_score, risk_severity, risk_recommendation = _compute_risk_score(
findings, has_executable_scripts
findings_for_scoring, has_executable_scripts
)
sarif_report = _build_sarif(findings)
sarif_report = _build_sarif(filtered_findings)

if output_format == "terminal":
report_body = _format_terminal(
findings,
filtered_findings,
component_metadata,
manifest,
skill_path,
Expand All @@ -407,7 +407,7 @@ def report(state: SkillspectorState) -> dict[str, object]:
)
elif output_format == "json":
report_body = _format_json(
findings,
filtered_findings,
component_metadata,
manifest,
skill_path,
Expand All @@ -419,7 +419,7 @@ def report(state: SkillspectorState) -> dict[str, object]:
)
elif output_format == "markdown":
report_body = _format_markdown(
findings,
filtered_findings,
component_metadata,
manifest,
skill_path,
Expand Down
224 changes: 224 additions & 0 deletions tests/nodes/test_deduplicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for cross-analyzer finding deduplication."""

from __future__ import annotations

from skillspector.models import Finding
from skillspector.nodes.deduplicate import deduplicate


def _finding(
rule_id: str = "TM1",
file: str = "tool.py",
matched_text: str = "subprocess.run(cmd, shell=True)",
confidence: float = 0.8,
severity: str = "HIGH",
start_line: int = 1,
) -> Finding:
return Finding(
rule_id=rule_id,
message=f"Test finding {rule_id}",
severity=severity,
confidence=confidence,
file=file,
start_line=start_line,
matched_text=matched_text,
)


class TestSameFileDedup:
"""Same rule_id + same file + same matched_text → keep highest confidence."""

def test_exact_duplicates_reduced_to_one(self) -> None:
"""Two identical findings in same file → one output."""
findings = [
_finding(file="a.py", start_line=1),
_finding(file="a.py", start_line=5),
]
result = deduplicate(findings)
assert len(result) == 1

def test_keeps_highest_confidence(self) -> None:
"""When duplicates exist, the highest confidence one is kept."""
findings = [
_finding(file="a.py", confidence=0.6),
_finding(file="a.py", confidence=0.9),
_finding(file="a.py", confidence=0.3),
]
result = deduplicate(findings)
assert len(result) == 1
assert result[0].confidence == 0.9

def test_different_rules_same_file_not_deduped(self) -> None:
"""Different rule_ids in same file are independent findings."""
findings = [
_finding(rule_id="TM1", file="a.py"),
_finding(rule_id="TM2", file="a.py"),
]
result = deduplicate(findings)
assert len(result) == 2

def test_different_matched_text_same_file_not_deduped(self) -> None:
"""Same rule but different matched text in same file → separate findings."""
findings = [
_finding(file="a.py", matched_text="subprocess.run(cmd, shell=True)"),
_finding(file="a.py", matched_text="subprocess.Popen(cmd, shell=True)"),
]
result = deduplicate(findings)
assert len(result) == 2


class TestCrossFileDedup:
"""Same rule_id + same matched_text across files → keep best."""

def test_same_pattern_across_files_deduplicated(self) -> None:
"""Same rule + same matched text in different files → one output."""
findings = [
_finding(file="step1.py"),
_finding(file="step2.py"),
_finding(file="step3.py"),
_finding(file="step4.py"),
]
result = deduplicate(findings)
assert len(result) == 1

def test_cross_file_keeps_highest_confidence(self) -> None:
"""Cross-file dedup keeps the highest confidence finding."""
findings = [
_finding(file="a.py", confidence=0.5),
_finding(file="b.py", confidence=0.9),
_finding(file="c.py", confidence=0.7),
]
result = deduplicate(findings)
assert len(result) == 1
assert result[0].confidence == 0.9
assert result[0].file == "b.py"

def test_different_patterns_across_files_not_deduped(self) -> None:
"""Different matched texts are independent even with same rule_id."""
findings = [
_finding(file="a.py", matched_text="curl -k"),
_finding(file="b.py", matched_text="wget --no-check-certificate"),
]
result = deduplicate(findings)
assert len(result) == 2

def test_different_rules_same_pattern_not_deduped(self) -> None:
"""Different rules with same matched text are independent."""
findings = [
_finding(rule_id="TM1", file="a.py", matched_text="curl -k"),
_finding(rule_id="SC1", file="b.py", matched_text="curl -k"),
]
result = deduplicate(findings)
assert len(result) == 2


class TestNoMatchedText:
"""Findings without matched_text are never cross-file deduplicated."""

def test_no_matched_text_kept_independently(self) -> None:
"""Findings with empty/None matched_text are all kept."""
findings = [
_finding(file="a.py", matched_text=""),
_finding(file="b.py", matched_text=""),
]
result = deduplicate(findings)
assert len(result) == 2

def test_none_matched_text_kept(self) -> None:
"""Findings with None matched_text are preserved."""
f1 = Finding(rule_id="TM1", message="Test", file="a.py", start_line=1, matched_text=None)
f2 = Finding(rule_id="TM1", message="Test", file="b.py", start_line=1, matched_text=None)
result = deduplicate([f1, f2])
assert len(result) == 2


class TestEdgeCases:
"""Edge cases and ordering."""

def test_empty_list(self) -> None:
"""Empty input returns empty output."""
assert deduplicate([]) == []

def test_single_finding_unchanged(self) -> None:
"""A single finding passes through unchanged."""
findings = [_finding()]
result = deduplicate(findings)
assert len(result) == 1
assert result[0].rule_id == "TM1"

def test_output_sorted_by_severity_then_file(self) -> None:
"""Output is sorted: CRITICAL > HIGH > MEDIUM > LOW, then by file."""
findings = [
_finding(rule_id="A", severity="LOW", file="z.py", matched_text="low"),
_finding(rule_id="B", severity="CRITICAL", file="a.py", matched_text="crit"),
_finding(rule_id="C", severity="HIGH", file="m.py", matched_text="high"),
_finding(rule_id="D", severity="MEDIUM", file="b.py", matched_text="med"),
]
result = deduplicate(findings)
assert len(result) == 4
assert [r.severity for r in result] == ["CRITICAL", "HIGH", "MEDIUM", "LOW"]

def test_real_world_repetitive_skill(self) -> None:
"""Simulates a skill with subprocess in 5 files — should deduplicate to 1."""
findings = [
_finding(
rule_id="TM1",
file=f"step{i}.py",
matched_text="subprocess.run(cmd, shell=True)",
confidence=0.8,
)
for i in range(5)
]
result = deduplicate(findings)
assert len(result) == 1

def test_mixed_dedup_scenario(self) -> None:
"""Mix of same-file, cross-file, and unique findings."""
findings = [
# Same pattern in 3 files → should become 1
_finding(rule_id="TM1", file="a.py", matched_text="shell=True"),
_finding(rule_id="TM1", file="b.py", matched_text="shell=True"),
_finding(rule_id="TM1", file="c.py", matched_text="shell=True"),
# Different pattern, unique
_finding(rule_id="E1", file="a.py", matched_text="requests.post(url)"),
# Same rule different pattern
_finding(rule_id="TM1", file="d.py", matched_text="--force delete"),
]
result = deduplicate(findings)
# TM1 shell=True (1) + E1 requests.post (1) + TM1 --force (1) = 3
assert len(result) == 3

def test_whitespace_normalization(self) -> None:
"""Leading/trailing whitespace in matched_text is trimmed for key."""
findings = [
_finding(file="a.py", matched_text=" curl -k "),
_finding(file="b.py", matched_text="curl -k"),
]
result = deduplicate(findings)
assert len(result) == 1

def test_long_matched_text_truncated_for_key(self) -> None:
"""Only first 100 chars of matched_text are used for dedup key."""
base = "x" * 100
findings = [
_finding(file="a.py", matched_text=base + "AAAA"),
_finding(file="b.py", matched_text=base + "BBBB"),
]
result = deduplicate(findings)
# First 100 chars are identical → deduplicated
assert len(result) == 1
Loading