From 58ee77281d32a18db5a516b278831014b7927453 Mon Sep 17 00:00:00 2001 From: Kenneth Belitzky Date: Sat, 9 May 2026 20:39:00 -0300 Subject: [PATCH] Add structure lint command and MCP tool --- docs/cli-reference.md | 32 ++++ docs/mcp-integration.md | 23 +++ structkit/commands/lint.py | 352 ++++++++++++++++++++++++++++++++++ structkit/main.py | 2 + structkit/mcp_server.py | 70 ++++++- tests/test_commands.py | 41 ++++ tests/test_mcp_integration.py | 31 +++ 7 files changed, 550 insertions(+), 1 deletion(-) create mode 100644 structkit/commands/lint.py diff --git a/docs/cli-reference.md b/docs/cli-reference.md index d40a891..510e123 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -80,6 +80,38 @@ structkit validate [-h] [-l LOG] [-c CONFIG_FILE] [-i LOG_FILE] yaml_file - `yaml_file`: Path to the YAML configuration file. +### `lint` + +Run stricter quality checks against one or more StructKit YAML files or structure names. `validate` checks whether a structure is syntactically usable; `lint` adds quality and safety checks that can flag risky, ambiguous, or inconsistent definitions. + +**Usage:** + +```sh +structkit lint [-h] [-l LOG] [-c CONFIG_FILE] [-i LOG_FILE] [-s STRUCTURES_PATH] [--all] [--json] [targets ...] +``` + +**Arguments:** + +- `targets`: One or more built-in structure names, custom structure names, or local `.yaml`/`.yml` files. +- `-s STRUCTURES_PATH, --structures-path STRUCTURES_PATH`: Path to custom structure definitions. Can be set via the `STRUCTKIT_STRUCTURES_PATH` environment variable. +- `--all`: Lint all bundled contrib structures. +- `--json`: Print machine-readable JSON containing `summary` counts and individual `issues`. + +**Rules and exit behavior:** + +- Errors: invalid YAML, missing targets, missing files, non-mapping top-level YAML, invalid variable/hook shapes, duplicate variables, duplicate file/folder entries, template syntax errors, undefined template variables, and clearly destructive hooks such as filesystem-root removal. +- Warnings: missing top-level descriptions, declared variables that are never referenced, suspicious hooks, unpinned GitHub remote URLs, and variable naming convention issues. +- The command exits with status `1` when one or more lint errors are found. Warnings alone do not cause a non-zero exit. + +Examples: + +```sh +structkit lint .struct.yaml +structkit lint structkit/contribs/project/python.yaml +structkit lint --all +structkit lint .struct.yaml --json +``` + ### `generate` Generate the project structure. diff --git a/docs/mcp-integration.md b/docs/mcp-integration.md index f66f0ba..693e0b5 100644 --- a/docs/mcp-integration.md +++ b/docs/mcp-integration.md @@ -103,6 +103,27 @@ Validate a structure configuration YAML file. **Parameters:** - `yaml_file` (required): Path to the YAML configuration file to validate +### 6. lint_structure +Lint one or more structure YAML files or structure names for quality and safety issues. + +```json +{ + "name": "lint_structure", + "arguments": { + "targets": ["project/python", "/path/to/.struct.yaml"], + "structures_path": "/path/to/custom/structures", + "lint_all": false, + "output": "json" + } +} +``` + +**Parameters:** +- `targets` (optional): YAML file paths or structure names to lint. Required unless `lint_all` is true. +- `structures_path` (optional): Custom path to structure definitions. +- `lint_all` (optional): Lint all bundled contrib structures (default: false). +- `output` (optional): Output format - "text" or "json" (default: "text"). + ## Usage ### Starting the MCP Server (FastMCP stdio / http / sse) @@ -246,6 +267,7 @@ The MCP tools can be chained together for complex workflows: 2. Get detailed info about a specific structure 3. Generate the structure with custom mappings 4. Validate any custom configurations +5. Lint structures for stricter quality and safety checks ### Integration Examples @@ -374,6 +396,7 @@ Once connected, you can use these tools: - `generate_structure` - Generate project structures - `get_structure_vars` - Inspect declared structure variables - `validate_structure` - Validate YAML configuration files +- `lint_structure` - Lint YAML files or structure names for quality and safety issues ## Troubleshooting diff --git a/structkit/commands/lint.py b/structkit/commands/lint.py new file mode 100644 index 0000000..b27811c --- /dev/null +++ b/structkit/commands/lint.py @@ -0,0 +1,352 @@ +import json +import os +import re +from dataclasses import asdict, dataclass +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple + +import yaml +from jinja2 import Environment, TemplateSyntaxError, meta + +from structkit.commands import Command +from structkit.completers import structures_completer + + +@dataclass +class LintIssue: + severity: str + rule: str + message: str + file: str + context: str = "" + + +class LintCommand(Command): + """Run structure quality checks on StructKit YAML definitions.""" + + TEMPLATE_GLOBALS = { + 'current_repo', + 'env', + 'mappings', + 'now', + 'read_file', + 'uuid', + } + VARIABLE_NAME_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$') + STABLE_REF_RE = re.compile(r'^[a-f0-9]{40}$|^v?\d+(?:\.\d+){1,3}(?:[-+][A-Za-z0-9_.-]+)?$') + + def __init__(self, parser): + super().__init__(parser) + parser.description = "Lint StructKit YAML files for quality and safety issues" + target_arg = parser.add_argument( + 'targets', + nargs='*', + help='YAML file paths or structure names to lint', + ) + target_arg.completer = structures_completer + parser.add_argument( + '-s', + '--structures-path', + type=str, + help='Path to structure definitions (env: STRUCTKIT_STRUCTURES_PATH)', + default=os.getenv('STRUCTKIT_STRUCTURES_PATH', None), + ) + parser.add_argument('--all', action='store_true', help='Lint all bundled contrib structures') + parser.add_argument('--json', action='store_true', help='Output lint results as JSON') + parser.set_defaults(func=self.execute) + + self.env = Environment( + trim_blocks=True, + block_start_string='{%@', + block_end_string='@%}', + variable_start_string='{{@', + variable_end_string='@}}', + comment_start_string='{#@', + comment_end_string='@#}', + ) + # Register lightweight stand-ins so Jinja can parse StructKit templates + # during static analysis without executing any helper/filter logic. + self.env.filters.update({ + 'default_branch': lambda value: value, + 'from_json': lambda value: value, + 'from_yaml': lambda value: value, + 'latest_release': lambda value: value, + 'slugify': lambda value: value, + 'to_json': lambda value: value, + 'to_yaml': lambda value: value, + }) + self.env.globals.update({name: None for name in self.TEMPLATE_GLOBALS}) + + def execute(self, args): + results = self.lint(args.targets, structures_path=args.structures_path, lint_all=args.all) + if args.json: + print(json.dumps(results, indent=2)) + else: + self._print_text(results) + + if results['summary']['errors'] > 0: + raise SystemExit(1) + + def lint( + self, + targets: Optional[Sequence[str]], + structures_path: Optional[str] = None, + lint_all: bool = False, + ) -> Dict[str, Any]: + files = self._resolve_targets(targets or [], structures_path, lint_all) + issues: List[LintIssue] = [] + + if not files: + issues.append(LintIssue( + 'error', + 'target-required', + 'Provide one or more YAML files, structure names, or --all.', + '', + )) + + for file_path in files: + issues.extend(self._lint_file(file_path)) + + error_count = sum(1 for issue in issues if issue.severity == 'error') + warning_count = sum(1 for issue in issues if issue.severity == 'warning') + return { + 'summary': { + 'files': len(files), + 'errors': error_count, + 'warnings': warning_count, + }, + 'issues': [asdict(issue) for issue in issues], + } + + def _resolve_targets( + self, + targets: Sequence[str], + structures_path: Optional[str], + lint_all: bool, + ) -> List[str]: + files: List[str] = [] + seen: Set[str] = set() + + def add(path: str): + full = os.path.abspath(path) + if full not in seen: + seen.add(full) + files.append(full) + + if lint_all: + for root, _, names in os.walk(self._contribs_path()): + for name in names: + if name.endswith(('.yaml', '.yml')): + add(os.path.join(root, name)) + + for target in targets: + resolved = self._resolve_target(target, structures_path) + add(resolved) + + return files + + def _resolve_target(self, target: str, structures_path: Optional[str]) -> str: + if target.startswith('file://'): + return target[7:] + if target.endswith(('.yaml', '.yml')) or os.path.exists(target): + return target + + candidates = [] + if structures_path: + candidates.append(os.path.join(structures_path, f'{target}.yaml')) + candidates.append(os.path.join(structures_path, f'{target}.yml')) + candidates.append(os.path.join(self._contribs_path(), f'{target}.yaml')) + candidates.append(os.path.join(self._contribs_path(), f'{target}.yml')) + + for candidate in candidates: + if os.path.exists(candidate): + return candidate + return candidates[0] if candidates else target + + def _contribs_path(self) -> str: + this_file = os.path.dirname(os.path.realpath(__file__)) + return os.path.normpath(os.path.join(this_file, '..', 'contribs')) + + def _lint_file(self, file_path: str) -> List[LintIssue]: + issues: List[LintIssue] = [] + if not os.path.exists(file_path): + return [LintIssue('error', 'file-not-found', f'File not found: {file_path}', file_path)] + + try: + with open(file_path, 'r') as handle: + config = yaml.safe_load(handle) or {} + except yaml.YAMLError as exc: + return [LintIssue('error', 'invalid-yaml', f'Invalid YAML: {exc}', file_path)] + except OSError as exc: + return [LintIssue('error', 'read-error', f'Failed to read file: {exc}', file_path)] + + if not isinstance(config, dict): + return [LintIssue('error', 'top-level-mapping', 'Top-level YAML content must be a mapping.', file_path)] + + if not str(config.get('description') or '').strip(): + issues.append(LintIssue('warning', 'missing-description', 'Missing top-level description.', file_path)) + + declared_vars, variable_issues = self._declared_variables(config, file_path) + issues.extend(variable_issues) + + references, reference_issues = self._referenced_variables(config, file_path) + issues.extend(reference_issues) + for name in sorted(references - declared_vars - self.TEMPLATE_GLOBALS): + issues.append(LintIssue( + 'error', + 'undefined-variable', + f"Template references variable '{name}' but it is not declared.", + file_path, + )) + for name in sorted(declared_vars - references): + issues.append(LintIssue( + 'warning', + 'unused-variable', + f"Variable '{name}' is declared but never referenced.", + file_path, + )) + + issues.extend(self._duplicate_entry_issues(config, file_path)) + issues.extend(self._hook_issues(config, file_path)) + issues.extend(self._remote_url_issues(config, file_path)) + return issues + + def _declared_variables(self, config: Dict[str, Any], file_path: str) -> Tuple[Set[str], List[LintIssue]]: + variables = config.get('variables', []) or [] + declared: Set[str] = set() + issues: List[LintIssue] = [] + if not isinstance(variables, list): + return declared, [LintIssue('error', 'invalid-variables', "The 'variables' key must be a list.", file_path)] + + for item in variables: + if not isinstance(item, dict): + issues.append(LintIssue('error', 'invalid-variable-entry', 'Each variable entry must be a mapping.', file_path)) + continue + if isinstance(item.get('name'), str): + names = [item['name']] + else: + names = list(item.keys()) + for name in names: + if not isinstance(name, str): + issues.append(LintIssue('error', 'invalid-variable-name', 'Variable names must be strings.', file_path)) + continue + if name in declared: + issues.append(LintIssue('error', 'duplicate-variable', f"Variable '{name}' is declared more than once.", file_path)) + declared.add(name) + if not self.VARIABLE_NAME_RE.match(name): + issues.append(LintIssue( + 'warning', + 'naming-convention', + f"Variable '{name}' should use a Python/Jinja-friendly identifier name.", + file_path, + )) + return declared, issues + + def _referenced_variables(self, config: Dict[str, Any], file_path: str) -> Tuple[Set[str], List[LintIssue]]: + references: Set[str] = set() + issues: List[LintIssue] = [] + for context, value in self._template_values(config): + try: + parsed = self.env.parse(value) + references.update(meta.find_undeclared_variables(parsed)) + except TemplateSyntaxError as exc: + issues.append(LintIssue('error', 'template-syntax', f'Template syntax error: {exc}', file_path, context)) + return references, issues + + def _template_values(self, config: Dict[str, Any]) -> Iterable[Tuple[str, str]]: + for key in ('files', 'structure'): + for item in config.get(key, []) or []: + if isinstance(item, dict): + for path, content in item.items(): + yield f'{key}.{path}', str(path) + yield from self._string_values(content, f'{key}.{path}') + elif isinstance(item, str): + yield key, item + for item in config.get('folders', []) or []: + if isinstance(item, dict): + for path, content in item.items(): + yield f'folders.{path}', str(path) + if isinstance(content, dict): + yield from self._string_values(content.get('with', {}), f'folders.{path}.with') + elif isinstance(item, str): + yield 'folders', item + for key in ('pre_hooks', 'post_hooks'): + yield from self._string_values(config.get(key, []), key) + + def _string_values(self, value: Any, context: str) -> Iterable[Tuple[str, str]]: + if isinstance(value, str): + yield context, value + elif isinstance(value, dict): + for key, child in value.items(): + yield from self._string_values(child, f'{context}.{key}') + elif isinstance(value, list): + for index, child in enumerate(value): + yield from self._string_values(child, f'{context}[{index}]') + + def _duplicate_entry_issues(self, config: Dict[str, Any], file_path: str) -> List[LintIssue]: + issues: List[LintIssue] = [] + for key in ('files', 'structure', 'folders'): + seen: Set[str] = set() + for item in config.get(key, []) or []: + if isinstance(item, dict): + names = [str(name) for name in item.keys()] + else: + names = [str(item)] + for name in names: + normalized = os.path.normpath(name) + if normalized in seen: + issues.append(LintIssue('error', f'duplicate-{key}-entry', f"Duplicate {key} entry '{name}'.", file_path)) + seen.add(normalized) + return issues + + def _hook_issues(self, config: Dict[str, Any], file_path: str) -> List[LintIssue]: + issues: List[LintIssue] = [] + for key in ('pre_hooks', 'post_hooks'): + hooks = config.get(key, []) or [] + if hooks and not isinstance(hooks, list): + issues.append(LintIssue('error', 'invalid-hooks', f"The '{key}' key must be a list of shell commands.", file_path)) + continue + for command in hooks: + if not isinstance(command, str): + issues.append(LintIssue('error', 'invalid-hook-command', f"Each item in '{key}' must be a string.", file_path)) + continue + compact = re.sub(r'\s+', ' ', command.strip().lower()) + if re.search(r'\brm\s+-[a-z]*r[f]?\s+/(?:\s|$)', compact): + issues.append(LintIssue('error', 'unsafe-hook', 'Hook appears to remove the filesystem root.', file_path, key)) + elif re.search(r'\b(curl|wget)\b.*\|\s*(sh|bash)\b', compact): + issues.append(LintIssue('warning', 'suspicious-hook', 'Hook pipes remote content directly into a shell.', file_path, key)) + elif re.search(r'\b(eval|sudo)\b|chmod\s+777', compact): + issues.append(LintIssue('warning', 'suspicious-hook', 'Hook uses a potentially risky shell operation.', file_path, key)) + return issues + + def _remote_url_issues(self, config: Dict[str, Any], file_path: str) -> List[LintIssue]: + issues: List[LintIssue] = [] + for context, value in self._string_values(config, ''): + if not isinstance(value, str) or not value.startswith(('http://', 'https://')): + continue + if self._is_unpinned_remote(value): + issues.append(LintIssue( + 'warning', + 'unpinned-remote-url', + f"Remote URL is not pinned to a stable ref: {value}", + file_path, + context, + )) + return issues + + def _is_unpinned_remote(self, url: str) -> bool: + raw_match = re.search(r'raw\.githubusercontent\.com/[^/]+/[^/]+/([^/]+)/', url) + if raw_match: + return not self.STABLE_REF_RE.match(raw_match.group(1)) + github_blob = re.search(r'github\.com/[^/]+/[^/]+/(?:blob|raw)/([^/]+)/', url) + if github_blob: + return not self.STABLE_REF_RE.match(github_blob.group(1)) + return False + + def _print_text(self, results: Dict[str, Any]): + summary = results['summary'] + print(f"Linted {summary['files']} file(s): {summary['errors']} error(s), {summary['warnings']} warning(s)") + for issue in results['issues']: + location = issue['file'] or '' + if issue.get('context'): + location = f"{location} ({issue['context']})" + print(f"{issue['severity'].upper()}: {location}: [{issue['rule']}] {issue['message']}") diff --git a/structkit/main.py b/structkit/main.py index 8f05631..cc4cd09 100644 --- a/structkit/main.py +++ b/structkit/main.py @@ -8,6 +8,7 @@ from structkit.commands.vars import VarsCommand from structkit.commands.explain import ExplainCommand from structkit.commands.validate import ValidateCommand +from structkit.commands.lint import LintCommand from structkit.commands.list import ListCommand from structkit.commands.search import SearchCommand from structkit.commands.generate_schema import GenerateSchemaCommand @@ -35,6 +36,7 @@ def get_parser(): InfoCommand(subparsers.add_parser('info', help='Show information about the package')) ValidateCommand(subparsers.add_parser('validate', help='Validate the YAML configuration file')) + LintCommand(subparsers.add_parser('lint', help='Lint structure YAML files for quality issues')) GenerateCommand(subparsers.add_parser('generate', help='Generate the project structure')) VarsCommand(subparsers.add_parser('vars', help='Inspect structure variables')) ExplainCommand(subparsers.add_parser('explain', help='Explain structure resolution without generating files')) diff --git a/structkit/mcp_server.py b/structkit/mcp_server.py index bb8c1fa..bc62804 100644 --- a/structkit/mcp_server.py +++ b/structkit/mcp_server.py @@ -8,18 +8,20 @@ 4. Validating structure configurations 5. Inspecting structure variables 6. Explaining structure resolution without generating files +7. Linting structure definitions for quality and safety issues """ import asyncio import logging import os import sys import yaml -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastmcp import FastMCP from structkit.commands.generate import GenerateCommand from structkit.commands.validate import ValidateCommand +from structkit.commands.lint import LintCommand from structkit.commands.vars import VarsCommand from structkit.commands.explain import ExplainCommand from structkit import __version__ @@ -266,6 +268,31 @@ def _get_structure_vars_logic( finally: sys.stdout = old + + def _lint_structure_logic( + self, + targets: Optional[List[str]] = None, + structures_path: Optional[str] = None, + lint_all: bool = False, + output: str = "text", + ) -> str: + import argparse + dummy_parser = argparse.ArgumentParser() + lint_command = LintCommand(dummy_parser) + results = lint_command.lint(targets or [], structures_path=structures_path, lint_all=lint_all) + if output == "json": + import json + return json.dumps(results, indent=2) + from io import StringIO + buf = StringIO() + old = sys.stdout + sys.stdout = buf + try: + lint_command._print_text(results) + return buf.getvalue().strip() + finally: + sys.stdout = old + # ===================== # FastMCP tool registration (maps to logic above) # ===================== @@ -371,6 +398,28 @@ async def generate_structure( self.logger.debug(f"MCP response: generate_structure len={len(result)} preview=\n{preview}") return result + + @self.app.tool(name="lint_structure", description="Lint structure YAML files for quality and safety issues") + async def lint_structure( + targets: Optional[List[str]] = None, + structures_path: Optional[str] = None, + lint_all: bool = False, + output: str = "text", + ) -> str: + self.logger.debug( + "MCP request: lint_structure args=%s", + { + "targets": targets, + "structures_path": structures_path, + "lint_all": lint_all, + "output": output, + }, + ) + result = self._lint_structure_logic(targets, structures_path, lint_all, output) + preview = result if len(result) <= 1000 else result[:1000] + f"... [truncated {len(result)-1000} chars]" + self.logger.debug(f"MCP response: lint_structure len={len(result)} preview=\n{preview}") + return result + @self.app.tool(name="validate_structure", description="Validate a structure configuration YAML file") async def validate_structure(yaml_file: str) -> str: self.logger.debug(f"MCP request: validate_structure args={{'yaml_file': {yaml_file!r}}}") @@ -501,6 +550,25 @@ def __init__(self, content): return MockResult([MockContent(result_text)]) + async def _handle_lint_structure(self, params: Dict[str, Any]): + """Compatibility method for tests that expect MCP-style responses.""" + result_text = self._lint_structure_logic( + params.get('targets', []), + params.get('structures_path'), + params.get('lint_all', False), + params.get('output', 'text'), + ) + + class MockContent: + def __init__(self, text): + self.text = text + + class MockResult: + def __init__(self, content): + self.content = content + + return MockResult([MockContent(result_text)]) + async def main(): logging.basicConfig(level=logging.INFO) diff --git a/tests/test_commands.py b/tests/test_commands.py index b6ea1c0..d2888f7 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -618,3 +618,44 @@ def test_vars_command_invalid_config_exits_nonzero(tmp_path): with pytest.raises(SystemExit) as exc: command.execute(args) assert exc.value.code == 1 + + +def test_lint_command_detects_errors_and_json(parser, tmp_path, capsys): + from structkit.commands.lint import LintCommand + + command = LintCommand(parser) + structure = tmp_path / 'bad.yaml' + structure.write_text(''' +files: + - README.md: + content: "# {{@ project_name @}}" + - README.md: + content: duplicate +variables: + - unused_var: + type: string +post_hooks: + - "curl https://example.com/install.sh | sh" +''') + + args = parser.parse_args([str(structure), '--json']) + try: + command.execute(args) + except SystemExit as exc: + assert exc.code == 1 + output = json.loads(capsys.readouterr().out) + assert output['summary']['errors'] == 2 + rules = {issue['rule'] for issue in output['issues']} + assert 'undefined-variable' in rules + assert 'duplicate-files-entry' in rules + assert 'missing-description' in rules + assert 'suspicious-hook' in rules + + +def test_lint_command_all_registers_in_main_parser(): + from structkit.main import get_parser + + parser = get_parser() + args = parser.parse_args(['lint', '--all']) + assert hasattr(args, 'all') + assert args.all is True diff --git a/tests/test_mcp_integration.py b/tests/test_mcp_integration.py index 99eaab3..30df4f3 100644 --- a/tests/test_mcp_integration.py +++ b/tests/test_mcp_integration.py @@ -25,6 +25,7 @@ def test_get_structure_vars_tool_is_registered(self): tools = asyncio.run(self.server.app.list_tools()) tool_names = [tool.name for tool in tools] self.assertIn('get_structure_vars', tool_names) + self.assertIn('lint_structure', tool_names) def test_list_structures_logic(self): text = self.server._list_structures_logic() @@ -120,6 +121,36 @@ def test_get_structure_vars_compat_handler(self): finally: os.unlink(f.name) + + def test_lint_structure_logic_and_compat_handler(self): + text = self.server._lint_structure_logic([]) + self.assertIn("Provide one or more YAML files", text) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + yaml.dump({ + 'description': 'Test structure', + 'files': [ + {'README.md': {'content': '# {{@ project_name @}}'}} + ], + 'variables': [ + {'project_name': {'type': 'string', 'default': 'demo'}} + ] + }, f) + f.flush() + try: + json_text = self.server._lint_structure_logic([f.name], output='json') + data = json.loads(json_text) + self.assertEqual(data['summary']['errors'], 0) + + result = asyncio.run(self.server._handle_lint_structure({ + 'targets': [f.name], + 'output': 'json', + })) + data = json.loads(result.content[0].text) + self.assertEqual(data['summary']['files'], 1) + finally: + os.unlink(f.name) + def test_validate_structure_logic(self): # Missing yaml_file text = self.server._validate_structure_logic(None)