Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

- **README and docs homepage refreshed around reasons-to-choose.** Replaced the 10-bullet "Why OpenArmature" feature inventory in `README.md` with 5 differentiating reasons (LLM-infused workflows to agents on one engine; crash-safe resume by contract; destination-pluggable observability with OTel + Langfuse, no SaaS lock-in; compile-time topology checks; spec + conformance). The docs homepage (`docs/index.md`) card grid carries the same five plus a sixth card retained from the previous grid for async-first / LLM-agnostic: workflows-to-agents, crash-safe, pluggable observability, bad-graphs-don't-compile, parallelism (fan-out + parallel-branches + nested correctness), async-first.
- **Docs sweep: stale references and em-dash normalization.** Fixed three definite stale references (`spec_version='0.26.0'` in the Langfuse example output now reads `'0.38.0'`; the dangling `v0.16.1` qualifier dropped from the parallel-branches concept page; `compiled.attach_observer` corrected to `graph.attach_observer` in `non-obvious-shapes.md` for variable-name consistency with the rest of the docs). Swept em dashes out of the user-facing docs (130 instances across 17 files) per the convention set during the patterns expansion. mkdocs strict build clean; no broken intra-docs links.
- **Example 08 (checkpointing-and-migration) grows a crash-and-resume drama.** The first invoke of the v1 graph now hits a simulated transient failure inside `size_crew` (raises a `RuntimeError` on its first attempt only). The example catches `NodeException` at the `invoke()` boundary, prints what's saved on disk (`define_objective`'s position is already in `completed_positions`), then re-invokes with `resume_invocation=<id>`. The retried `size_crew` succeeds, `draft_timeline` runs, and the pipeline finishes - dramatizing the synchronous-checkpoint-by-contract reliability claim from the README pitch. The existing v1->v2 migration phase rides on top of the crash-survived checkpoint, so both reliability stories compose in one demo. Walk-through doc rewritten to cover both phases.

### Added

Expand Down
205 changes: 141 additions & 64 deletions docs/examples/08-checkpointing-and-migration.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# 08 - Checkpointing and migration

A lunar-mission planning pipeline that writes a SQLite checkpoint
after every step, then resumes the saved invocation under an
upgraded state schema with a v1->v2 migration backfilling new
fields.
after every step, survives a simulated mid-pipeline crash, and then
resumes the saved invocation under an upgraded state schema with a
v1->v2 migration backfilling new fields.

## Overview

Expand All @@ -15,41 +15,76 @@ A planning pipeline drafts a lunar mission plan in three steps:
3. `draft_timeline` - draft a one-sentence timeline.

The graph is wired with a `SQLiteCheckpointer` in JSON mode, so the
engine writes a record after every `completed` event. The whole v1
pipeline runs once, and the saved record persists on disk in a
temporary directory.

Then "some time later" - in the same script for demo purposes - a
v2 schema lands. It adds a `risk_assessment` field and a new
`assess_risks` node at the end. A migration function backfills
`risk_assessment=""` for v1 records. The v2 graph resumes the
saved v1 invocation, the migration runs once on load, and
execution continues at `assess_risks`. The original three nodes do
not re-execute.
engine writes a record after every `completed` event. The demo
stages two reliability stories on top of that wiring:

**Phase 1 - crash and resume.** The first attempt at the v1 graph
runs `define_objective` to completion (its checkpoint saves), then
`size_crew` raises a simulated transient failure - the kind of
mid-LLM-call infrastructure blip you can't engineer away in
production (OOM kill, pod preemption, network drop). `NodeException`
reaches the `invoke()` boundary. A second `invoke()` call passes
`resume_invocation=<id>` of the same saved record. The engine reads
the record, skips `define_objective` (its position is already in
`completed_positions`), retries `size_crew`, runs `draft_timeline`,
and the pipeline finishes.

**Phase 2 - migration on resume.** Then "some time later" - in the
same script for demo purposes - a v2 schema lands. It adds a
`risk_assessment` field and a new `assess_risks` node at the end. A
migration function backfills `risk_assessment=""` for v1 records.
The v2 graph resumes the (now-completed) v1 invocation, the
migration runs once on load, and execution continues at
`assess_risks`. The original three v1 nodes do not re-execute.

## What it teaches

