Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/655.improvement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Reworked the layout of execution output directories.
New executions now write to ``<provider>/<diagnostic>/<group_short>/<execution_id>/``
instead of ``<provider>/<diagnostic>/<dataset_hash>/``,
so reruns of the same diagnostic group no longer overwrite earlier outputs.
Existing executions on disk continue to resolve through their stored ``Execution.output_fragment``.
154 changes: 154 additions & 0 deletions packages/climate-ref/src/climate_ref/executor/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,26 @@
"""

import datetime
import hashlib
import re
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from climate_ref.models.execution import Execution

_TOKEN_RE = re.compile(r"[^A-Za-z0-9_-]+")
_DEFAULT_TOKEN_LIMIT = 64
_GROUP_SHORT_MAX = 96

PLACEHOLDER_FRAGMENT = "_pending"
"""Output-fragment placeholder used until ``Execution.id`` is known."""

_DEFAULT_DIAGNOSTIC_VERSION = 1
"""Default ``Diagnostic.version`` used when a diagnostic does not declare its own."""


def allocate_output_fragment(base_fragment: str, results_dir: Path) -> str:
Expand Down Expand Up @@ -42,3 +61,138 @@ def allocate_output_fragment(base_fragment: str, results_dir: Path) -> str:
)

return fragment


def _sanitize_token(value: str) -> str:
"""
Sanitize a single selector value to ``[A-Za-z0-9_-]+``.

Non-conforming characters are collapsed to a single underscore.
Leading/trailing underscores are stripped.
"""
cleaned = _TOKEN_RE.sub("_", value).strip("_")
return cleaned


def _truncate_at_boundary(text: str, limit: int) -> str:
"""
Truncate *text* at *limit* characters, preferring an underscore boundary.

