diff --git a/CHANGELOG.md b/CHANGELOG.md index b743710..64e836e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **README and docs homepage refreshed around reasons-to-choose.** Replaced the 10-bullet "Why OpenArmature" feature inventory in `README.md` with 5 differentiating reasons (LLM-infused workflows to agents on one engine; crash-safe resume by contract; destination-pluggable observability with OTel + Langfuse, no SaaS lock-in; compile-time topology checks; spec + conformance). The docs homepage (`docs/index.md`) card grid carries the same five plus a sixth card retained from the previous grid for async-first / LLM-agnostic: workflows-to-agents, crash-safe, pluggable observability, bad-graphs-don't-compile, parallelism (fan-out + parallel-branches + nested correctness), async-first. - **Docs sweep: stale references and em-dash normalization.** Fixed three definite stale references (`spec_version='0.26.0'` in the Langfuse example output now reads `'0.38.0'`; the dangling `v0.16.1` qualifier dropped from the parallel-branches concept page; `compiled.attach_observer` corrected to `graph.attach_observer` in `non-obvious-shapes.md` for variable-name consistency with the rest of the docs). Swept em dashes out of the user-facing docs (130 instances across 17 files) per the convention set during the patterns expansion. mkdocs strict build clean; no broken intra-docs links. +- **Example 08 (checkpointing-and-migration) grows a crash-and-resume drama.** The first invoke of the v1 graph now hits a simulated transient failure inside `size_crew` (raises a `RuntimeError` on its first attempt only). The example catches `NodeException` at the `invoke()` boundary, prints what's saved on disk (`define_objective`'s position is already in `completed_positions`), then re-invokes with `resume_invocation=`. The retried `size_crew` succeeds, `draft_timeline` runs, and the pipeline finishes - dramatizing the synchronous-checkpoint-by-contract reliability claim from the README pitch. The existing v1->v2 migration phase rides on top of the crash-survived checkpoint, so both reliability stories compose in one demo. Walk-through doc rewritten to cover both phases. ### Added diff --git a/docs/examples/08-checkpointing-and-migration.md b/docs/examples/08-checkpointing-and-migration.md index 80b9004..b449d40 100644 --- a/docs/examples/08-checkpointing-and-migration.md +++ b/docs/examples/08-checkpointing-and-migration.md @@ -1,9 +1,9 @@ # 08 - Checkpointing and migration A lunar-mission planning pipeline that writes a SQLite checkpoint -after every step, then resumes the saved invocation under an -upgraded state schema with a v1->v2 migration backfilling new -fields. +after every step, survives a simulated mid-pipeline crash, and then +resumes the saved invocation under an upgraded state schema with a +v1->v2 migration backfilling new fields. ## Overview @@ -15,41 +15,76 @@ A planning pipeline drafts a lunar mission plan in three steps: 3. `draft_timeline` - draft a one-sentence timeline. The graph is wired with a `SQLiteCheckpointer` in JSON mode, so the -engine writes a record after every `completed` event. The whole v1 -pipeline runs once, and the saved record persists on disk in a -temporary directory. - -Then "some time later" - in the same script for demo purposes - a -v2 schema lands. It adds a `risk_assessment` field and a new -`assess_risks` node at the end. A migration function backfills -`risk_assessment=""` for v1 records. The v2 graph resumes the -saved v1 invocation, the migration runs once on load, and -execution continues at `assess_risks`. The original three nodes do -not re-execute. +engine writes a record after every `completed` event. The demo +stages two reliability stories on top of that wiring: + +**Phase 1 - crash and resume.** The first attempt at the v1 graph +runs `define_objective` to completion (its checkpoint saves), then +`size_crew` raises a simulated transient failure - the kind of +mid-LLM-call infrastructure blip you can't engineer away in +production (OOM kill, pod preemption, network drop). `NodeException` +reaches the `invoke()` boundary. A second `invoke()` call passes +`resume_invocation=` of the same saved record. The engine reads +the record, skips `define_objective` (its position is already in +`completed_positions`), retries `size_crew`, runs `draft_timeline`, +and the pipeline finishes. + +**Phase 2 - migration on resume.** Then "some time later" - in the +same script for demo purposes - a v2 schema lands. It adds a +`risk_assessment` field and a new `assess_risks` node at the end. A +migration function backfills `risk_assessment=""` for v1 records. +The v2 graph resumes the (now-completed) v1 invocation, the +migration runs once on load, and execution continues at +`assess_risks`. The original three v1 nodes do not re-execute. ## What it teaches -- [`SQLiteCheckpointer(path, serialization="json")`](../concepts/checkpointing.md) - writing records to a SQLite file. JSON is the - migration-eligible serialization; `pickle` mode is faster but - can't bridge schemas. -- [`with_checkpointer`](../concepts/checkpointing.md) wiring the - checkpointer to the graph. The engine fires a save at every - `completed` event for outermost and subgraph-internal nodes. -- [`State.schema_version`](../concepts/checkpointing.md) as a +- **[`SQLiteCheckpointer(path, serialization="json")`](../concepts/checkpointing.md)** + writes records to a SQLite file synchronously after every node + completes. The save returns before the next node starts, so a + crash mid-next-node can't lose the previous node's record. JSON + is the migration-eligible serialization; `pickle` mode is faster + but can't bridge schemas. +- **[`with_checkpointer`](../concepts/checkpointing.md)** wires the + checkpointer into a `GraphBuilder`. The engine fires a save at + every `completed` event for outermost and subgraph-internal + nodes. +- **[`NodeException` at the `invoke()` boundary](../concepts/graphs.md)**. + When a node raises, the engine wraps the cause in `NodeException` + and re-raises it from `invoke()`. The caller catches it; the + checkpointer's record of the *previously* completed nodes is + already durable on disk. `exc.__cause__` carries the original + exception, `exc.node_name` identifies the failing node, and + `exc.recoverable_state` carries the state as it was just before + the failing node ran. +- **[`invoke(state, resume_invocation=)`](../concepts/checkpointing.md)** + resumes from a saved record. The engine reads the record, skips + nodes whose `completed` events are already in + `completed_positions`, and continues at the first uncompleted + node. The retried node runs from a clean slate against the + loaded state - whatever transient condition caused the previous + failure can resolve cleanly. +- **Each `invoke()` mints its own `invocation_id`, even on resume.** + The pre-crash record stays under the original id; the resumed + attempt's new checkpoints save under a fresh id. Phase 2's resume + target is the *resumed attempt's* id (the post-resume completed + record), not the original id (the pre-crash partial record). + The example re-queries `CheckpointFilter(correlation_id=run_id)` + after the phase 1 resume completes to capture the new id; the + shared `correlation_id` is the cross-attempt join key. +- **[`State.schema_version`](../concepts/checkpointing.md)** as a `ClassVar[str]` declaration. Empty string opts the class out of migration support; any non-empty value opts it in. -- [`with_state_migration(from_version, to_version, migrate)`](../concepts/checkpointing.md) - registering one edge of the migration chain. The `migrate` - callable is pure (dict in, dict out, no I/O). -- [`invoke(state, resume_invocation=)`](../concepts/checkpointing.md) - resuming from a saved record. The engine reads the record, - applies the migration chain, re-deserializes against the current - state class, and continues at the first uncompleted node. -- The migration registry's BFS resolution: with a v3 schema and - two migration edges (`v1→v2`, `v2→v3`), the registry walks the - shortest chain automatically. A v1 record loaded under a v3 - graph runs `v1→v2` then `v2→v3` without caller-side composition. +- **[`with_state_migration(from_version, to_version, migrate)`](../concepts/checkpointing.md)** + registers one edge of the migration chain. The `migrate` + callable is pure (dict in, dict out, no I/O). The engine applies + it on load when the saved record's `schema_version` doesn't + match the current state class's. +- **The migration registry's BFS resolution.** With a v3 schema + and two migration edges (`v1->v2`, `v2->v3`), the registry walks + the shortest chain automatically. A v1 record loaded under a v3 + graph runs `v1->v2` then `v2->v3` without caller-side + composition. ## How to run @@ -60,8 +95,8 @@ LLM_API_KEY=sk-... uv run python examples/08-checkpointing-and-migration/main.py The SQLite database is created in a `TemporaryDirectory` that's cleaned up automatically. The demo runs both phases in one -invocation so you can see the resume behavior end-to-end without -manual orchestration. +invocation so you can see the crash, the resume, and the migration +end-to-end without manual orchestration. ## The graph @@ -101,18 +136,30 @@ as a plain dict and returns a dict at the new schema (here, just ``` ======================================================================== -Phase 1 - invoke v1 graph; checkpoints save after every node +Phase 1 - invoke v1 graph; size_crew crashes; resume picks up ======================================================================== destination: Lunar South Pole checkpoint db: /tmp/oa-checkpoint-demo-.../checkpoints.sqlite -v1 result: - objective: - crew_size: 4 - timeline: - - v1 invocation_id: + first attempt: + NodeException at node 'size_crew': simulated transient mid-pipeline crash before size_crew completed + saved invocation_id: + completed nodes: ['define_objective'] + + second attempt (resume from saved invocation): + objective: + crew_size: 4 + timeline: + trace: ['define_objective', 'size_crew', 'draft_timeline'] + resumed invocation_id: + + Each node name appears exactly once across two invoke() calls. + define_objective is in trace from the first attempt (its append + survived the crash via the synchronous checkpoint); size_crew + + draft_timeline are from the resumed attempt. size_crew has no + duplicate entry because its first call raised before returning + a state update. ======================================================================== Phase 2 - invoke v2 graph with resume; v1->v2 migration runs @@ -121,30 +168,60 @@ Phase 2 - invoke v2 graph with resume; v1->v2 migration runs v2 adds: risk_assessment field + assess_risks node migration: backfills risk_assessment='' for v1 records -v2 result after resume: - objective: - crew_size: 4 - timeline: - risk_assessment: - - trace: ['assess_risks'] - -The v1 nodes appear once each in v1's trace and NOT in v2's -trace - they were skipped on resume because completed_positions -already covered them. Only assess_risks ran in phase 2. + v2 result after resume: + objective: + crew_size: 4 + timeline: + risk_assessment: + trace: ['define_objective', 'size_crew', 'draft_timeline', 'assess_risks'] + + v2's trace appends 'assess_risks' to the v1 entries the migration + preserved. Each v1 node appears exactly once (no duplicates from + the v2 graph re-running them) because completed_positions skipped + them. Only assess_risks was new work in phase 2. ``` -- **`v1 invocation_id`** is the saved record's correlation key. We - passed a deterministic `correlation_id` to `invoke()` so the - checkpoint filter can find the right record; in production, the - caller usually owns the correlation_id and persists it alongside - the request that produced the run. -- **`trace: ['assess_risks']`** on the v2 result is the key signal. - The v1 nodes (`define_objective`, `size_crew`, `draft_timeline`) - did not re-execute on resume. Their `completed_positions` entries - in the saved record told the engine they were already done; the - v2 pipeline began at the first uncompleted position, which is - `assess_risks`. +- **`NodeException at node 'size_crew'`** is the signal that the + engine caught the simulated crash and surfaced it at the + `invoke()` boundary. The caller's `try / except NodeException` + is the canonical error boundary for nodes; `exc.__cause__` + carries the original `RuntimeError`. +- **`completed nodes: ['define_objective']`** on the loaded + record proves the durability claim. The `define_objective` + checkpoint is on disk before `size_crew` even started; the + crash can't take that record with it. The example projects + `record.completed_positions` (a tuple of `NodePosition` entries + carrying namespace, node_name, step, attempt_index) down to + just the node names for display. +- **`saved invocation_id` and `resumed invocation_id` are + different.** Each `invoke()` mints a fresh `invocation_id`, + including the resumed call. The pre-crash record (one completed + node) stays under the original id; the resumed attempt's + checkpoints (size_crew + draft_timeline completions) save under + the new id. The cross-attempt join key that ties them together + is `correlation_id` (here: `demo-mission-plan-1`), supplied by + the caller on the first invoke and preserved across the resume. + Phase 2's `resume_invocation=` target is the resumed attempt's + id, NOT the original; resuming from the original would reload + the pre-crash partial record and re-run `size_crew` + + `draft_timeline`, defeating the "completed v1 then migrate" + story. +- **`trace: ['define_objective', 'size_crew', 'draft_timeline']`** + after the resume is the cross-attempt continuity proof. The + resumed invoke starts from the saved state (so `trace` already + carries the first attempt's `define_objective` entry), and the + `append` reducer accumulates entries from the post-crash nodes + on top. Each node name appears exactly once: `define_objective` + ran once on the first attempt; `size_crew` ran twice but only + the second call returned a state update (the first call raised + before its return); `draft_timeline` ran once on the resume. The + *absence* of duplicates is the engine-side skip-set's signature. +- **`trace: [..., 'assess_risks']`** on the v2 result extends the + v1 entries with one new entry. The v1 nodes did not re-execute + on the v2 resume; their `completed_positions` entries told the + engine they were already done. The migration preserved their + trace entries (via `{**state_dict, ...}`), and the v2 pipeline + began at the first uncompleted position (`assess_risks`). - **`crew_size: 4`** and the other v1 fields are present on the v2 result because the migration preserved them via `{**state_dict, ...}`. A migration that *changed* an existing field (e.g., diff --git a/examples/08-checkpointing-and-migration/main.py b/examples/08-checkpointing-and-migration/main.py index 38a7058..ed405d7 100644 --- a/examples/08-checkpointing-and-migration/main.py +++ b/examples/08-checkpointing-and-migration/main.py @@ -1,30 +1,46 @@ -"""openarmature demo: a lunar-mission planning pipeline that checkpoints -its progress, then resumes under an upgraded state schema. +"""openarmature demo: a lunar-mission planning pipeline that survives a +mid-pipeline crash and later resumes under an upgraded state schema. **Use case:** A multi-step planning pipeline drafts a lunar mission plan -(objective, crew size, timeline). It writes a checkpoint after every -step so a crash or restart can pick up where it left off. Some time -later, you add a new analysis step (risk assessment) and a new state -field (``risk_assessment``) to support it. Resuming an old checkpoint -shouldn't require re-running the work that already finished — and -shouldn't fail because the saved state has the old shape. - -That's exactly what state migration is for. The pipeline runs once -against the v1 schema, the checkpoint persists, and the v2 schema -declares a migration from v1 that backfills the new field. The v2 -graph resumes from the v1 checkpoint, the migration runs once on the -loaded state, and execution picks up at the new node. +(objective, crew size, timeline). Two production-grade reliability +scenarios stack on top of each other: + +1. **Crash and resume.** The pipeline writes a checkpoint after every + step. Mid-run, ``size_crew`` raises a simulated transient failure + (OOM kill, pod preemption, network blip during the LLM call); the + first invoke() bubbles a ``NodeException`` at its boundary. The + define_objective record is durable on disk. A second invoke() with + ``resume_invocation=`` reads the saved record, skips the + already-completed node, retries size_crew (which now succeeds), and + runs through to END. + +2. **State migration on resume.** Some time later, you add a new + analysis step (risk assessment) and a new state field + (``risk_assessment``) to support it. The v2 schema declares a + migration from v1 that backfills the new field. The v2 graph + resumes from the (crash-survived) v1 invocation, the migration runs + once on the loaded state, and execution picks up at the new node. **What's interesting in the implementation:** - ``SQLiteCheckpointer(path, serialization="json")`` writes records to a SQLite file in JSON mode. JSON is the migration-eligible - serialization — it lets the engine load the saved state as a plain + serialization; it lets the engine load the saved state as a plain dict, apply migrations, and re-deserialize against the current state class. ``pickle`` mode is faster but can't bridge schemas. - ``GraphBuilder.with_checkpointer(...)`` wires the checkpointer to - the graph. The engine then fires a save at every ``completed`` - event for outermost and subgraph-internal nodes. + the graph. The engine fires a save synchronously at every + ``completed`` event for outermost and subgraph-internal nodes; + the save returns before the next node starts, so a crash mid-next- + node can't lose the previous node's record. +- ``NodeException`` reaches the caller from ``invoke()`` when a node + raises. ``exc.__cause__`` is the original exception; ``exc.node_name`` + identifies the failing node; ``exc.recoverable_state`` is the state + as it was just before the failing node ran. +- ``compiled.invoke(state, resume_invocation=)`` resumes a saved + invocation. The engine reads the record, skips nodes whose + ``completed`` event is already in ``completed_positions``, and + continues execution from the first uncompleted node. - ``State.schema_version`` is a ``ClassVar[str]`` declared on the state class. Empty string is the "no migration support" sentinel; any non-empty value opts the class into the migration registry. @@ -32,11 +48,8 @@ migrate)`` registers one edge of the migration chain. The ``migrate`` callable receives the saved state as a dict and returns the dict at the new schema. Pure function; no I/O, no side effects. -- ``compiled.invoke(state, resume_invocation=)`` resumes from a - saved record. The engine reads the record, applies any registered - migration chain that bridges the saved ``schema_version`` to the - current state class's, and continues execution from the first node - whose ``completed`` event isn't in the record. + Migration runs once on resume when the saved record's + ``schema_version`` doesn't match the current state class's. **Configuration** (env vars; OpenAI defaults shown): @@ -67,6 +80,7 @@ END, CompiledGraph, GraphBuilder, + NodeException, State, append, ) @@ -74,6 +88,19 @@ _provider_instance: OpenAIProvider | None = None +# Crash-and-resume drama: the first call to ``size_crew_v1`` raises a +# transient RuntimeError to simulate a mid-pipeline infrastructure +# failure (OOM kill, pod preemption, network blip during the LLM +# round-trip). The second call (during the resumed invocation) runs +# normally so the pipeline can complete. +# +# A real process restart would reset this counter when the OS rebooted +# the worker; the demo keeps the value across phases because both +# phases run in the same Python process. The engine-side invariant on +# display is unchanged: define_objective's saved checkpoint is durable +# across the failure, and the engine skips it on resume. +_size_crew_attempt_count = 0 + def _get_provider() -> OpenAIProvider: global _provider_instance @@ -125,6 +152,12 @@ async def define_objective_v1(s: MissionPlanStateV1) -> Mapping[str, Any]: async def size_crew_v1(s: MissionPlanStateV1) -> Mapping[str, Any]: + # Demo crash-trigger; see the _size_crew_attempt_count comment at + # the top of the module for the framing. + global _size_crew_attempt_count + _size_crew_attempt_count += 1 + if _size_crew_attempt_count == 1: + raise RuntimeError("simulated transient mid-pipeline crash before size_crew completed") content = await _chat( system=( "You are a mission planner. Given the objective below, reply with " @@ -170,7 +203,7 @@ def build_graph_v1(checkpointer: SQLiteCheckpointer) -> CompiledGraph[MissionPla # --------------------------------------------------------------------------- # v2 adds a ``risk_assessment`` field and a new ``assess_risks`` node at # the end of the pipeline. The migration backfills ``risk_assessment`` -# with an empty string for v1 records — the new node will fill it in +# with an empty string for v1 records; the new node will fill it in # when resume executes. @@ -225,7 +258,7 @@ async def draft_timeline_v2(s: MissionPlanStateV2) -> Mapping[str, Any]: async def assess_risks_v2(s: MissionPlanStateV2) -> Mapping[str, Any]: - """The new step v2 introduces — names the top risk for the plan.""" + """The new step v2 introduces; names the top risk for the plan.""" content = await _chat( system=( "You are a mission planner. Given the timeline below, identify " @@ -279,41 +312,94 @@ async def main() -> None: try: print("=" * 72) - print("Phase 1 — invoke v1 graph; checkpoints save after every node") + print("Phase 1 - invoke v1 graph; size_crew crashes; resume picks up") print("=" * 72) print() print(f" destination: {destination}") print(f" checkpoint db: {db_path}") print() - # Pass a deterministic correlation_id so phase 2 can find the - # invocation's saved records via the checkpoint filter. Without a - # caller-supplied correlation_id, invoke() generates a UUIDv4. + # Pass a deterministic correlation_id so we can look up the + # invocation's saved records via the checkpoint filter + # between attempts. Without a caller-supplied correlation_id, + # invoke() generates a UUIDv4. run_id = "demo-mission-plan-1" graph_v1 = build_graph_v1(checkpointer) initial_v1 = MissionPlanStateV1(destination=destination) - final_v1 = await graph_v1.invoke(initial_v1, correlation_id=run_id) + + # First attempt: define_objective completes (its checkpoint + # saves synchronously before size_crew runs); size_crew + # raises the simulated transient failure; NodeException + # bubbles to the invoke() boundary. + print(" first attempt:") + try: + await graph_v1.invoke(initial_v1, correlation_id=run_id) + except NodeException as exc: + cause = exc.__cause__ + cause_msg = str(cause) if cause is not None else "" + print(f" NodeException at node {exc.node_name!r}: {cause_msg}") await graph_v1.drain() - # Look up the saved record's invocation_id by correlation_id. The - # invocation_id is generated by invoke() and isn't exposed on the - # returned state; finding it through the checkpointer's list API - # is the canonical lookup path. + # Look up the saved record's invocation_id by correlation_id. + # The invocation_id is generated by invoke() and isn't + # exposed on the returned state (the call raised), so the + # checkpointer's list API is the canonical lookup path. + # The list API returns lightweight CheckpointSummary records; + # load() returns the full CheckpointRecord with + # completed_positions for the inspection below. summaries = list(await checkpointer.list(CheckpointFilter(correlation_id=run_id))) assert summaries, "expected at least one saved checkpoint" invocation_id = summaries[-1].invocation_id + record = await checkpointer.load(invocation_id) + assert record is not None, "expected the saved record to load" + completed_node_names = sorted(p.node_name for p in record.completed_positions) + print(f" saved invocation_id: {invocation_id}") + print(f" completed nodes: {completed_node_names}") + print() + + # Second attempt: resume from the pre-crash record. The + # engine reads the saved record, skips define_objective (its + # position is in completed_positions), retries size_crew + # (now succeeds because _size_crew_attempt_count is past 1), + # then runs draft_timeline to END. The user-supplied state + # argument is a placeholder; the engine ignores it on resume + # and starts from the saved record's state instead. + # + # Important: each invoke() mints its OWN invocation_id, even + # on resume. The pre-crash record stays under the original + # id; the resumed attempt's new checkpoints (size_crew + + # draft_timeline completions) save under a fresh id. After + # the resume we re-query to capture that new id, which is + # the one phase 2 needs as its resume target. + print(" second attempt (resume from saved invocation):") + final_v1 = await graph_v1.invoke( + MissionPlanStateV1(destination=destination), + resume_invocation=invocation_id, + ) + await graph_v1.drain() + + resume_summaries = list(await checkpointer.list(CheckpointFilter(correlation_id=run_id))) + resumed_invocation_id = resume_summaries[-1].invocation_id - print("v1 result:") - print(f" objective: {final_v1.objective}") - print(f" crew_size: {final_v1.crew_size}") - print(f" timeline: {final_v1.timeline}") + print(f" objective: {final_v1.objective}") + print(f" crew_size: {final_v1.crew_size}") + print(f" timeline: {final_v1.timeline}") + print(f" trace: {final_v1.trace}") + print(f" resumed invocation_id: {resumed_invocation_id}") print() - print(f" v1 invocation_id: {invocation_id}") + print( + " Each node name appears exactly once across two invoke() " + "calls. define_objective is in trace from the first attempt " + "(its append survived the crash via the synchronous " + "checkpoint); size_crew + draft_timeline are from the " + "resumed attempt. size_crew has no duplicate entry because " + "its first call raised before returning a state update." + ) print() print("=" * 72) - print("Phase 2 — invoke v2 graph with resume; v1->v2 migration runs") + print("Phase 2 - invoke v2 graph with resume; v1->v2 migration runs") print("=" * 72) print() print(" v2 adds: risk_assessment field + assess_risks node") @@ -321,29 +407,36 @@ async def main() -> None: print() graph_v2 = build_graph(checkpointer) - # Resume from the v1 invocation. The engine reads the saved record, - # applies migrate_v1_to_v2, re-deserializes against - # MissionPlanStateV2, and continues at the first uncompleted node - # (assess_risks — the v1 pipeline's three nodes are all in - # completed_positions, the new v2 node is not). + # Resume from the post-crash, post-resume completed record + # (resumed_invocation_id), NOT the pre-crash partial record + # (invocation_id). The pre-crash record only has + # define_objective in completed_positions; resuming from it + # would re-run size_crew + draft_timeline, defeating the + # "completed v1 then migrate" narrative. The engine reads + # the resumed-id record, applies migrate_v1_to_v2, + # re-deserializes against MissionPlanStateV2, and continues + # at the first uncompleted node (assess_risks - the v1 + # pipeline's three nodes are all in completed_positions on + # this record, the new v2 node is not). final_v2 = await graph_v2.invoke( MissionPlanStateV2(destination=destination), - resume_invocation=invocation_id, + resume_invocation=resumed_invocation_id, ) await graph_v2.drain() - print("v2 result after resume:") - print(f" objective: {final_v2.objective}") - print(f" crew_size: {final_v2.crew_size}") - print(f" timeline: {final_v2.timeline}") - print(f" risk_assessment: {final_v2.risk_assessment}") - print() - print(f" trace: {final_v2.trace}") + print(" v2 result after resume:") + print(f" objective: {final_v2.objective}") + print(f" crew_size: {final_v2.crew_size}") + print(f" timeline: {final_v2.timeline}") + print(f" risk_assessment: {final_v2.risk_assessment}") + print(f" trace: {final_v2.trace}") print() print( - "The v1 nodes appear once each in v1's trace and NOT in v2's " - "trace — they were skipped on resume because completed_positions " - "already covered them. Only assess_risks ran in phase 2." + " v2's trace appends 'assess_risks' to the v1 entries the " + "migration preserved. Each v1 node appears exactly once " + "(no duplicates from the v2 graph re-running them) because " + "completed_positions skipped them. Only assess_risks was " + "new work in phase 2." ) finally: if _provider_instance is not None: diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 29aeb93..4bbe214 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1462,7 +1462,7 @@ _Runnable example programs shipped in the source tree at `examples/`. The full c - **`examples/05-fan-out-with-retry/main.py`** — openarmature demo: summarize a batch of lunar-mission headlines in parallel, with per-headline retries and timing. - **`examples/06-parallel-branches/main.py`** — openarmature demo: enrich a lunar-mission news article with three independent analyses running concurrently. - **`examples/07-multimodal-prompt/main.py`** — openarmature demo: two independent analyses of a lunar-mission photograph using versioned prompt templates, a fallback prompt backend, and a multimodal user message. -- **`examples/08-checkpointing-and-migration/main.py`** — openarmature demo: a lunar-mission planning pipeline that checkpoints its progress, then resumes under an upgraded state schema. +- **`examples/08-checkpointing-and-migration/main.py`** — openarmature demo: a lunar-mission planning pipeline that survives a mid-pipeline crash and later resumes under an upgraded state schema. - **`examples/09-tool-use/main.py`** — openarmature demo: a lunar-mission assistant that calls local Python functions as tools to answer fact and physics questions about Apollo / Artemis missions. - **`examples/10-langfuse-observability/main.py`** — openarmature demo: Langfuse observer + prompt linkage on a lunar mission Q&A pipeline. - **`examples/11-chat-with-multimodal/main.py`** — openarmature demo: multi-turn chat with conversation memory and a multimodal turn, using ChatPrompt + PlaceholderSegment.