Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 135 additions & 15 deletions src/skillspector/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from skillspector import __version__
from skillspector.graph import graph
from skillspector.logging_config import get_logger, set_level
from skillspector.multi_skill import MultiSkillDetectionResult, detect_skills

logger = get_logger(__name__)

Expand Down Expand Up @@ -171,6 +172,14 @@ def scan(
help="Directory containing additional YARA rule files (.yar/.yara) to load alongside built-in rules.",
),
] = None,
recursive: Annotated[
bool,
typer.Option(
"--recursive",
"-r",
help="Scan directories containing multiple skills (immediate subdirectories with SKILL.md) independently.",
),
] = False,
verbose: Annotated[
bool,
typer.Option(
Expand All @@ -188,6 +197,7 @@ def scan(
skillspector scan ./my-skill/
skillspector scan ./my-skill/ --format json --output report.json
skillspector scan https://github.com/user/my-skill --no-llm
skillspector scan ./skill-collection/ --recursive

Environment variables:

Expand All @@ -205,33 +215,41 @@ def scan(
ANTHROPIC_API_KEY for SKILLSPECTOR_PROVIDER=anthropic
NVIDIA_INFERENCE_KEY for the NVIDIA providers
"""
if verbose:
set_level("DEBUG")

resolved_path = Path(input_path).resolve()
if recursive and resolved_path.is_dir():
detection = detect_skills(resolved_path)
if detection.is_multi_skill:
_scan_multi_skill(detection, format, output, no_llm, yara_rules_dir, verbose)
return
if not detection.has_root_skill and len(detection.skills) == 0:
console.print(
"[yellow]Warning:[/yellow] --recursive specified but no sub-skills "
"detected. Scanning as single skill."
)
elif resolved_path.is_dir():
detection = detect_skills(resolved_path)
if detection.is_multi_skill:
console.print(
f"[yellow]Warning:[/yellow] Found {len(detection.skills)} skills in "
f"this directory. Use --recursive to scan each independently."
)

result = None
try:
yara_dir = str(yara_rules_dir.resolve()) if yara_rules_dir else None
state = _scan_state(input_path, format, no_llm, yara_rules_dir=yara_dir)
if verbose:
set_level("DEBUG")
console.print("[dim]Running scan...[/dim]")
logger.debug(
"Scan started: input_path=%s, format=%s, use_llm=%s",
input_path,
format,
not no_llm,
)
env = os.environ.get("ENV", "dev")
tags = ["skillspector", f"environment:{env}"]
extra_tags = os.environ.get("LANGCHAIN_TAGS_EXTRA", "")
tags.extend(t.strip() for t in extra_tags.split(",") if t.strip())
trace_config: RunnableConfig = {
"run_name": "skillspector-scan",
"tags": tags,
"metadata": {
"input_path": input_path,
"use_llm": not no_llm,
"output_format": format.value,
"version": __version__,
},
}
trace_config = _build_trace_config(input_path, format, no_llm)
result = graph.invoke(state, config=trace_config)

_write_result(result, output, format)
Expand All @@ -254,5 +272,107 @@ def scan(
_cleanup_result(result)


def _build_trace_config(input_path: str, format: FormatChoice, no_llm: bool) -> RunnableConfig:
    """Assemble the LangSmith trace configuration for a single scan run.

    Args:
        input_path: The scan target, recorded verbatim in the trace metadata.
        format: Selected output format; its ``.value`` is recorded in the metadata.
        no_llm: Whether LLM analysis is disabled; stored inverted as ``use_llm``.

    Returns:
        A config mapping with a fixed ``run_name``, the tag list and a
        ``metadata`` dict describing this invocation.

    Environment:
        ``ENV`` supplies the ``environment:<value>`` tag (default ``dev``).
        ``LANGCHAIN_TAGS_EXTRA`` is a comma-separated list of additional tags;
        blank entries are dropped and surrounding whitespace is trimmed.
    """
    environment = os.environ.get("ENV", "dev")
    raw_extra = os.environ.get("LANGCHAIN_TAGS_EXTRA", "")
    extra = [piece.strip() for piece in raw_extra.split(",") if piece.strip()]

    metadata = {
        "input_path": input_path,
        "use_llm": not no_llm,
        "output_format": format.value,
        "version": __version__,
    }
    return {
        "run_name": "skillspector-scan",
        "tags": ["skillspector", f"environment:{environment}", *extra],
        "metadata": metadata,
    }


def _summarize_skill_result(result: dict[str, object]) -> tuple[object, object, int]:
    """Return ``(risk_score, risk_severity, finding_count)`` for one scan result.

    A key that is absent and a key explicitly set to ``None`` are treated the
    same: the score falls back to ``0``, the severity to ``"LOW"`` and the
    finding count to ``0``, so the values can always be formatted.
    """
    score = result.get("risk_score") or 0
    severity = result.get("risk_severity") or "LOW"
    # Prefer the filtered findings; fall back to the raw list when absent or empty.
    findings = result.get("filtered_findings") or result.get("findings")
    finding_count = len(findings) if isinstance(findings, list) else 0
    return score, severity, finding_count


def _scan_multi_skill(
    detection: MultiSkillDetectionResult,
    format: FormatChoice,
    output: Path | None,
    no_llm: bool,
    yara_rules_dir: Path | None,
    verbose: bool,
) -> None:
    """Scan each detected sub-skill independently and produce a combined report.

    Every skill in ``detection.skills`` is run through ``graph.invoke`` on its
    own. A failure while scanning one skill is reported and recorded as an
    error entry; the remaining skills are still scanned. A summary table is
    then printed, followed by the optional report output.

    Args:
        detection: Detection result whose ``skills`` list is scanned in order.
        format: Output format; also forwarded to the scan state and trace config.
        output: Report destination. With the JSON format a single combined
            report is written to this path.
        no_llm: Forwarded to the scan state and trace config.
        yara_rules_dir: Optional directory of extra YARA rules; resolved once
            and passed to every per-skill scan.
        verbose: Accepted for signature parity with the caller; not read here.

    Raises:
        typer.Exit: With code 1 when the highest integer risk score among the
            scanned skills is greater than 50.
    """
    skills = detection.skills
    console.print(f"[bold]Multi-skill directory detected:[/bold] {len(skills)} skills found\n")

    # Loop-invariant: resolve the extra rules directory once for all skills.
    yara_dir = str(yara_rules_dir.resolve()) if yara_rules_dir else None

    results: list[dict[str, object]] = []
    max_score = 0

    for i, skill in enumerate(skills, 1):
        console.print(
            f" [{i}/{len(skills)}] Scanning [bold]{skill.name}[/bold] ({skill.relative_path}/)"
        )
        state = _scan_state(str(skill.path), format, no_llm, yara_rules_dir=yara_dir)
        trace_config = _build_trace_config(str(skill.path), format, no_llm)

        # Only the scan itself is guarded, so exactly one entry is appended per
        # skill and ``results`` stays aligned with ``skills`` for zip(strict=True).
        try:
            result = graph.invoke(state, config=trace_config)
        except Exception as e:
            logger.debug("Scan failed for skill %s", skill.path, exc_info=True)
            console.print(f" [red]Error:[/red] {e}\n")
            results.append({"skill_name": skill.name, "error": str(e)})
            continue

        results.append(result)
        score, severity, _ = _summarize_skill_result(result)
        if isinstance(score, int) and score > max_score:
            max_score = score
        console.print(f" Score: {score}/100 ({severity})\n")

    console.print("\n[bold]═══ Multi-Skill Summary ═══[/bold]\n")
    console.print(f" {'Skill':<30} {'Score':<8} {'Severity':<12} {'Findings':<10}")
    console.print(f" {'─' * 30} {'─' * 8} {'─' * 12} {'─' * 10}")

    for skill, result in zip(skills, results, strict=True):
        if "error" in result:
            console.print(f" {skill.name:<30} {'ERROR':<8} {'—':<12} {'—':<10}")
            continue
        # Normalised values: a ``None`` score or severity would otherwise make
        # the width format specs below raise TypeError.
        score, severity, finding_count = _summarize_skill_result(result)
        console.print(f" {skill.name:<30} {score:<8} {severity:<12} {finding_count:<10}")

    console.print("")

    if output and format == FormatChoice.json:
        skill_entries: list[dict[str, object]] = []
        for skill, result in zip(skills, results, strict=True):
            if "error" in result:
                skill_entries.append(
                    {
                        "name": skill.name,
                        "path": skill.relative_path,
                        "error": result["error"],
                    }
                )
                continue
            score, severity, finding_count = _summarize_skill_result(result)
            skill_entries.append(
                {
                    "name": skill.name,
                    "path": skill.relative_path,
                    "risk_score": score,
                    "risk_severity": severity,
                    "finding_count": finding_count,
                }
            )
        combined = {
            "multi_skill": True,
            "skill_count": len(skills),
            "max_risk_score": max_score,
            "skills": skill_entries,
        }
        output.write_text(json.dumps(combined, indent=2), encoding="utf-8")
        console.print(f"[green]Combined report saved to:[/green] {output}")
    elif output:
        # NOTE(review): ``_write_result`` is called without an output path here,
        # so the path given via ``output`` is not used for non-JSON formats;
        # confirm this is intended.
        for _skill, result in zip(skills, results, strict=True):
            if "error" not in result:
                _write_result(result, None, format)

    if max_score > 50:
        raise typer.Exit(code=1)


# Invoke the CLI application object only when this module is run as a script.
if __name__ == "__main__":
    app()
127 changes: 127 additions & 0 deletions src/skillspector/multi_skill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Multi-skill directory detection and per-skill scanning.

Detects when a scanned directory contains multiple independent skills
(each with their own SKILL.md) and supports scanning each independently
to produce per-skill reports instead of one inflated monolithic result.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

from skillspector.logging_config import get_logger

logger = get_logger(__name__)


@dataclass
class SkillDirectory:
    """One skill located directly beneath a scanned parent directory.

    Attributes:
        path: Directory that holds the skill.
        name: Display name of the skill.
        relative_path: The skill's location relative to the scanned parent.
    """

    path: Path
    name: str
    relative_path: str


@dataclass
class MultiSkillDetectionResult:
    """Outcome of inspecting a directory for independent skills.

    Attributes:
        is_multi_skill: Whether the directory qualifies as a multi-skill
            collection.
        skills: The skill directories that were found; empty by default, with
            a fresh list per instance.
        has_root_skill: Whether the inspected directory itself carries a
            skill file; defaults to ``False``.
    """

    is_multi_skill: bool
    skills: list[SkillDirectory] = field(default_factory=list)
    has_root_skill: bool = False


def detect_skills(directory: Path) -> MultiSkillDetectionResult:
    """Inspect ``directory`` and report the independent skills it contains.

    The directory counts as multi-skill only when both conditions hold:
      - it has no SKILL.md (or skill.md) of its own, and
      - two or more of its immediate subdirectories contain one.

    A root-level SKILL.md always wins: the directory is then reported as a
    single skill and its subdirectories are not inspected. Subdirectories
    whose name starts with ``.`` are ignored, and the detected skills are
    listed in sorted path order.

    Args:
        directory: Path to inspect. Anything that is not a directory yields
            an empty, non-multi-skill result.

    Returns:
        A MultiSkillDetectionResult carrying the detected skills.
    """
    if not directory.is_dir():
        return MultiSkillDetectionResult(is_multi_skill=False)

    if _has_skill_md(directory):
        return MultiSkillDetectionResult(is_multi_skill=False, has_root_skill=True)

    found = [
        SkillDirectory(
            path=entry,
            name=_extract_skill_name(entry),
            relative_path=entry.name,
        )
        for entry in sorted(directory.iterdir())
        if entry.is_dir() and not entry.name.startswith(".") and _has_skill_md(entry)
    ]

    return MultiSkillDetectionResult(
        is_multi_skill=len(found) >= 2,
        skills=found,
        has_root_skill=False,
    )


def _has_skill_md(directory: Path) -> bool:
    """Return True when ``directory`` directly holds a SKILL.md or skill.md file."""
    candidates = ("SKILL.md", "skill.md")
    return any((directory / filename).is_file() for filename in candidates)


def _extract_skill_name(skill_dir: Path) -> str:
    """Extract skill name from SKILL.md frontmatter, falling back to directory name.

    The first of ``SKILL.md`` / ``skill.md`` that exists and can be read is
    inspected. Its frontmatter is the text between a leading ``---`` and the
    next line consisting of ``---``; that closing line may also be the last
    line of the file without a trailing newline.

    Args:
        skill_dir: Directory of the skill.

    Returns:
        The frontmatter ``name`` as a string when it is present and non-blank.
        Otherwise ``skill_dir.name``: no skill file, no or unterminated
        frontmatter, invalid YAML, a non-mapping document, or a missing or
        empty ``name``.
    """
    import re

    for filename in ("SKILL.md", "skill.md"):
        candidate = skill_dir / filename
        if not candidate.is_file():
            continue
        try:
            content = candidate.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Unreadable file: give the other spelling a chance.
            continue
        if not content.startswith("---"):
            break
        # Closing delimiter: a line holding only '---', terminated by a
        # newline or by the end of the file.
        closing = re.search(r"\n---\s*(?:\n|\Z)", content[3:])
        if not closing:
            break
        frontmatter = content[3 : closing.start() + 3]

        # Imported lazily: only needed once a frontmatter block was found.
        import yaml

        try:
            data = yaml.safe_load(frontmatter)
        except yaml.YAMLError:
            break
        if isinstance(data, dict):
            declared = data.get("name")
            # An empty ``name:`` parses to None; str(None) would yield the
            # bogus skill name "None", so blank values use the fallback.
            if declared is not None and str(declared).strip():
                return str(declared)
        break

    return skill_dir.name
Loading