diff --git a/changelog/655.improvement.md b/changelog/655.improvement.md new file mode 100644 index 000000000..ecfaec2dc --- /dev/null +++ b/changelog/655.improvement.md @@ -0,0 +1,5 @@ +Reworked the layout of execution output directories. +New executions now write to ``{results}/{provider}/{diagnostic}/{group_short}/{execution_id}`` +instead of ``{results}/{provider}/{diagnostic}/{dataset_hash}``, +so reruns of the same diagnostic group no longer overwrite earlier outputs. +Existing rows on disk continue to resolve through their stored ``Execution.output_fragment``. diff --git a/packages/climate-ref/src/climate_ref/executor/fragment.py b/packages/climate-ref/src/climate_ref/executor/fragment.py index 86a31965c..f61ce980c 100644 --- a/packages/climate-ref/src/climate_ref/executor/fragment.py +++ b/packages/climate-ref/src/climate_ref/executor/fragment.py @@ -3,7 +3,31 @@ """ import datetime +import hashlib +import re +from collections.abc import Iterable, Mapping from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from climate_ref.models.execution import Execution + +_TOKEN_RE = re.compile(r"[^A-Za-z0-9_-]+") +_DEFAULT_TOKEN_LIMIT = 64 +_GROUP_SHORT_MAX = 96 + +PLACEHOLDER_FRAGMENT = "_pending" +"""Output-fragment placeholder used until ``Execution.id`` is known.""" + +_DEFAULT_DIAGNOSTIC_VERSION = 1 +""" +Default integer version baked into the output-fragment hash and suffix. + +Diagnostics do not currently expose a version attribute; this constant is the +value used for every diagnostic until that is introduced. +""" def allocate_output_fragment(base_fragment: str, results_dir: Path) -> str: @@ -42,3 +66,158 @@ def allocate_output_fragment(base_fragment: str, results_dir: Path) -> str: ) return fragment + + +_UNSAFE_SEGMENT_RE = re.compile(r"[/\\\x00]") + + +def _validate_path_segment(value: str, *, label: str) -> str: + r""" + Ensure *value* is a safe single path segment. 
+ + Rejects empty strings, path separators (``/``, ``\``), NUL bytes, and dot-segments + (``.``, ``..``) so that downstream ``Path`` joins cannot escape the intended + results/scratch base directories. + """ + if not value or _UNSAFE_SEGMENT_RE.search(value) or value in (".", ".."): + raise ValueError(f"Invalid {label}: {value!r} is not a safe single path segment") + return value + + +def _sanitize_token(value: str) -> str: + """ + Sanitize a single selector value to ``[A-Za-z0-9_-]+``. + + Non-conforming characters are collapsed to a single underscore. + Leading/trailing underscores are stripped. + """ + cleaned = _TOKEN_RE.sub("_", value).strip("_") + return cleaned + + +def _truncate_at_boundary(text: str, limit: int) -> str: + """ + Truncate *text* at *limit* characters, preferring an underscore boundary. + + If *text* is at or below *limit*, return it unchanged. + Otherwise truncate to the rightmost ``_`` at or before *limit*; fall back to a + hard cut if no boundary exists. + """ + if len(text) <= limit: + return text + head = text[:limit] + boundary = head.rfind("_") + if boundary > 0: + return head[:boundary] + return head + + +def compute_group_short( + selectors: Mapping[str, Iterable[tuple[str, str]]], + group_id: int, + diagnostic_version: int, + *, + token_limit: int = _DEFAULT_TOKEN_LIMIT, +) -> str: + """ + Compute a short, deterministic, human-readable path segment for an execution group. + + Human-readable hint for operators browsing the filesystem. + Not unique -- ``execution_id`` is the uniqueness guarantee. + + Selector values across all source types are sorted (first by source-type key, + then by facet key), + sanitized to ``[A-Za-z0-9_-]``, + joined by ``_``, + and truncated to *token_limit* characters at an underscore boundary. + A suffix ``_g{group_id}_v{diagnostic_version}_{digest}`` is appended, + where ``digest`` is an 8-character BLAKE2s hash of the canonical + ``group_id|diagnostic_version|sorted_selectors`` representation. 
+ + The returned string is ASCII, capped at roughly 96 characters, + and deterministic for fixed inputs. + + Parameters + ---------- + selectors + Mapping from source-type key (e.g. ``"cmip6"``) to an iterable of + ``(facet_key, facet_value)`` tuples. + group_id + The ``ExecutionGroup.id`` this fragment belongs to. + diagnostic_version + The integer ``Diagnostic.version`` at solve time. + token_limit + Maximum length of the sanitized selector portion before the suffix. + + Returns + ------- + : + A short, sanitized, deterministic identifier suitable for use as a + filesystem path segment. + """ + # Build a canonical representation: sort source types, then facet pairs. + canonical_pairs: list[tuple[str, tuple[tuple[str, str], ...]]] = [] + for source_type in sorted(selectors.keys()): + pairs = tuple(sorted((str(k), str(v)) for k, v in selectors[source_type])) + canonical_pairs.append((source_type, pairs)) + + # Build the human-readable token portion from the values only. + raw_tokens: list[str] = [] + for _, pairs in canonical_pairs: + for _, value in pairs: + token = _sanitize_token(value) + if token: + raw_tokens.append(token) + + token_part = _truncate_at_boundary("_".join(raw_tokens), token_limit) + + # Stable hash input: group_id, version, and the canonical selector pairs. + # BLAKE2s with a 4-byte digest emits an 8-char hex string without truncation; + # it is non-cryptographic for our purposes but avoids the deprecated-hash linter. + hash_payload = repr((group_id, diagnostic_version, canonical_pairs)).encode("utf-8") + digest = hashlib.blake2s(hash_payload, digest_size=4).hexdigest() + + suffix = f"_g{group_id}_v{diagnostic_version}_{digest}" + + if token_part: + result = f"{token_part}{suffix}" + else: + # Strip leading underscore so we don't start with one. + result = suffix.lstrip("_") + + if len(result) > _GROUP_SHORT_MAX: + # Trim the token portion further so the suffix is preserved. 
+ overflow = len(result) - _GROUP_SHORT_MAX + trimmed_token = _truncate_at_boundary(token_part, max(0, len(token_part) - overflow)) + result = f"{trimmed_token}{suffix}" if trimmed_token else suffix.lstrip("_") + + # Hard cap: boundary-aware trimming above may still leave overflow when no + # underscore boundary exists close enough to the limit. + if len(result) > _GROUP_SHORT_MAX: + result = result[:_GROUP_SHORT_MAX] + + return result + + +def assign_execution_fragment( # noqa: PLR0913 + session: "Session", + execution: "Execution", + *, + provider_slug: str, + diagnostic_slug: str, + selectors: Mapping[str, Iterable[tuple[str, str]]], + group_id: int, + diagnostic_version: int = _DEFAULT_DIAGNOSTIC_VERSION, +) -> str: + """Flush *execution* to materialise its id, then assign the canonical output fragment. + + Returns the assigned fragment string. + """ + _validate_path_segment(provider_slug, label="provider slug") + _validate_path_segment(diagnostic_slug, label="diagnostic slug") + session.flush() + group_short = compute_group_short(selectors, group_id=group_id, diagnostic_version=diagnostic_version) + fragment = str(Path(provider_slug) / diagnostic_slug / group_short / str(execution.id)) + execution.output_fragment = fragment + session.flush() + return fragment diff --git a/packages/climate-ref/src/climate_ref/executor/reingest.py b/packages/climate-ref/src/climate_ref/executor/reingest.py index e148a64bf..23df88ed5 100644 --- a/packages/climate-ref/src/climate_ref/executor/reingest.py +++ b/packages/climate-ref/src/climate_ref/executor/reingest.py @@ -20,7 +20,7 @@ from loguru import logger from climate_ref.datasets import get_slug_column -from climate_ref.executor.fragment import allocate_output_fragment +from climate_ref.executor.fragment import PLACEHOLDER_FRAGMENT, assign_execution_fragment from climate_ref.executor.result_handling import handle_execution_result from climate_ref.models.execution import ( Execution, @@ -45,6 +45,10 @@ from 
climate_ref_core.diagnostics import Diagnostic +class _ReingestSavepointAbort(Exception): + """Internal sentinel used to roll back the savepoint on a soft-fail path.""" + + def reconstruct_execution_definition( config: "Config", execution: Execution, @@ -242,48 +246,67 @@ def reingest_execution( diagnostic, scratch_dir = resolved execution_group = execution.execution_group + provider_slug = execution_group.diagnostic.provider.slug + diagnostic_slug = execution_group.diagnostic.slug - # Allocate a new output fragment with a timestamp suffix - # Previous execution scratch will be copied there - new_fragment = allocate_output_fragment(execution.output_fragment, config.paths.results) - new_scratch_dir = config.paths.scratch / new_fragment + # Convert the JSON-stored selector dict (lists-of-pairs) back into the + # mapping[str, iterable[tuple[str, str]]] shape that ``compute_group_short`` expects. + selectors = { + source_key: [tuple(pair) for pair in pairs] for source_key, pairs in execution_group.selectors.items() + } + + new_fragment: str | None = None + new_scratch_dir: Path | None = None + new_execution: Execution | None = None try: - new_scratch_dir.parent.mkdir(parents=True, exist_ok=True) - shutil.copytree(scratch_dir, new_scratch_dir) + try: + with database.session.begin_nested(): + new_execution = Execution( + execution_group=execution_group, + dataset_hash=execution.dataset_hash, + output_fragment=PLACEHOLDER_FRAGMENT, + ) + database.session.add(new_execution) + + new_fragment = assign_execution_fragment( + database.session, + new_execution, + provider_slug=provider_slug, + diagnostic_slug=diagnostic_slug, + selectors=selectors, + group_id=execution_group.id, + ) - definition = reconstruct_execution_definition( - config, execution, diagnostic, output_fragment=new_fragment - ) + new_scratch_dir = config.paths.scratch / new_fragment + new_scratch_dir.parent.mkdir(parents=True, exist_ok=True) + shutil.copytree(scratch_dir, new_scratch_dir) - result = 
diagnostic.build_execution_result(definition) + definition = reconstruct_execution_definition( + config, execution, diagnostic, output_fragment=new_fragment + ) - if not result.successful or result.metric_bundle_filename is None: - logger.warning( - f"build_execution_result returned unsuccessful result for execution {execution.id}. Skipping." - ) - if new_scratch_dir.exists(): - shutil.rmtree(new_scratch_dir) - return False + result = diagnostic.build_execution_result(definition) - with database.session.begin_nested(): - # Create new Execution record - new_execution = Execution( - execution_group=execution_group, - dataset_hash=execution.dataset_hash, - output_fragment=new_fragment, - ) - database.session.add(new_execution) - database.session.flush() - - # Copy dataset links from the original execution - for dataset in execution.datasets: - database.session.execute( - execution_datasets.insert().values( - execution_id=new_execution.id, - dataset_id=dataset.id, + if not result.successful or result.metric_bundle_filename is None: + logger.warning( + f"build_execution_result returned unsuccessful result " + f"for execution {execution.id}. Skipping." ) - ) + raise _ReingestSavepointAbort + + # Copy dataset links from the original execution + for dataset in execution.datasets: + database.session.execute( + execution_datasets.insert().values( + execution_id=new_execution.id, + dataset_id=dataset.id, + ) + ) + except _ReingestSavepointAbort: + if new_scratch_dir is not None and new_scratch_dir.exists(): + shutil.rmtree(new_scratch_dir) + return False handle_execution_result( config, @@ -294,11 +317,12 @@ def reingest_execution( ) except Exception: logger.exception(f"Ingestion failed for execution {execution.id}. 
Rolling back changes.") - if new_scratch_dir.exists(): + if new_scratch_dir is not None and new_scratch_dir.exists(): shutil.rmtree(new_scratch_dir) - new_results_dir = config.paths.results / new_fragment - if new_results_dir.exists(): - shutil.rmtree(new_results_dir) + if new_fragment is not None: + new_results_dir = config.paths.results / new_fragment + if new_results_dir.exists(): + shutil.rmtree(new_results_dir) return False logger.info(f"Successfully reingested execution {execution.id} -> new execution {new_execution.id}") diff --git a/packages/climate-ref/src/climate_ref/solver.py b/packages/climate-ref/src/climate_ref/solver.py index c4672c7c5..ebee55a2d 100644 --- a/packages/climate-ref/src/climate_ref/solver.py +++ b/packages/climate-ref/src/climate_ref/solver.py @@ -10,6 +10,7 @@ import typing from collections.abc import Mapping, Sequence +import attrs import pandas as pd from attrs import define, frozen from loguru import logger @@ -24,6 +25,7 @@ PMPClimatologyDatasetAdapter, get_slug_column, ) +from climate_ref.executor.fragment import PLACEHOLDER_FRAGMENT, assign_execution_fragment from climate_ref.models import Diagnostic as DiagnosticModel from climate_ref.models import ExecutionGroup from climate_ref.models import Provider as ProviderModel @@ -98,13 +100,16 @@ def selectors(self) -> dict[str, Selector]: def build_execution_definition(self, output_root: pathlib.Path) -> ExecutionDefinition: """ - Build the execution definition for the current diagnostic execution + Build the execution definition for the current diagnostic execution. + + The returned definition uses a placeholder fragment for the output directory. + ``solve_required_executions`` rewrites ``output_directory`` via + :func:`attrs.evolve` once the new ``Execution.id`` is known. 
""" # Ensure that the output root is always an absolute path output_root = output_root.resolve() - # This is the desired path relative to the output directory - fragment = pathlib.Path() / self.provider.slug / self.diagnostic.slug / self.datasets.hash + fragment = pathlib.Path() / self.provider.slug / self.diagnostic.slug / PLACEHOLDER_FRAGMENT return ExecutionDefinition( diagnostic=self.diagnostic, @@ -698,10 +703,24 @@ def solve_required_executions( # noqa: PLR0912, PLR0913, PLR0915 execution = Execution( execution_group=execution_group, dataset_hash=definition.datasets.hash, - output_fragment=str(definition.output_fragment()), + output_fragment=PLACEHOLDER_FRAGMENT, ) db.session.add(execution) - db.session.flush() + + fragment = assign_execution_fragment( + db.session, + execution, + provider_slug=provider_slug, + diagnostic_slug=potential_execution.diagnostic.slug, + selectors=potential_execution.selectors, + group_id=execution_group.id, + ) + + # Rebuild the definition so the executor sees the resolved output path. 
+ definition = attrs.evolve( + definition, + output_directory=config.paths.scratch.resolve() / pathlib.Path(fragment), + ) # Add links to the datasets used in the execution execution.register_datasets(db, definition.datasets) diff --git a/packages/climate-ref/src/climate_ref/testing.py b/packages/climate-ref/src/climate_ref/testing.py index af21d1e57..3f1c87d0d 100644 --- a/packages/climate-ref/src/climate_ref/testing.py +++ b/packages/climate-ref/src/climate_ref/testing.py @@ -17,6 +17,7 @@ from climate_ref import SAMPLE_DATA_VERSION from climate_ref.config import Config from climate_ref.database import Database +from climate_ref.executor.fragment import PLACEHOLDER_FRAGMENT, assign_execution_fragment from climate_ref.models import Execution, ExecutionGroup from climate_ref_core.dataset_registry import dataset_registry_manager, fetch_all_files from climate_ref_core.datasets import ExecutionDatasetCollection @@ -108,10 +109,18 @@ def validate_result( execution = Execution( execution_group_id=execution_group.id, dataset_hash=result.definition.datasets.hash, - output_fragment=str(result.definition.output_fragment()), + output_fragment=PLACEHOLDER_FRAGMENT, ) database.session.add(execution) - database.session.flush() + + assign_execution_fragment( + database.session, + execution, + provider_slug=diagnostic.provider.slug, + diagnostic_slug=diagnostic.slug, + selectors=result.definition.datasets.selectors, + group_id=execution_group.id, + ) assert result.successful diff --git a/packages/climate-ref/tests/unit/executor/test_fragment.py b/packages/climate-ref/tests/unit/executor/test_fragment.py index 080d6b3c4..a9d575ce6 100644 --- a/packages/climate-ref/tests/unit/executor/test_fragment.py +++ b/packages/climate-ref/tests/unit/executor/test_fragment.py @@ -1,11 +1,15 @@ -"""Tests for the allocate_output_fragment helper.""" +"""Tests for the fragment allocation and group-short helpers.""" import datetime from unittest.mock import patch import pytest -from 
climate_ref.executor.fragment import allocate_output_fragment +from climate_ref.executor.fragment import ( + _validate_path_segment, + allocate_output_fragment, + compute_group_short, +) class TestAllocateOutputFragment: @@ -42,3 +46,93 @@ def test_raises_if_directory_already_exists(self, tmp_path): (tmp_path / fragment).mkdir(parents=True) with pytest.raises(FileExistsError, match="Output directory already exists"): allocate_output_fragment("provider/diag/abc123", tmp_path) + + +class TestComputeGroupShort: + """Tests for the ``compute_group_short`` helper.""" + + def test_compute_group_short_is_deterministic(self): + """Same inputs should always produce the same output.""" + selectors = {"cmip6": [("source_id", "ACCESS-ESM1-5"), ("variable_id", "tas")]} + out1 = compute_group_short(selectors, group_id=7, diagnostic_version=1) + out2 = compute_group_short(selectors, group_id=7, diagnostic_version=1) + assert out1 == out2 + + def test_compute_group_short_includes_group_id_and_version(self): + """The result should include human-readable ``g{id}`` and ``v{version}`` markers.""" + selectors = {"cmip6": [("source_id", "MODEL")]} + out = compute_group_short(selectors, group_id=42, diagnostic_version=2) + assert "g42" in out + assert "v2" in out + # Should also be ASCII-only + assert out.isascii() + + def test_compute_group_short_truncation(self): + """Selector strings longer than the token limit should be truncated cleanly.""" + # Build selectors whose joined values are well over 100 characters. + long_value = "X" * 30 + selectors = { + "cmip6": [(f"facet_{i}", f"{long_value}_{i}") for i in range(6)], + } + out = compute_group_short(selectors, group_id=1, diagnostic_version=1) + # Whole result is capped at ~96 chars; suffix is preserved. + assert len(out) <= 96 + assert out.endswith("_g1_v1_" + out.split("_")[-1]) + # Truncation should not leave a stray boundary character. 
+ assert "g1" in out and "v1" in out + + def test_compute_group_short_collision_resistance(self): + """Two selector sets sharing a prefix should yield distinct hash suffixes.""" + # Both selector sets start with the same value but differ further on. + a = {"cmip6": [("source_id", "MODEL"), ("variable_id", "tas")]} + b = {"cmip6": [("source_id", "MODEL"), ("variable_id", "pr")]} + out_a = compute_group_short(a, group_id=1, diagnostic_version=1) + out_b = compute_group_short(b, group_id=1, diagnostic_version=1) + assert out_a != out_b + # The 8-char hash digest is the trailing ``_xxxxxxxx`` segment. + digest_a = out_a.rsplit("_", 1)[-1] + digest_b = out_b.rsplit("_", 1)[-1] + assert digest_a != digest_b + assert len(digest_a) == 8 + assert len(digest_b) == 8 + + def test_compute_group_short_handles_unicode_selector_values(self): + """Non-ASCII selector values should be sanitized to ASCII tokens.""" + selectors = {"cmip6": [("source_id", "MODéL")]} + out = compute_group_short(selectors, group_id=1, diagnostic_version=1) + assert out.isascii() + + def test_compute_group_short_empty_selectors(self): + """An empty selector mapping still produces a valid suffix.""" + out = compute_group_short({}, group_id=3, diagnostic_version=1) + assert "g3" in out + assert "v1" in out + assert out.isascii() + + def test_compute_group_short_hard_cap_enforced(self): + """Result must never exceed _GROUP_SHORT_MAX even with a huge group_id and long selectors.""" + long_value = "X" * 30 + selectors = { + "cmip6": [(f"facet_{i}", f"{long_value}_{i}") for i in range(10)], + } + out = compute_group_short(selectors, group_id=10**9, diagnostic_version=1) + assert len(out) <= 96, f"Expected len <= 96, got {len(out)}: {out!r}" + + +class TestValidatePathSegment: + """``_validate_path_segment`` rejects values that could escape the base directory.""" + + @pytest.mark.parametrize( + "value", + ["", "foo/bar", "foo\\bar", "..", ".", "foo\x00bar"], + ) + def test_rejects_unsafe_segments(self, value): + 
with pytest.raises(ValueError, match="not a safe single path segment"): + _validate_path_segment(value, label="test slug") + + @pytest.mark.parametrize( + "value", + ["esmvaltool", "my-diag", "foo_bar", "..hidden", "diag.v2"], + ) + def test_accepts_safe_segments(self, value): + assert _validate_path_segment(value, label="test slug") == value diff --git a/packages/climate-ref/tests/unit/executor/test_reingest.py b/packages/climate-ref/tests/unit/executor/test_reingest.py index e6e021bc2..8f856816b 100644 --- a/packages/climate-ref/tests/unit/executor/test_reingest.py +++ b/packages/climate-ref/tests/unit/executor/test_reingest.py @@ -1,6 +1,5 @@ """Tests for the reingest module.""" -import datetime import json import pathlib @@ -475,17 +474,13 @@ def test_twice_creates_distinct_fragments( mock_result_factory, mocker, ): - """Running reingest twice should create distinct output fragments.""" + """ + Running reingest twice should create distinct output fragments whose + final segment is the new ``Execution.id`` (no timestamp suffix). 
+ """ mock_result = mock_result_factory(scratch_dir_with_results) _patch_build_result(mocker, mock_provider_registry, mock_result) - t1 = datetime.datetime(2026, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) - t2 = datetime.datetime(2026, 1, 1, 12, 0, 1, tzinfo=datetime.timezone.utc) - mocker.patch( - "climate_ref.executor.fragment.datetime.datetime", - **{"now.side_effect": [t1, t2]}, - ) - ok1 = reingest_execution( config=config, database=reingest_db, @@ -505,11 +500,16 @@ def test_twice_creates_distinct_fragments( reingest_db.session.commit() assert ok2 is True - all_executions = reingest_db.session.query(Execution).all() + all_executions = reingest_db.session.query(Execution).order_by(Execution.id.asc()).all() assert len(all_executions) == 3 fragments = [e.output_fragment for e in all_executions] assert len(set(fragments)) == 3, f"Expected unique fragments, got: {fragments}" + # The two new (reingested) executions must end with their own ``Execution.id``; + # there is no longer a timestamp suffix. + for execution in all_executions[1:]: + assert execution.output_fragment.split("/")[-1] == str(execution.id) + @pytest.mark.filterwarnings("ignore:Unknown dimension values.*:UserWarning") def test_copies_results_to_new_directory( self, @@ -1138,6 +1138,11 @@ def test_selects_oldest_execution(self, db_seeded): Reingested executions only have results directories, not scratch. Selecting the oldest ensures we always reingest from the execution whose scratch directory actually exists. + + The ``output_fragment`` strings used here are literal sentinels; + they intentionally distinguish two rows for the ordering check and + are *not* meant to mirror the path layout produced by the + production code. 
""" with db_seeded.session.begin(): diag = db_seeded.session.query(DiagnosticModel).first() @@ -1157,7 +1162,7 @@ def test_selects_oldest_execution(self, db_seeded): reingested = Execution( execution_group_id=eg.id, successful=True, - output_fragment="original_fragment_20260405T120000000000", + output_fragment="reingested_fragment", dataset_hash="h-orig", ) db_seeded.session.add(reingested) diff --git a/packages/climate-ref/tests/unit/test_solver.py b/packages/climate-ref/tests/unit/test_solver.py index e76bc3903..8976aa996 100644 --- a/packages/climate-ref/tests/unit/test_solver.py +++ b/packages/climate-ref/tests/unit/test_solver.py @@ -623,7 +623,12 @@ def test_solve_metrics_default_solver(mocker, mock_metric_execution, mock_execut # Check that a result is created assert db_seeded.session.query(Execution).count() == 1 execution_result = db_seeded.session.query(Execution).first() - assert execution_result.output_fragment == "output_fragment" + # Output fragment now follows ``{provider}/{diagnostic}/{group_short}/{execution_id}`` + fragment_parts = execution_result.output_fragment.split("/") + assert len(fragment_parts) == 4 + assert fragment_parts[0] == "mock_provider" + assert fragment_parts[1] == "mock" + assert fragment_parts[3] == str(execution_result.id) assert execution_result.dataset_hash == "123456" assert execution_result.execution_group.key == "key" # Nested tuples are converted into nested lists after going through the DB @@ -640,10 +645,63 @@ def test_solve_metrics_default_solver(mocker, mock_metric_execution, mock_execut # so compare by id rather than identity. 
assert mock_executor.return_value.run.call_count == 1 run_kwargs = mock_executor.return_value.run.call_args.kwargs - assert run_kwargs["definition"] == mock_metric_execution.build_execution_definition() + # The solver rewrites ``output_directory`` to the resolved + # ``{provider}/{diagnostic}/{group_short}/{execution_id}`` path, + # so the run definition is no longer the same object as the placeholder + # returned by ``mock_metric_execution.build_execution_definition()``. 
+ assert ( + run_kwargs["definition"].diagnostic == mock_metric_execution.build_execution_definition().diagnostic + ) + assert str(run_kwargs["definition"].output_directory).endswith(execution_result.output_fragment) assert run_kwargs["execution"].id == execution_result.id +def test_two_executions_same_group_distinct_output_fragments( + mocker, mock_metric_execution, mock_executor, db_seeded +): + """ + Two solve passes against the same execution group should produce distinct + ``Execution.output_fragment`` values whose final ``/{execution_id}`` segment + differs while the leading ``{provider}/{diagnostic}/{group_short}/`` prefix matches. + """ + mock_build_solver = mocker.patch.object(ExecutionSolver, "build_from_db") + solver_mock = mock.MagicMock(spec=ExecutionSolver) + solver_mock.solve.return_value = [mock_metric_execution] + mock_build_solver.return_value = solver_mock + + # First solve creates the execution group + first execution. + solve_required_executions(db_seeded) + + first = db_seeded.session.query(Execution).order_by(Execution.id.asc()).all() + assert len(first) == 1 + + # Mark the first execution successful and the group dirty so the next solve + # is willing to schedule a fresh execution against the same group. + execution = first[0] + execution.successful = True + execution.path = "diagnostic.json" + execution.execution_group.dirty = True + db_seeded.session.commit() + + # Second solve adds a second execution to the same group. + solver_mock.solve.return_value = [mock_metric_execution] + solve_required_executions(db_seeded) + + executions = db_seeded.session.query(Execution).order_by(Execution.id.asc()).all() + assert len(executions) == 2 + fragment_a = executions[0].output_fragment + fragment_b = executions[1].output_fragment + + # Fragments differ overall but share the leading ``{provider}/{diagnostic}/{group_short}``. 
+ assert fragment_a != fragment_b + prefix_a = "/".join(fragment_a.split("/")[:-1]) + prefix_b = "/".join(fragment_b.split("/")[:-1]) + assert prefix_a == prefix_b + # The trailing segment is the new ``Execution.id``. + assert fragment_a.split("/")[-1] == str(executions[0].id) + assert fragment_b.split("/")[-1] == str(executions[1].id) + + def test_solve_metrics(mocker, db_seeded, solver, data_regression, mock_executor): mock_build_solver = mocker.patch.object(ExecutionSolver, "build_from_db") @@ -1100,7 +1158,8 @@ def test_solve_with_one_per_provider( # so compare by id rather than identity. assert mock_executor.return_value.run.call_count == 1 run_kwargs = mock_executor.return_value.run.call_args.kwargs - assert run_kwargs["definition"] == mock_metric_execution.build_execution_definition() + # ``output_directory`` is rewritten by the solver after flushing the new ``Execution`` row. + assert str(run_kwargs["definition"].output_directory).endswith(execution_result.output_fragment) assert run_kwargs["execution"].id == execution_result.id @@ -1126,7 +1185,8 @@ def test_solve_with_one_per_diagnostic( # so compare by id rather than identity. assert mock_executor.return_value.run.call_count == 1 run_kwargs = mock_executor.return_value.run.call_args.kwargs - assert run_kwargs["definition"] == mock_metric_execution.build_execution_definition() + # ``output_directory`` is rewritten by the solver after flushing the new ``Execution`` row. 
+ assert str(run_kwargs["definition"].output_directory).endswith(execution_result.output_fragment) assert run_kwargs["execution"].id == execution_result.id @@ -1342,6 +1402,12 @@ def test_diagnostic_execution_build_definition(mock_diagnostic, provider, tmp_pa assert mock_diagnostic.slug in str(definition.output_directory) # output_directory should be under the resolved tmp_path assert str(tmp_path.resolve()) in str(definition.output_directory) + # ``build_execution_definition`` no longer reaches the dataset hash; + # it returns a placeholder fragment that the solver rewrites once + # ``Execution.id`` is known. + assert definition.output_directory.name == "_pending" + assert definition.output_directory.parent.name == mock_diagnostic.slug + assert definition.output_directory.parent.parent.name == provider.slug def test_diagnostic_execution_selectors(mock_diagnostic, provider):