- [`SQLiteCheckpointer(path, serialization="json")`](../concepts/checkpointing.md)
writing records to a SQLite file. JSON is the
migration-eligible serialization; `pickle` mode is faster but
can't bridge schemas.
- [`with_checkpointer`](../concepts/checkpointing.md) wiring the
checkpointer to the graph. The engine fires a save at every
`completed` event for outermost and subgraph-internal nodes.
- [`State.schema_version`](../concepts/checkpointing.md) as a
- **[`SQLiteCheckpointer(path, serialization="json")`](../concepts/checkpointing.md)**
writes records to a SQLite file synchronously after every node
completes. The save returns before the next node starts, so a
crash mid-next-node can't lose the previous node's record. JSON
is the migration-eligible serialization; `pickle` mode is faster
but can't bridge schemas.
- **[`with_checkpointer`](../concepts/checkpointing.md)** wires the
checkpointer into a `GraphBuilder`. The engine fires a save at
every `completed` event for outermost and subgraph-internal
nodes.
- **[`NodeException` at the `invoke()` boundary](../concepts/graphs.md)**.
When a node raises, the engine wraps the cause in `NodeException`
and re-raises it from `invoke()`. The caller catches it; the
checkpointer's record of the *previously* completed nodes is
already durable on disk. `exc.__cause__` carries the original
exception, `exc.node_name` identifies the failing node, and
`exc.recoverable_state` carries the state as it was just before
the failing node ran.
- **[`invoke(state, resume_invocation=<id>)`](../concepts/checkpointing.md)**
resumes from a saved record. The engine reads the record, skips
nodes whose `completed` events are already in
`completed_positions`, and continues at the first uncompleted
node. The retried node runs from a clean slate against the
loaded state - whatever transient condition caused the previous
failure can resolve cleanly.
- **Each `invoke()` mints its own `invocation_id`, even on resume.**
The pre-crash record stays under the original id; the resumed
attempt's new checkpoints save under a fresh id. Phase 2's resume
target is the *resumed attempt's* id (the post-resume completed
record), not the original id (the pre-crash partial record).
The example re-queries `CheckpointFilter(correlation_id=run_id)`
after the phase 1 resume completes to capture the new id; the
shared `correlation_id` is the cross-attempt join key.
- **[`State.schema_version`](../concepts/checkpointing.md)** as a
`ClassVar[str]` declaration. Empty string opts the class out of
migration support; any non-empty value opts it in.
- [`with_state_migration(from_version, to_version, migrate)`](../concepts/checkpointing.md)
registering one edge of the migration chain. The `migrate`
callable is pure (dict in, dict out, no I/O).
- [`invoke(state, resume_invocation=<id>)`](../concepts/checkpointing.md)
resuming from a saved record. The engine reads the record,
applies the migration chain, re-deserializes against the current
state class, and continues at the first uncompleted node.
- The migration registry's BFS resolution: with a v3 schema and
two migration edges (`v1→v2`, `v2→v3`), the registry walks the
shortest chain automatically. A v1 record loaded under a v3
graph runs `v1→v2` then `v2→v3` without caller-side composition.
- **[`with_state_migration(from_version, to_version, migrate)`](../concepts/checkpointing.md)**
registers one edge of the migration chain. The `migrate`
callable is pure (dict in, dict out, no I/O). The engine applies
it on load when the saved record's `schema_version` doesn't
match the current state class's.
- **The migration registry's BFS resolution.** With a v3 schema
and two migration edges (`v1->v2`, `v2->v3`), the registry walks
the shortest chain automatically. A v1 record loaded under a v3
graph runs `v1->v2` then `v2->v3` without caller-side
composition.

## How to run

Expand All @@ -60,8 +95,8 @@ LLM_API_KEY=sk-... uv run python examples/08-checkpointing-and-migration/main.py

The SQLite database is created in a `TemporaryDirectory` that's
cleaned up automatically. The demo runs both phases in one
invocation so you can see the resume behavior end-to-end without
manual orchestration.
invocation so you can see the crash, the resume, and the migration
end-to-end without manual orchestration.

## The graph