If *text* is at or below *limit*, return it unchanged.
Otherwise truncate to the rightmost ``_`` at or before *limit*; fall back to a
hard cut if no boundary exists.
"""
if len(text) <= limit:
return text
head = text[:limit]
boundary = head.rfind("_")
if boundary > 0:
return head[:boundary]
return head


def compute_group_short(
    selectors: Mapping[str, Iterable[tuple[str, str]]],
    group_id: int,
    diagnostic_version: int,
    *,
    token_limit: int = _DEFAULT_TOKEN_LIMIT,
) -> str:
    """
    Derive a short, deterministic, human-readable path segment for an execution group.

    The segment is a browsing hint for operators inspecting the filesystem,
    not a uniqueness guarantee -- uniqueness comes from ``execution_id``.

    Selector values are ordered canonically (by source-type key, then by facet
    key), sanitized to ``[A-Za-z0-9_-]``, joined with ``_``, and truncated to
    *token_limit* characters at an underscore boundary.  A suffix
    ``_g{group_id}_v{diagnostic_version}_{sha1[:8]}`` is appended, where the
    SHA1 digest covers the canonical
    ``group_id|diagnostic_version|sorted_selectors`` representation.

    The returned string is ASCII, capped at roughly 96 characters,
    and deterministic for fixed inputs.

    Parameters
    ----------
    selectors
        Mapping from source-type key (e.g. ``"cmip6"``) to an iterable of
        ``(facet_key, facet_value)`` tuples.
    group_id
        The ``ExecutionGroup.id`` this fragment belongs to.
    diagnostic_version
        The integer ``Diagnostic.version`` at solve time.
    token_limit
        Maximum length of the sanitized selector portion before the suffix.

    Returns
    -------
    :
        A short, sanitized, deterministic identifier suitable for use as a
        filesystem path segment.
    """
    # Canonical form: source types sorted, then each facet pair list sorted.
    canonical_pairs: list[tuple[str, tuple[tuple[str, str], ...]]] = [
        (source_type, tuple(sorted((str(k), str(v)) for k, v in selectors[source_type])))
        for source_type in sorted(selectors)
    ]

    # Human-readable portion: sanitized facet *values* only, empty tokens dropped.
    tokens = [
        token
        for _, pairs in canonical_pairs
        for _, value in pairs
        if (token := _sanitize_token(value))
    ]
    token_part = _truncate_at_boundary("_".join(tokens), token_limit)

    # Deterministic digest over group id, version, and the canonical pairs.
    payload = repr((group_id, diagnostic_version, canonical_pairs)).encode("utf-8")
    digest = hashlib.sha1(payload).hexdigest()[:8]  # noqa: S324

    suffix = f"_g{group_id}_v{diagnostic_version}_{digest}"
    # Avoid a leading underscore when no tokens survived sanitization.
    result = f"{token_part}{suffix}" if token_part else suffix.lstrip("_")

    overflow = len(result) - _GROUP_SHORT_MAX
    if overflow > 0:
        # Preserve the suffix by shrinking the token portion instead.
        shrunk = _truncate_at_boundary(token_part, max(0, len(token_part) - overflow))
        result = f"{shrunk}{suffix}" if shrunk else suffix.lstrip("_")

    # Boundary-aware shrinking can still overshoot when no underscore sits close
    # enough to the target length, so apply a hard cap last.
    return result if len(result) <= _GROUP_SHORT_MAX else result[:_GROUP_SHORT_MAX]


def assign_execution_fragment(  # noqa: PLR0913
    session: "Session",
    execution: "Execution",
    *,
    provider_slug: str,
    diagnostic_slug: str,
    selectors: Mapping[str, Iterable[tuple[str, str]]],
    group_id: int,
    diagnostic_version: int = _DEFAULT_DIAGNOSTIC_VERSION,
) -> str:
    """Flush *execution* to materialise its id, then assign the canonical output fragment.

    Returns the assigned fragment string.
    """
    # First flush materialises the autoincrement ``Execution.id``.
    session.flush()
    group_short = compute_group_short(
        selectors, group_id=group_id, diagnostic_version=diagnostic_version
    )
    execution.output_fragment = str(
        Path(provider_slug, diagnostic_slug, group_short, str(execution.id))
    )
    # Second flush persists the fragment assignment.
    session.flush()
    return execution.output_fragment
102 changes: 63 additions & 39 deletions packages/climate-ref/src/climate_ref/executor/reingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from loguru import logger

from climate_ref.datasets import get_slug_column
from climate_ref.executor.fragment import allocate_output_fragment
from climate_ref.executor.fragment import PLACEHOLDER_FRAGMENT, assign_execution_fragment
from climate_ref.executor.result_handling import handle_execution_result
from climate_ref.models.execution import (
Execution,
Expand All @@ -45,6 +45,10 @@
from climate_ref_core.diagnostics import Diagnostic


class _ReingestSavepointAbort(Exception):
"""Internal sentinel used to roll back the savepoint on a soft-fail path."""


def reconstruct_execution_definition(
config: "Config",
execution: Execution,
Expand Down Expand Up @@ -242,48 +246,67 @@ def reingest_execution(
diagnostic, scratch_dir = resolved

execution_group = execution.execution_group
provider_slug = execution_group.diagnostic.provider.slug
diagnostic_slug = execution_group.diagnostic.slug

# Allocate a new output fragment with a timestamp suffix
# Previous execution scratch will be copied there
new_fragment = allocate_output_fragment(execution.output_fragment, config.paths.results)
new_scratch_dir = config.paths.scratch / new_fragment
# Convert the JSON-stored selector dict (lists-of-pairs) back into the
# mapping[str, iterable[tuple[str, str]]] shape that ``compute_group_short`` expects.
selectors = {
source_key: [tuple(pair) for pair in pairs] for source_key, pairs in execution_group.selectors.items()
}

new_fragment: str | None = None
new_scratch_dir: Path | None = None
new_execution: Execution | None = None

try:
new_scratch_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(scratch_dir, new_scratch_dir)
try:
with database.session.begin_nested():
new_execution = Execution(
execution_group=execution_group,
dataset_hash=execution.dataset_hash,
output_fragment=PLACEHOLDER_FRAGMENT,
)
database.session.add(new_execution)

new_fragment = assign_execution_fragment(
database.session,
new_execution,
provider_slug=provider_slug,
diagnostic_slug=diagnostic_slug,
selectors=selectors,
group_id=execution_group.id,
)

definition = reconstruct_execution_definition(
config, execution, diagnostic, output_fragment=new_fragment
)
new_scratch_dir = config.paths.scratch / new_fragment
new_scratch_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(scratch_dir, new_scratch_dir)

result = diagnostic.build_execution_result(definition)
definition = reconstruct_execution_definition(
config, execution, diagnostic, output_fragment=new_fragment
)

if not result.successful or result.metric_bundle_filename is None:
logger.warning(
f"build_execution_result returned unsuccessful result for execution {execution.id}. Skipping."
)
if new_scratch_dir.exists():
shutil.rmtree(new_scratch_dir)
return False
result = diagnostic.build_execution_result(definition)

with database.session.begin_nested():
# Create new Execution record
new_execution = Execution(
execution_group=execution_group,
dataset_hash=execution.dataset_hash,
output_fragment=new_fragment,
)
database.session.add(new_execution)
database.session.flush()

# Copy dataset links from the original execution
for dataset in execution.datasets:
database.session.execute(
execution_datasets.insert().values(
execution_id=new_execution.id,
dataset_id=dataset.id,
if not result.successful or result.metric_bundle_filename is None:
logger.warning(
f"build_execution_result returned unsuccessful result "
f"for execution {execution.id}. Skipping."
)
)
raise _ReingestSavepointAbort

# Copy dataset links from the original execution
for dataset in execution.datasets:
database.session.execute(
execution_datasets.insert().values(
execution_id=new_execution.id,
dataset_id=dataset.id,
)
)
except _ReingestSavepointAbort:
if new_scratch_dir is not None and new_scratch_dir.exists():
shutil.rmtree(new_scratch_dir)
return False

handle_execution_result(
config,
Expand All @@ -294,11 +317,12 @@ def reingest_execution(
)
except Exception:
logger.exception(f"Ingestion failed for execution {execution.id}. Rolling back changes.")
if new_scratch_dir.exists():
if new_scratch_dir is not None and new_scratch_dir.exists():
shutil.rmtree(new_scratch_dir)
new_results_dir = config.paths.results / new_fragment
if new_results_dir.exists():
shutil.rmtree(new_results_dir)
if new_fragment is not None:
new_results_dir = config.paths.results / new_fragment
if new_results_dir.exists():
shutil.rmtree(new_results_dir)
return False

logger.info(f"Successfully reingested execution {execution.id} -> new execution {new_execution.id}")
Expand Down
29 changes: 24 additions & 5 deletions packages/climate-ref/src/climate_ref/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import typing
from collections.abc import Mapping, Sequence

import attrs
import pandas as pd
from attrs import define, frozen
from loguru import logger
Expand All @@ -24,6 +25,7 @@
PMPClimatologyDatasetAdapter,
get_slug_column,
)
from climate_ref.executor.fragment import PLACEHOLDER_FRAGMENT, assign_execution_fragment
from climate_ref.models import Diagnostic as DiagnosticModel
from climate_ref.models import ExecutionGroup
from climate_ref.models import Provider as ProviderModel
Expand Down Expand Up @@ -98,13 +100,16 @@ def selectors(self) -> dict[str, Selector]:

def build_execution_definition(self, output_root: pathlib.Path) -> ExecutionDefinition:
"""
Build the execution definition for the current diagnostic execution
Build the execution definition for the current diagnostic execution.

