diff --git a/src/skillspector/cli.py b/src/skillspector/cli.py index 83e3224a..57bac058 100644 --- a/src/skillspector/cli.py +++ b/src/skillspector/cli.py @@ -35,6 +35,7 @@ from skillspector import __version__ from skillspector.graph import graph from skillspector.logging_config import get_logger, set_level +from skillspector.multi_skill import MultiSkillDetectionResult, detect_skills logger = get_logger(__name__) @@ -171,6 +172,14 @@ def scan( help="Directory containing additional YARA rule files (.yar/.yara) to load alongside built-in rules.", ), ] = None, + recursive: Annotated[ + bool, + typer.Option( + "--recursive", + "-r", + help="Scan directories containing multiple skills (immediate subdirectories with SKILL.md) independently.", + ), + ] = False, verbose: Annotated[ bool, typer.Option( @@ -188,6 +197,7 @@ def scan( skillspector scan ./my-skill/ skillspector scan ./my-skill/ --format json --output report.json skillspector scan https://github.com/user/my-skill --no-llm + skillspector scan ./skill-collection/ --recursive Environment variables: @@ -205,12 +215,33 @@ def scan( ANTHROPIC_API_KEY for SKILLSPECTOR_PROVIDER=anthropic NVIDIA_INFERENCE_KEY for the NVIDIA providers """ + if verbose: + set_level("DEBUG") + + resolved_path = Path(input_path).resolve() + if recursive and resolved_path.is_dir(): + detection = detect_skills(resolved_path) + if detection.is_multi_skill: + _scan_multi_skill(detection, format, output, no_llm, yara_rules_dir, verbose) + return + if not detection.has_root_skill and len(detection.skills) == 0: + console.print( + "[yellow]Warning:[/yellow] --recursive specified but no sub-skills " + "detected. Scanning as single skill." + ) + elif resolved_path.is_dir(): + detection = detect_skills(resolved_path) + if detection.is_multi_skill: + console.print( + f"[yellow]Warning:[/yellow] Found {len(detection.skills)} skills in " + f"this directory. Use --recursive to scan each independently." + ) + result = None try: yara_dir = str(yara_rules_dir.resolve()) if yara_rules_dir else None state = _scan_state(input_path, format, no_llm, yara_rules_dir=yara_dir) if verbose: - set_level("DEBUG") console.print("[dim]Running scan...[/dim]") logger.debug( "Scan started: input_path=%s, format=%s, use_llm=%s", @@ -218,20 +249,7 @@ def scan( format, not no_llm, ) - env = os.environ.get("ENV", "dev") - tags = ["skillspector", f"environment:{env}"] - extra_tags = os.environ.get("LANGCHAIN_TAGS_EXTRA", "") - tags.extend(t.strip() for t in extra_tags.split(",") if t.strip()) - trace_config: RunnableConfig = { - "run_name": "skillspector-scan", - "tags": tags, - "metadata": { - "input_path": input_path, - "use_llm": not no_llm, - "output_format": format.value, - "version": __version__, - }, - } + trace_config = _build_trace_config(input_path, format, no_llm) result = graph.invoke(state, config=trace_config) _write_result(result, output, format) @@ -254,5 +272,107 @@ def scan( _cleanup_result(result) +def _build_trace_config(input_path: str, format: FormatChoice, no_llm: bool) -> RunnableConfig: + """Build LangSmith trace config for a scan invocation.""" + env = os.environ.get("ENV", "dev") + tags = ["skillspector", f"environment:{env}"] + extra_tags = os.environ.get("LANGCHAIN_TAGS_EXTRA", "") + tags.extend(t.strip() for t in extra_tags.split(",") if t.strip()) + return { + "run_name": "skillspector-scan", + "tags": tags, + "metadata": { + "input_path": input_path, + "use_llm": not no_llm, + "output_format": format.value, + "version": __version__, + }, + } + + +def _scan_multi_skill( + detection: MultiSkillDetectionResult, + format: FormatChoice, + output: Path | None, + no_llm: bool, + yara_rules_dir: Path | None, + verbose: bool, +) -> None: + """Scan each detected sub-skill independently and produce a combined report.""" + skills = detection.skills + console.print(f"[bold]Multi-skill directory detected:[/bold] {len(skills)} skills found\n") + + results: list[dict[str, object]] = [] + max_score = 0 + + for i, skill in enumerate(skills, 1): + console.print( + f" [{i}/{len(skills)}] Scanning [bold]{skill.name}[/bold] ({skill.relative_path}/)" + ) + yara_dir = str(yara_rules_dir.resolve()) if yara_rules_dir else None + state = _scan_state(str(skill.path), format, no_llm, yara_rules_dir=yara_dir) + trace_config = _build_trace_config(str(skill.path), format, no_llm) + + try: + result = graph.invoke(state, config=trace_config) + results.append(result) + score = result.get("risk_score") or 0 + if isinstance(score, int) and score > max_score: + max_score = score + severity = result.get("risk_severity") or "LOW" + console.print(f" Score: {score}/100 ({severity})\n") + except Exception as e: + console.print(f" [red]Error:[/red] {e}\n") + results.append({"skill_name": skill.name, "error": str(e)}) + + console.print("\n[bold]═══ Multi-Skill Summary ═══[/bold]\n") + console.print(f" {'Skill':<30} {'Score':<8} {'Severity':<12} {'Findings':<10}") + console.print(f" {'─' * 30} {'─' * 8} {'─' * 12} {'─' * 10}") + + for skill, result in zip(skills, results, strict=True): + if "error" in result: + console.print(f" {skill.name:<30} {'ERROR':<8} {'—':<12} {'—':<10}") + continue + score = result.get("risk_score", 0) + severity = result.get("risk_severity", "LOW") + filtered = result.get("filtered_findings") or result.get("findings") + finding_count = len(filtered) if isinstance(filtered, list) else 0 + console.print(f" {skill.name:<30} {score:<8} {severity:<12} {finding_count:<10}") + + console.print("") + + if output and format == FormatChoice.json: + combined = { + "multi_skill": True, + "skill_count": len(skills), + "max_risk_score": max_score, + "skills": [], + } + for skill, result in zip(skills, results, strict=True): + if "error" in result: + combined["skills"].append({"name": skill.name, "error": result["error"]}) + else: + combined["skills"].append( + { + "name": skill.name, + "path": skill.relative_path, + "risk_score": result.get("risk_score", 0), + "risk_severity": result.get("risk_severity", "LOW"), + "finding_count": len( + result.get("filtered_findings") or result.get("findings") or [] + ), + } + ) + Path(output).write_text(json.dumps(combined, indent=2), encoding="utf-8") + console.print(f"[green]Combined report saved to:[/green] {output}") + elif output: + for _skill, result in zip(skills, results, strict=True): + if "error" not in result: + _write_result(result, None, format) + + if max_score > 50: + raise typer.Exit(code=1) + + if __name__ == "__main__": app() diff --git a/src/skillspector/multi_skill.py b/src/skillspector/multi_skill.py new file mode 100644 index 00000000..ebf92b7f --- /dev/null +++ b/src/skillspector/multi_skill.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Multi-skill directory detection and per-skill scanning. + +Detects when a scanned directory contains multiple independent skills +(each with their own SKILL.md) and supports scanning each independently +to produce per-skill reports instead of one inflated monolithic result. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + +from skillspector.logging_config import get_logger + +logger = get_logger(__name__) + + +@dataclass +class SkillDirectory: + """A detected skill within a multi-skill directory.""" + + path: Path + name: str + relative_path: str + + +@dataclass +class MultiSkillDetectionResult: + """Result of scanning a directory for multiple skills.""" + + is_multi_skill: bool + skills: list[SkillDirectory] = field(default_factory=list) + has_root_skill: bool = False + + +def detect_skills(directory: Path) -> MultiSkillDetectionResult: + """Detect whether a directory contains multiple independent skills. + + A directory is considered multi-skill when: + - It has NO root-level SKILL.md (or skill.md) + - At least 2 immediate subdirectories contain SKILL.md (or skill.md) + + If a root SKILL.md exists, the directory is treated as a single skill + (the standard behavior) regardless of nested SKILL.md files. + + Returns a MultiSkillDetectionResult with detected skills. + """ + if not directory.is_dir(): + return MultiSkillDetectionResult(is_multi_skill=False) + + has_root = _has_skill_md(directory) + if has_root: + return MultiSkillDetectionResult(is_multi_skill=False, has_root_skill=True) + + skills: list[SkillDirectory] = [] + for child in sorted(directory.iterdir()): + if not child.is_dir(): + continue + if child.name.startswith("."): + continue + if _has_skill_md(child): + name = _extract_skill_name(child) + skills.append( + SkillDirectory( + path=child, + name=name, + relative_path=child.name, + ) + ) + + is_multi = len(skills) >= 2 + return MultiSkillDetectionResult( + is_multi_skill=is_multi, + skills=skills, + has_root_skill=False, + ) + + +def _has_skill_md(directory: Path) -> bool: + """Check if directory contains a SKILL.md or skill.md at root level.""" + return (directory / "SKILL.md").is_file() or (directory / "skill.md").is_file() + + +def _extract_skill_name(skill_dir: Path) -> str: + """Extract skill name from SKILL.md frontmatter, falling back to directory name.""" + import re + + import yaml + + for name in ("SKILL.md", "skill.md"): + path = skill_dir / name + if not path.is_file(): + continue + try: + content = path.read_text(encoding="utf-8", errors="replace") + except OSError: + continue + if not content.startswith("---"): + break + end_match = re.search(r"\n---\s*\n", content[3:]) + if not end_match: + break + frontmatter = content[3 : end_match.start() + 3] + try: + data = yaml.safe_load(frontmatter) + except yaml.YAMLError: + break + if isinstance(data, dict) and "name" in data: + return str(data["name"]) + break + + return skill_dir.name diff --git a/tests/test_multi_skill.py b/tests/test_multi_skill.py new file mode 100644 index 00000000..3c1b634a --- /dev/null +++ b/tests/test_multi_skill.py @@ -0,0 +1,164 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for multi-skill directory detection.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from skillspector.multi_skill import detect_skills + + +@pytest.fixture +def multi_skill_dir(tmp_path: Path) -> Path: + """Create a directory with 3 sub-skills, no root SKILL.md.""" + for name in ("weather-lookup", "email-sender", "file-manager"): + skill_dir = tmp_path / name + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + f"---\nname: {name}\ndescription: A {name} skill\n---\n# {name}\n", + encoding="utf-8", + ) + (skill_dir / "tool.py").write_text(f"# {name} implementation\n", encoding="utf-8") + return tmp_path + + +@pytest.fixture +def single_skill_dir(tmp_path: Path) -> Path: + """Create a single-skill directory with root SKILL.md.""" + (tmp_path / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: A test skill\n---\n# My Skill\n", + encoding="utf-8", + ) + (tmp_path / "tool.py").write_text("# implementation\n", encoding="utf-8") + return tmp_path + + +@pytest.fixture +def nested_with_root(tmp_path: Path) -> Path: + """A directory with root SKILL.md AND sub-skill SKILL.md files.""" + (tmp_path / "SKILL.md").write_text( + "---\nname: parent-skill\n---\n# Parent\n", + encoding="utf-8", + ) + sub = tmp_path / "sub-skill" + sub.mkdir() + (sub / "SKILL.md").write_text( + "---\nname: sub-skill\n---\n# Sub\n", + encoding="utf-8", + ) + return tmp_path + + +class TestDetectSkills: + """Tests for detect_skills().""" + + def test_multi_skill_directory_detected(self, multi_skill_dir: Path) -> None: + """Directory with no root SKILL.md and multiple sub-skills is detected.""" + result = detect_skills(multi_skill_dir) + assert result.is_multi_skill is True + assert len(result.skills) == 3 + assert result.has_root_skill is False + + def test_skill_names_extracted_from_frontmatter(self, multi_skill_dir: Path) -> None: + """Skill names come from SKILL.md frontmatter.""" + result = detect_skills(multi_skill_dir) + names = {s.name for s in result.skills} + assert names == {"weather-lookup", "email-sender", "file-manager"} + + def test_single_skill_not_multi(self, single_skill_dir: Path) -> None: + """Directory with root SKILL.md is not multi-skill.""" + result = detect_skills(single_skill_dir) + assert result.is_multi_skill is False + assert result.has_root_skill is True + assert len(result.skills) == 0 + + def test_root_skill_overrides_nested(self, nested_with_root: Path) -> None: + """Root SKILL.md means it's a single skill even with nested SKILL.md.""" + result = detect_skills(nested_with_root) + assert result.is_multi_skill is False + assert result.has_root_skill is True + + def test_empty_directory_not_multi(self, tmp_path: Path) -> None: + """Empty directory is not multi-skill.""" + result = detect_skills(tmp_path) + assert result.is_multi_skill is False + assert len(result.skills) == 0 + + def test_single_sub_skill_not_multi(self, tmp_path: Path) -> None: + """Only one sub-skill is not considered multi-skill (need >= 2).""" + sub = tmp_path / "only-skill" + sub.mkdir() + (sub / "SKILL.md").write_text("---\nname: only\n---\n# Only\n", encoding="utf-8") + result = detect_skills(tmp_path) + assert result.is_multi_skill is False + assert len(result.skills) == 1 + + def test_hidden_directories_skipped(self, tmp_path: Path) -> None: + """Directories starting with '.' are not scanned for skills.""" + for name in ("skill-a", "skill-b"): + sub = tmp_path / name + sub.mkdir() + (sub / "SKILL.md").write_text(f"---\nname: {name}\n---\n", encoding="utf-8") + hidden = tmp_path / ".hidden-skill" + hidden.mkdir() + (hidden / "SKILL.md").write_text("---\nname: hidden\n---\n", encoding="utf-8") + result = detect_skills(tmp_path) + assert result.is_multi_skill is True + assert len(result.skills) == 2 + names = {s.name for s in result.skills} + assert "hidden" not in names + + def test_nonexistent_path(self, tmp_path: Path) -> None: + """Non-existent path returns not multi-skill.""" + result = detect_skills(tmp_path / "does-not-exist") + assert result.is_multi_skill is False + + def test_skill_directory_paths_are_absolute(self, multi_skill_dir: Path) -> None: + """SkillDirectory.path should be an absolute path.""" + result = detect_skills(multi_skill_dir) + for skill in result.skills: + assert skill.path.is_absolute() + + def test_relative_path_is_dirname(self, multi_skill_dir: Path) -> None: + """SkillDirectory.relative_path is just the directory name.""" + result = detect_skills(multi_skill_dir) + for skill in result.skills: + assert "/" not in skill.relative_path + assert skill.relative_path == skill.path.name + + def test_fallback_name_from_dirname(self, tmp_path: Path) -> None: + """If SKILL.md has no name in frontmatter, use directory name.""" + for name in ("skill-a", "skill-b"): + sub = tmp_path / name + sub.mkdir() + (sub / "SKILL.md").write_text("---\ndescription: no name\n---\n", encoding="utf-8") + result = detect_skills(tmp_path) + assert result.is_multi_skill is True + names = {s.name for s in result.skills} + assert names == {"skill-a", "skill-b"} + + def test_lowercase_skill_md_detected(self, tmp_path: Path) -> None: + """skill.md (lowercase) is also recognized.""" + for name in ("alpha", "beta"): + sub = tmp_path / name + sub.mkdir() + (sub / "skill.md").write_text(f"---\nname: {name}\n---\n", encoding="utf-8") + result = detect_skills(tmp_path) + assert result.is_multi_skill is True + assert len(result.skills) == 2