Expand Down Expand Up @@ -101,18 +136,30 @@ as a plain dict and returns a dict at the new schema (here, just

```
========================================================================
Phase 1 - invoke v1 graph; checkpoints save after every node
Phase 1 - invoke v1 graph; size_crew crashes; resume picks up
========================================================================

destination: Lunar South Pole
checkpoint db: /tmp/oa-checkpoint-demo-.../checkpoints.sqlite

v1 result:
objective: <objective sentence>
crew_size: 4
timeline: <timeline sentence>

v1 invocation_id: <uuid>
first attempt:
NodeException at node 'size_crew': simulated transient mid-pipeline crash before size_crew completed
saved invocation_id: <uuid>
completed nodes: ['define_objective']

second attempt (resume from saved invocation):
objective: <objective sentence>
crew_size: 4
timeline: <timeline sentence>
trace: ['define_objective', 'size_crew', 'draft_timeline']
resumed invocation_id: <uuid-B, different from the pre-crash uuid above>

Each node name appears exactly once across two invoke() calls.
define_objective is in trace from the first attempt (its append
survived the crash via the synchronous checkpoint); size_crew +
draft_timeline are from the resumed attempt. size_crew has no
duplicate entry because its first call raised before returning
a state update.

========================================================================
Phase 2 - invoke v2 graph with resume; v1->v2 migration runs
Expand All @@ -121,30 +168,60 @@ Phase 2 - invoke v2 graph with resume; v1->v2 migration runs
v2 adds: risk_assessment field + assess_risks node
migration: backfills risk_assessment='' for v1 records

v2 result after resume:
objective: <same objective sentence from v1>
crew_size: 4
timeline: <same timeline sentence from v1>
risk_assessment: <new sentence from assess_risks>

trace: ['assess_risks']

The v1 nodes appear once each in v1's trace and NOT in v2's
trace - they were skipped on resume because completed_positions
already covered them. Only assess_risks ran in phase 2.
v2 result after resume:
objective: <same objective sentence>
crew_size: 4
timeline: <same timeline sentence>
risk_assessment: <new sentence from assess_risks>
trace: ['define_objective', 'size_crew', 'draft_timeline', 'assess_risks']

v2's trace appends 'assess_risks' to the v1 entries the migration
preserved. Each v1 node appears exactly once (no duplicates from
the v2 graph re-running them) because completed_positions skipped
them. Only assess_risks was new work in phase 2.
```

- **`v1 invocation_id`** is the saved record's correlation key. We
passed a deterministic `correlation_id` to `invoke()` so the
checkpoint filter can find the right record; in production, the
caller usually owns the correlation_id and persists it alongside
the request that produced the run.
- **`trace: ['assess_risks']`** on the v2 result is the key signal.
The v1 nodes (`define_objective`, `size_crew`, `draft_timeline`)
did not re-execute on resume. Their `completed_positions` entries
in the saved record told the engine they were already done; the
v2 pipeline began at the first uncompleted position, which is
`assess_risks`.
- **`NodeException at node 'size_crew'`** is the signal that the
engine caught the simulated crash and surfaced it at the
`invoke()` boundary. The caller's `try / except NodeException`
is the canonical error boundary for nodes; `exc.__cause__`
carries the original `RuntimeError`.
- **`completed nodes: ['define_objective']`** on the loaded
record proves the durability claim. The `define_objective`
checkpoint is on disk before `size_crew` even started; the
crash can't take that record with it. The example projects
`record.completed_positions` (a tuple of `NodePosition` entries
carrying namespace, node_name, step, attempt_index) down to
just the node names for display.
- **`saved invocation_id` and `resumed invocation_id` are
different.** Each `invoke()` mints a fresh `invocation_id`,
including the resumed call. The pre-crash record (one completed
node) stays under the original id; the resumed attempt's
checkpoints (size_crew + draft_timeline completions) save under
the new id. The cross-attempt join key that ties them together
is `correlation_id` (here: `demo-mission-plan-1`), supplied by
the caller on the first invoke and preserved across the resume.
Phase 2's `resume_invocation=` target is the resumed attempt's
id, NOT the original; resuming from the original would reload
the pre-crash partial record and re-run `size_crew` +
`draft_timeline`, defeating the "completed v1 then migrate"
story.
- **`trace: ['define_objective', 'size_crew', 'draft_timeline']`**
after the resume is the cross-attempt continuity proof. The
resumed invoke starts from the saved state (so `trace` already
carries the first attempt's `define_objective` entry), and the
`append` reducer accumulates entries from the post-crash nodes
on top. Each node name appears exactly once: `define_objective`
ran once on the first attempt; `size_crew` ran twice but only
the second call returned a state update (the first call raised
before its return); `draft_timeline` ran once on the resume. The
*absence* of duplicates is the engine-side skip-set's signature.
- **`trace: [..., 'assess_risks']`** on the v2 result extends the
v1 entries with one new entry. The v1 nodes did not re-execute
on the v2 resume; their `completed_positions` entries told the
engine they were already done. The migration preserved their
trace entries (via `{**state_dict, ...}`), and the v2 pipeline
began at the first uncompleted position (`assess_risks`).
- **`crew_size: 4`** and the other v1 fields are present on the v2
result because the migration preserved them via `{**state_dict,
...}`. A migration that *changed* an existing field (e.g.,
Expand Down
Loading