The returned definition uses a placeholder fragment for the output directory.
``solve_required_executions`` rewrites ``output_directory`` via
:func:`attrs.evolve` once the new ``Execution.id`` is known.
"""
# Ensure that the output root is always an absolute path
output_root = output_root.resolve()

# This is the desired path relative to the output directory
fragment = pathlib.Path() / self.provider.slug / self.diagnostic.slug / self.datasets.hash
fragment = pathlib.Path() / self.provider.slug / self.diagnostic.slug / PLACEHOLDER_FRAGMENT

return ExecutionDefinition(
diagnostic=self.diagnostic,
Expand Down Expand Up @@ -698,10 +703,24 @@ def solve_required_executions( # noqa: PLR0912, PLR0913, PLR0915
execution = Execution(
execution_group=execution_group,
dataset_hash=definition.datasets.hash,
output_fragment=str(definition.output_fragment()),
output_fragment=PLACEHOLDER_FRAGMENT,
)
db.session.add(execution)
db.session.flush()

fragment = assign_execution_fragment(
db.session,
execution,
provider_slug=provider_slug,
diagnostic_slug=potential_execution.diagnostic.slug,
selectors=potential_execution.selectors,
group_id=execution_group.id,
)

# Rebuild the definition so the executor sees the resolved output path.
definition = attrs.evolve(
definition,
output_directory=config.paths.scratch.resolve() / pathlib.Path(fragment),
)

# Add links to the datasets used in the execution
execution.register_datasets(db, definition.datasets)
Expand Down
Loading
Loading