diff --git a/CHANGELOG.md b/CHANGELOG.md index 197eb30..e2a0681 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Added +- **Three new patterns docs.** `docs/patterns/state-migration-on-resume.md`, `docs/patterns/caller-supplied-trace-identifiers.md`, and `docs/patterns/observer-state-reconciliation.md` graduate the corresponding entries from `docs/agent/non-obvious-shapes.md` into full pattern recipes with code snippets and "when this is right / when it isn't" guidance. The programmatic patterns API (`openarmature.patterns.list()` / `get(name)`) grows from 4 to 7 entries. - **HyperDX OTel integration test path and "Production swap" docs in example 03.** `examples/03-observer-hooks/main.py`'s module docstring grows a "Production swap" section showing how to substitute the demo's `SimpleSpanProcessor` + `ConsoleSpanExporter` for `BatchSpanProcessor` + `OTLPSpanExporter` pointed at HyperDX (or any other OTLP-HTTP collector). A new opt-in integration test (`tests/integration/test_otel_hyperdx_export.py`, gated by `HYPERDX_API_KEY` + `HYPERDX_OTLP_ENDPOINT` env vars and `@pytest.mark.integration`) drives the same production export path end-to-end against a live endpoint. `opentelemetry-exporter-otlp-proto-http` lands as a dev-only dep; not promoted to a public extras group yet. ### Changed (breaking) diff --git a/docs/agent/non-obvious-shapes.md b/docs/agent/non-obvious-shapes.md index f7a1496..e89a9ea 100644 --- a/docs/agent/non-obvious-shapes.md +++ b/docs/agent/non-obvious-shapes.md @@ -125,37 +125,6 @@ Different classes, same OTel-Logs export path. If both are attached against the Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable. -### Reconcile `started` → `completed` pairs via a per-invocation dict keyed on `(namespace, branch_name, attempt_index, fan_out_index)` - -Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually. - -The pair identity is `(namespace, branch_name, attempt_index, fan_out_index)`: that tuple is unique within an invocation (per graph-engine §6 uniqueness invariants — `branch_name` and `fan_out_index` are independent slots, so a node inside a parallel-branches branch needs `branch_name` in the key to avoid colliding with the same-named node in a sibling branch). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`: - -```python -class StepTimingObserver: - def __init__(self) -> None: - # invocation_id -> {(namespace, branch_name, attempt_index, fan_out_index): start_ts} - self._pending: dict[str, dict[tuple[Any, ...], float]] = {} - - async def __call__(self, event: NodeEvent) -> None: - invocation_id = current_invocation_id() - if invocation_id is None: - return - key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index) - if event.phase == "started": - self._pending.setdefault(invocation_id, {})[key] = time.monotonic() - elif event.phase == "completed": - start = self._pending.get(invocation_id, {}).pop(key, None) - if start is not None: - duration = time.monotonic() - start - # … emit timing - # Sweep when the dict empties (last completed for this invocation). - if not self._pending.get(invocation_id): - self._pending.pop(invocation_id, None) -``` - -The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair. - ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise: @@ -170,37 +139,6 @@ async def __call__(self, event: NodeEvent) -> None: `event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None` — OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. -### A `with_state_migration` recipe — register migrations alongside the state class, run on resume - -`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape. - -Wire it up at compile time: - -```python -class PipelineState(State): - schema_version: ClassVar[str] = "2" - # … v2 fields - -def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict: - # Old field "step_count" renamed to "steps_completed" in v2. - state_dict["steps_completed"] = state_dict.pop("step_count", 0) - return state_dict - -compiled = ( - GraphBuilder(PipelineState) - .add_node("step", _step_body) - .add_edge("step", END) - .set_entry("step") - .with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2) - .compile() -) -compiled.attach_checkpointer(checkpointer) -``` - -Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies. - -When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping. - ### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`: @@ -242,32 +180,3 @@ class PipelineState(State): Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotated[list[X], append]`) don't hit this — the engine still wraps each instance's value as one element, but `append` flattens it correctly since each element is already an `X`. The two non-flat shapes emerge only when the per-instance value is itself a container: a `list[X]` per instance lands `list[list[X]]` (use `concat_flatten`), and a `dict[str, X]` per instance lands `list[dict]` (use `merge_all`). If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path. - -### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags) - -Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend: - -```python -await compiled.invoke( - initial_state, - metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"}, -) -``` - -The OTel observer emits each entry as an `openarmature.user.` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule. - -Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins. - -Mid-invocation augmentation via the public helper: - -```python -from openarmature.observability import set_invocation_metadata - -async def my_node(state: MyState) -> dict: - set_invocation_metadata(productId=state.product_id) - # subsequent spans (this node's completed, next node's started, - # any LLM call inside, etc.) carry productId - return {"score": await compute_score(state)} -``` - -The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary. diff --git a/docs/patterns/caller-supplied-trace-identifiers.md b/docs/patterns/caller-supplied-trace-identifiers.md new file mode 100644 index 0000000..4b440e1 --- /dev/null +++ b/docs/patterns/caller-supplied-trace-identifiers.md @@ -0,0 +1,138 @@ +# Caller-supplied trace identifiers + +**Problem.** A service runs the same graph for many tenants / +requests / feature flag cohorts. How do you tag every span and +trace so downstream observability (Honeycomb, Datadog, Langfuse, +HyperDX, Grafana Tempo) can filter by tenant or join across +services without each node having to thread the identifiers +through manually? + +## Approach + +Pass a `metadata` dict to `invoke()`. The framework propagates each +entry to every observability backend at once: the OTel observer +emits each entry as an `openarmature.user.` cross-cutting span +attribute on every span (invocation, node, subgraph wrapper, +fan-out instance, LLM provider), and the Langfuse observer merges +each entry as a top-level key into `trace.metadata` AND every +observation's metadata. Backends that consume OTel attributes pick +the entries up for free; backends with typed metadata fields get +them via per-backend propagation. + +For metadata that's only known mid-flight (an ID resolved by an +LLM-classification node, a derived feature flag), use +`set_invocation_metadata` from inside a node. The augmentation +respects fan-out / parallel-branches per-instance scoping per +proposal 0045, so each instance's update lives in its own +async-context copy and doesn't leak to siblings. + +## Snippet + +```python +import asyncio + +from openarmature.graph import END, GraphBuilder, State +from openarmature.observability import set_invocation_metadata + + +class RequestState(State): + query: str = "" + answer: str = "" + + +async def answer(s: RequestState) -> dict: + # An entry resolved mid-invocation propagates to subsequent spans + # in the same async-context: this node's `completed`, the LLM + # provider span if any, and onwards. Sibling fan-out instances + # and parallel-branches branches see their own copies. + set_invocation_metadata(modelTier="standard") + return {"answer": "Apollo 13 aborted due to an O2 tank failure."} + + +graph = ( + GraphBuilder(RequestState) + .add_node("answer", answer) + .add_edge("answer", END) + .set_entry("answer") + .compile() +) + + +async def main() -> None: + final = await graph.invoke( + RequestState(query="why did Apollo 13 abort?"), + metadata={ + "tenantId": "acme-corp", + "requestId": "req-12345", + "featureFlag": "v2-canary", + }, + ) + print(final.answer) + + +asyncio.run(main()) +``` + +Every span emitted during this `invoke()` carries +`openarmature.user.tenantId="acme-corp"`, +`openarmature.user.requestId="req-12345"`, and +`openarmature.user.featureFlag="v2-canary"`. Spans inside the +`answer` node (and any downstream nodes if the graph had more) +additionally carry `openarmature.user.modelTier="standard"` from +the `set_invocation_metadata` call. + +## Boundary validation + +Validation runs synchronously, before any node body fires. Both +`invoke(metadata=...)` and `set_invocation_metadata(...)` enforce +the same rules: + +- Keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved + namespaces per the spec). +- Keys MUST NOT collide with the spec's reserved per-trace metadata + keys (`correlation_id`, `entry_node`, `spec_version`, etc.). The + set is enforced at the `invoke()` and `set_invocation_metadata` + boundaries via the validator in + `openarmature.observability.metadata`; it grows per spec proposals + 0041 / 0042, with the canonical list in the spec's observability + §3.4. +- Values MUST be OTel-attribute-compatible scalars (`str` / `int` / + `float` / `bool`) or homogeneous arrays of those. + +Violations raise `ValueError` at the boundary. Failing loud at +construction is better than the bare-key silently clobbering a +spec-reserved key in flat Langfuse `trace.metadata`. + +## When this is the right pattern + +- One service runs the same graph for many distinct callers + (multi-tenant SaaS, per-customer feature flags, A/B test + cohorts). +- Downstream observability needs to filter or join on caller-side + identifiers (tenant ID for billing dashboards, request ID for + cross-service trace stitching, feature flag for experiment + analysis). +- You don't want each node to know about tenancy. The metadata + flows through the framework, not the node bodies. + +## When it isn't + +- The identifier is a per-node decision, not a per-invocation one. + If different nodes in the same invocation produce different + values, that's typed state, not invocation metadata. Put it on + the `State` schema with a clear reducer. +- The value isn't a scalar or homogeneous array. The boundary + validation rejects complex shapes; if you need to attach a nested + object, serialize it to a JSON string before passing. +- The value contains PII you don't want in every span. Metadata is + unconditionally emitted everywhere the observers run; filter at + the caller or skip the propagation for those keys. + +## Cross-references + +- [Observability concept page](../concepts/observability.md): how + OTel attributes and Langfuse metadata propagate. +- [`examples/10-langfuse-observability`](../examples/10-langfuse-observability.md): + runnable example exercising the metadata propagation path. +- Spec: [observability](https://openarmature.org/capabilities/observability/), + the propagation contract for caller-supplied metadata. diff --git a/docs/patterns/index.md b/docs/patterns/index.md index 00e5741..e42be87 100644 --- a/docs/patterns/index.md +++ b/docs/patterns/index.md @@ -33,3 +33,12 @@ docs composing existing primitives. - [Bypass if output exists](bypass-if-output-exists.md) — short-circuit a node whose external output already exists, via middleware. +- [State migration on resume](state-migration-on-resume.md) — let + older in-flight checkpoints resume against an evolved state + schema without each node body having to handle multiple shapes. +- [Caller-supplied trace identifiers](caller-supplied-trace-identifiers.md) + — propagate tenant ID / request ID / feature flags into every + observability span via `invoke(metadata=...)`. +- [Custom observer: reconciling started → completed pairs](observer-state-reconciliation.md) + — thread per-call state between paired events using a per- + invocation dict keyed on the spec's uniqueness tuple. diff --git a/docs/patterns/observer-state-reconciliation.md b/docs/patterns/observer-state-reconciliation.md new file mode 100644 index 0000000..d802402 --- /dev/null +++ b/docs/patterns/observer-state-reconciliation.md @@ -0,0 +1,135 @@ +# Custom observer: reconciling started → completed pairs + +**Problem.** A custom observer needs to thread per-call state +between a node's `started` and `completed` events: measure +duration, capture request/response payloads, attach a custom ID +that downstream uses. The engine doesn't carry a correlation field +across the pair (it doesn't need one for its own logic, since +events arrive serially per spec §6). How does the observer +reconcile which `completed` matches which `started`? + +## Approach + +The pair identity is the tuple +`(namespace, branch_name, attempt_index, fan_out_index)`. That +tuple is unique within an invocation: the namespace separates +subgraph wrappers from their parents, `branch_name` distinguishes +parallel-branches branches, `attempt_index` distinguishes retried +attempts of the same node, and `fan_out_index` distinguishes +per-instance fan-out copies. Carry per-invocation state in +`dict[invocation_id, dict[tuple, value]]`, look up on `completed`, +and sweep the outer entry when the per-invocation sub-dict +empties. + +Both `branch_name` and `fan_out_index` matter even for nodes that +look "the same" by name: a node `score` inside parallel-branches +`branch=fast` vs `branch=slow` produces two distinct pair +identities, and a per-instance fan-out copy at `fan_out_index=3` is +not the same as `fan_out_index=4`. + +## Snippet + +```python +import time +from typing import NamedTuple + +from openarmature.graph import NodeEvent +from openarmature.observability.correlation import current_invocation_id + + +PairKey = tuple[tuple[str, ...], str | None, int, int | None] + + +class StepTiming(NamedTuple): + node_name: str + namespace: tuple[str, ...] + branch_name: str | None + attempt_index: int + fan_out_index: int | None + duration_s: float + + +class StepTimingObserver: + """Custom observer that records wall-clock duration per node + attempt. Stitches started -> completed via the per-invocation + pair-identity dict. + """ + + def __init__(self) -> None: + # invocation_id -> {pair_key: start_monotonic} + self._pending: dict[str, dict[PairKey, float]] = {} + # Final per-call timings, surfaced to whatever consumes them + # (metrics exporter, log line, in-test assertion). + self.timings: list[StepTiming] = [] + + async def __call__(self, event: NodeEvent) -> None: + invocation_id = current_invocation_id() + if invocation_id is None: + return + + key: PairKey = ( + event.namespace, + event.branch_name, + event.attempt_index, + event.fan_out_index, + ) + + if event.phase == "started": + self._pending.setdefault(invocation_id, {})[key] = time.monotonic() + return + + if event.phase == "completed": + start = self._pending.get(invocation_id, {}).pop(key, None) + if start is not None: + self.timings.append( + StepTiming( + node_name=event.node_name, + namespace=event.namespace, + branch_name=event.branch_name, + attempt_index=event.attempt_index, + fan_out_index=event.fan_out_index, + duration_s=time.monotonic() - start, + ) + ) + # Sweep when the dict empties for this invocation. + if not self._pending.get(invocation_id): + self._pending.pop(invocation_id, None) +``` + +Attach with `graph.attach_observer(StepTimingObserver())`. Run +the invocation; the observer's `timings` list carries one entry +per node attempt with its duration and identifying tuple. + +## When this is the right pattern + +- A custom observer needs paired-event state that the spec doesn't + carry across the pair. +- The pair identity needs to be unique across fan-out instances or + parallel-branches branches; a key shape that omits `branch_name` + or `fan_out_index` would collide. +- Long-running services need the dict to drain naturally as + invocations complete. The "sweep when sub-dict empties" pattern + prevents the outer dict from growing per-invocation forever. + +## When it isn't + +- You only need a final-summary signal at invocation completion. + Subscribe to the invocation `completed` event and read the final + state directly; no per-call reconciliation needed. +- The `OTelObserver` or `LangfuseObserver` already provides what + you want. Both stitch `started` / `completed` internally to open + / close spans; you don't need a custom observer to track timings + if a span carries the duration already. +- The metric is cross-invocation. A pair-identity dict scoped to a + single invocation_id won't aggregate; use a global counter or + push to an external metrics backend instead. + +## Cross-references + +- [Observability concept page](../concepts/observability.md): the + `NodeEvent` shape, `started` / `completed` lifecycle. +- [Caller-supplied trace identifiers](caller-supplied-trace-identifiers.md): + adjacent pattern for tagging the events your observer sees. +- Spec: [graph-engine](https://openarmature.org/capabilities/graph-engine/), + observer events and the uniqueness invariants for + `(namespace, branch_name, attempt_index, fan_out_index)`. diff --git a/docs/patterns/state-migration-on-resume.md b/docs/patterns/state-migration-on-resume.md new file mode 100644 index 0000000..188fc5e --- /dev/null +++ b/docs/patterns/state-migration-on-resume.md @@ -0,0 +1,130 @@ +# State migration on resume + +**Problem.** A long-running pipeline has saved checkpoints +mid-flight. You add a field to the state schema and rename another. +How do older checkpoints resume against the new schema without +each node body having to handle both shapes? + +## Approach + +Tag the state class with a `schema_version` and register migration +callables at compile time via `GraphBuilder.with_state_migration`. +On resume, the engine inspects the loaded record's `schema_version`, +walks the registered chain (v1 → v2 → v3 → …), and hands node +bodies a fully-migrated state object. Node code stays single-shape; +all version-aware logic lives in the migration functions. + +The migration callable's typed signature is `Callable[[Any], Any]`. +For JSON-backed checkpointers (the only kind that supports +migration; see [Checkpointing](../concepts/checkpointing.md)), +that resolves to `(state_dict: dict) -> dict`: the callable +receives the deserialized record and returns the new shape. The +`from_version` and `to_version` are registered alongside the +callable on `with_state_migration`; the callable itself stays +signature-light because migrations MUST be pure (no implicit +version-dispatch logic inside the function body). The engine +dispatches a `checkpoint_migrated` observer event after each +migration step so OTel / Langfuse spans can correlate the migration +with the resume. + +## Snippet + +```python +from typing import ClassVar + +from openarmature.checkpoint import SQLiteCheckpointer +from openarmature.graph import END, GraphBuilder, State + + +# v2 schema: renamed `step_count` -> `steps_completed` and added +# `last_node`. Old v1 checkpoints carry `step_count` and lack +# `last_node` entirely. +class PipelineState(State): + schema_version: ClassVar[str] = "2" + + query: str = "" + steps_completed: int = 0 + last_node: str | None = None + + +def _migrate_v1_to_v2(state_dict: dict) -> dict: + # Rename: step_count -> steps_completed. Default missing + # last_node to None (the v2 schema allows it). + state_dict["steps_completed"] = state_dict.pop("step_count", 0) + state_dict.setdefault("last_node", None) + return state_dict + + +async def _step(s: PipelineState) -> dict: + return {"steps_completed": s.steps_completed + 1, "last_node": "step"} + + +# ``serialization="json"`` is required for migration to operate on a +# dict; the default ``"pickle"`` mode round-trips through class +# identity and can't migrate across schemas. +compiled = ( + GraphBuilder(PipelineState) + .add_node("step", _step) + .add_edge("step", END) + .set_entry("step") + .with_checkpointer(SQLiteCheckpointer("ck.db", serialization="json")) + .with_state_migration("1", "2", _migrate_v1_to_v2) + .compile() +) + +# Later, on resume: +# final = await compiled.invoke( +# PipelineState(), # overwritten by the loaded checkpoint +# resume_invocation=prior_invocation_id, +# ) +``` + +When the chain spans multiple versions (v1 → v2 → v3), register +each step separately with repeated `with_state_migration` calls; +the engine walks them in version order. Gaps fail loudly: if v1→v2 +and v3→v4 are registered but a record loads at v2 needing v3, the +engine raises `CheckpointStateMigrationMissing` at resume time +rather than silently using a partial schema. + +## When this is the right pattern + +- A schema change lands while in-flight checkpoints exist. Without + migrations, those resume attempts would fail validation at the + state-merge boundary. +- The change is shape-altering (rename, type change, field + add/remove) rather than purely additive with a safe default. A + bare field add with a Pydantic default doesn't need migration; + Pydantic fills it in on load. +- You want resume to be transparent to node bodies. Migrations let + each node body assume the current schema unconditionally. + +## When it isn't + +- Adding a field with a safe default and NOT bumping + `schema_version`. Pydantic's default handling resolves the missing + field at load. Bumping `schema_version` without a corresponding + migration is fail-loud: the engine raises + `CheckpointStateMigrationMissing` at resume rather than silently + skipping. If you bump the version, register an identity migration + (a callable that returns the dict unchanged) to make the additive + intent explicit. +- Migrations need to call the LLM or do other slow / fallible work. + The migration runs synchronously during resume; long-running work + belongs in a dedicated `recompute` node guarded by + [bypass-if-output-exists](bypass-if-output-exists.md), not in a + migration callable. +- Schema changes are happening on every release. Migration + callables accumulate fast; if the cadence is high enough that + v1→v2→v3→…→v9 starts to feel like a chain, consider whether the + schema would benefit from being more open at the seams (e.g. a + `metadata: dict[str, Any]` field for evolving auxiliary data + instead of dedicated columns). + +## Cross-references + +- [Checkpointing concept page](../concepts/checkpointing.md): + checkpointer backends and the resume contract. +- [`session-as-checkpoint-resume`](session-as-checkpoint-resume.md): + multi-turn agent state via the same checkpointer machinery. +- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/), + the state-migration contract and `checkpoint_migrated` event. diff --git a/mkdocs.yml b/mkdocs.yml index 006bf0e..1432f15 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -147,6 +147,9 @@ nav: - Tool dispatch as node: patterns/tool-dispatch-as-node.md - Session as checkpoint resume: patterns/session-as-checkpoint-resume.md - Bypass if output exists: patterns/bypass-if-output-exists.md + - State migration on resume: patterns/state-migration-on-resume.md + - Caller-supplied trace identifiers: patterns/caller-supplied-trace-identifiers.md + - Custom observer reconciliation: patterns/observer-state-reconciliation.md extra: # Hide the "Made with Material for MkDocs" footer. diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 21e6985..c38e119 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -514,6 +514,281 @@ This pattern is explicitly called out in proposal 0008's *Alternatives considered* section as a userland recipe rather than spec'd behavior — this page is its canonical home. +### Caller-supplied trace identifiers + +**Problem.** A service runs the same graph for many tenants / +requests / feature flag cohorts. How do you tag every span and +trace so downstream observability (Honeycomb, Datadog, Langfuse, +HyperDX, Grafana Tempo) can filter by tenant or join across +services without each node having to thread the identifiers +through manually? + +#### Approach + +Pass a `metadata` dict to `invoke()`. The framework propagates each +entry to every observability backend at once: the OTel observer +emits each entry as an `openarmature.user.` cross-cutting span +attribute on every span (invocation, node, subgraph wrapper, +fan-out instance, LLM provider), and the Langfuse observer merges +each entry as a top-level key into `trace.metadata` AND every +observation's metadata. Backends that consume OTel attributes pick +the entries up for free; backends with typed metadata fields get +them via per-backend propagation. + +For metadata that's only known mid-flight (an ID resolved by an +LLM-classification node, a derived feature flag), use +`set_invocation_metadata` from inside a node. The augmentation +respects fan-out / parallel-branches per-instance scoping per +proposal 0045, so each instance's update lives in its own +async-context copy and doesn't leak to siblings. + +#### Snippet + +```python +import asyncio + +from openarmature.graph import END, GraphBuilder, State +from openarmature.observability import set_invocation_metadata + + +class RequestState(State): + query: str = "" + answer: str = "" + + +async def answer(s: RequestState) -> dict: + # An entry resolved mid-invocation propagates to subsequent spans + # in the same async-context: this node's `completed`, the LLM + # provider span if any, and onwards. Sibling fan-out instances + # and parallel-branches branches see their own copies. + set_invocation_metadata(modelTier="standard") + return {"answer": "Apollo 13 aborted due to an O2 tank failure."} + + +graph = ( + GraphBuilder(RequestState) + .add_node("answer", answer) + .add_edge("answer", END) + .set_entry("answer") + .compile() +) + + +async def main() -> None: + final = await graph.invoke( + RequestState(query="why did Apollo 13 abort?"), + metadata={ + "tenantId": "acme-corp", + "requestId": "req-12345", + "featureFlag": "v2-canary", + }, + ) + print(final.answer) + + +asyncio.run(main()) +``` + +Every span emitted during this `invoke()` carries +`openarmature.user.tenantId="acme-corp"`, +`openarmature.user.requestId="req-12345"`, and +`openarmature.user.featureFlag="v2-canary"`. Spans inside the +`answer` node (and any downstream nodes if the graph had more) +additionally carry `openarmature.user.modelTier="standard"` from +the `set_invocation_metadata` call. + +#### Boundary validation + +Validation runs synchronously, before any node body fires. Both +`invoke(metadata=...)` and `set_invocation_metadata(...)` enforce +the same rules: + +- Keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved + namespaces per the spec). +- Keys MUST NOT collide with the spec's reserved per-trace metadata + keys (`correlation_id`, `entry_node`, `spec_version`, etc.). The + set is enforced at the `invoke()` and `set_invocation_metadata` + boundaries via the validator in + `openarmature.observability.metadata`; it grows per spec proposals + 0041 / 0042, with the canonical list in the spec's observability + §3.4. +- Values MUST be OTel-attribute-compatible scalars (`str` / `int` / + `float` / `bool`) or homogeneous arrays of those. + +Violations raise `ValueError` at the boundary. Failing loud at +construction is better than the bare-key silently clobbering a +spec-reserved key in flat Langfuse `trace.metadata`. + +#### When this is the right pattern + +- One service runs the same graph for many distinct callers + (multi-tenant SaaS, per-customer feature flags, A/B test + cohorts). +- Downstream observability needs to filter or join on caller-side + identifiers (tenant ID for billing dashboards, request ID for + cross-service trace stitching, feature flag for experiment + analysis). +- You don't want each node to know about tenancy. The metadata + flows through the framework, not the node bodies. + +#### When it isn't + +- The identifier is a per-node decision, not a per-invocation one. + If different nodes in the same invocation produce different + values, that's typed state, not invocation metadata. Put it on + the `State` schema with a clear reducer. +- The value isn't a scalar or homogeneous array. The boundary + validation rejects complex shapes; if you need to attach a nested + object, serialize it to a JSON string before passing. +- The value contains PII you don't want in every span. Metadata is + unconditionally emitted everywhere the observers run; filter at + the caller or skip the propagation for those keys. + +#### Cross-references + +- [Observability concept page](https://openarmature.ai/concepts/observability/): how + OTel attributes and Langfuse metadata propagate. +- [`examples/10-langfuse-observability`](https://openarmature.ai/examples/10-langfuse-observability/): + runnable example exercising the metadata propagation path. +- Spec: [observability](https://openarmature.org/capabilities/observability/), + the propagation contract for caller-supplied metadata. + +### Custom observer: reconciling started → completed pairs + +**Problem.** A custom observer needs to thread per-call state +between a node's `started` and `completed` events: measure +duration, capture request/response payloads, attach a custom ID +that downstream uses. The engine doesn't carry a correlation field +across the pair (it doesn't need one for its own logic, since +events arrive serially per spec §6). How does the observer +reconcile which `completed` matches which `started`? + +#### Approach + +The pair identity is the tuple +`(namespace, branch_name, attempt_index, fan_out_index)`. That +tuple is unique within an invocation: the namespace separates +subgraph wrappers from their parents, `branch_name` distinguishes +parallel-branches branches, `attempt_index` distinguishes retried +attempts of the same node, and `fan_out_index` distinguishes +per-instance fan-out copies. Carry per-invocation state in +`dict[invocation_id, dict[tuple, value]]`, look up on `completed`, +and sweep the outer entry when the per-invocation sub-dict +empties. + +Both `branch_name` and `fan_out_index` matter even for nodes that +look "the same" by name: a node `score` inside parallel-branches +`branch=fast` vs `branch=slow` produces two distinct pair +identities, and a per-instance fan-out copy at `fan_out_index=3` is +not the same as `fan_out_index=4`. + +#### Snippet + +```python +import time +from typing import NamedTuple + +from openarmature.graph import NodeEvent +from openarmature.observability.correlation import current_invocation_id + + +PairKey = tuple[tuple[str, ...], str | None, int, int | None] + + +class StepTiming(NamedTuple): + node_name: str + namespace: tuple[str, ...] + branch_name: str | None + attempt_index: int + fan_out_index: int | None + duration_s: float + + +class StepTimingObserver: + """Custom observer that records wall-clock duration per node + attempt. Stitches started -> completed via the per-invocation + pair-identity dict. + """ + + def __init__(self) -> None: + # invocation_id -> {pair_key: start_monotonic} + self._pending: dict[str, dict[PairKey, float]] = {} + # Final per-call timings, surfaced to whatever consumes them + # (metrics exporter, log line, in-test assertion). + self.timings: list[StepTiming] = [] + + async def __call__(self, event: NodeEvent) -> None: + invocation_id = current_invocation_id() + if invocation_id is None: + return + + key: PairKey = ( + event.namespace, + event.branch_name, + event.attempt_index, + event.fan_out_index, + ) + + if event.phase == "started": + self._pending.setdefault(invocation_id, {})[key] = time.monotonic() + return + + if event.phase == "completed": + start = self._pending.get(invocation_id, {}).pop(key, None) + if start is not None: + self.timings.append( + StepTiming( + node_name=event.node_name, + namespace=event.namespace, + branch_name=event.branch_name, + attempt_index=event.attempt_index, + fan_out_index=event.fan_out_index, + duration_s=time.monotonic() - start, + ) + ) + # Sweep when the dict empties for this invocation. + if not self._pending.get(invocation_id): + self._pending.pop(invocation_id, None) +``` + +Attach with `graph.attach_observer(StepTimingObserver())`. Run +the invocation; the observer's `timings` list carries one entry +per node attempt with its duration and identifying tuple. + +#### When this is the right pattern + +- A custom observer needs paired-event state that the spec doesn't + carry across the pair. +- The pair identity needs to be unique across fan-out instances or + parallel-branches branches; a key shape that omits `branch_name` + or `fan_out_index` would collide. +- Long-running services need the dict to drain naturally as + invocations complete. The "sweep when sub-dict empties" pattern + prevents the outer dict from growing per-invocation forever. + +#### When it isn't + +- You only need a final-summary signal at invocation completion. + Subscribe to the invocation `completed` event and read the final + state directly; no per-call reconciliation needed. +- The `OTelObserver` or `LangfuseObserver` already provides what + you want. Both stitch `started` / `completed` internally to open + / close spans; you don't need a custom observer to track timings + if a span carries the duration already. +- The metric is cross-invocation. A pair-identity dict scoped to a + single invocation_id won't aggregate; use a global counter or + push to an external metrics backend instead. + +#### Cross-references + +- [Observability concept page](https://openarmature.ai/concepts/observability/): the + `NodeEvent` shape, `started` / `completed` lifecycle. +- [Caller-supplied trace identifiers](#caller-supplied-trace-identifiers): + adjacent pattern for tagging the events your observer sees. +- Spec: [graph-engine](https://openarmature.org/capabilities/graph-engine/), + observer events and the uniqueness invariants for + `(namespace, branch_name, attempt_index, fan_out_index)`. + ### Parameterized entry point **Problem.** How do I start the graph at an arbitrary node? @@ -730,6 +1005,137 @@ state and the session table holds the join keys. single-resume baseline. - Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/) +### State migration on resume + +**Problem.** A long-running pipeline has saved checkpoints +mid-flight. You add a field to the state schema and rename another. +How do older checkpoints resume against the new schema without +each node body having to handle both shapes? + +#### Approach + +Tag the state class with a `schema_version` and register migration +callables at compile time via `GraphBuilder.with_state_migration`. +On resume, the engine inspects the loaded record's `schema_version`, +walks the registered chain (v1 → v2 → v3 → …), and hands node +bodies a fully-migrated state object. Node code stays single-shape; +all version-aware logic lives in the migration functions. + +The migration callable's typed signature is `Callable[[Any], Any]`. +For JSON-backed checkpointers (the only kind that supports +migration; see [Checkpointing](https://openarmature.ai/concepts/checkpointing/)), +that resolves to `(state_dict: dict) -> dict`: the callable +receives the deserialized record and returns the new shape. The +`from_version` and `to_version` are registered alongside the +callable on `with_state_migration`; the callable itself stays +signature-light because migrations MUST be pure (no implicit +version-dispatch logic inside the function body). The engine +dispatches a `checkpoint_migrated` observer event after each +migration step so OTel / Langfuse spans can correlate the migration +with the resume. + +#### Snippet + +```python +from typing import ClassVar + +from openarmature.checkpoint import SQLiteCheckpointer +from openarmature.graph import END, GraphBuilder, State + + +### v2 schema: renamed `step_count` -> `steps_completed` and added +### `last_node`. Old v1 checkpoints carry `step_count` and lack +### `last_node` entirely. +class PipelineState(State): + schema_version: ClassVar[str] = "2" + + query: str = "" + steps_completed: int = 0 + last_node: str | None = None + + +def _migrate_v1_to_v2(state_dict: dict) -> dict: + # Rename: step_count -> steps_completed. Default missing + # last_node to None (the v2 schema allows it). + state_dict["steps_completed"] = state_dict.pop("step_count", 0) + state_dict.setdefault("last_node", None) + return state_dict + + +async def _step(s: PipelineState) -> dict: + return {"steps_completed": s.steps_completed + 1, "last_node": "step"} + + +### ``serialization="json"`` is required for migration to operate on a +### dict; the default ``"pickle"`` mode round-trips through class +### identity and can't migrate across schemas. +compiled = ( + GraphBuilder(PipelineState) + .add_node("step", _step) + .add_edge("step", END) + .set_entry("step") + .with_checkpointer(SQLiteCheckpointer("ck.db", serialization="json")) + .with_state_migration("1", "2", _migrate_v1_to_v2) + .compile() +) + +### Later, on resume: +### final = await compiled.invoke( +### PipelineState(), # overwritten by the loaded checkpoint +### resume_invocation=prior_invocation_id, +### ) +``` + +When the chain spans multiple versions (v1 → v2 → v3), register +each step separately with repeated `with_state_migration` calls; +the engine walks them in version order. Gaps fail loudly: if v1→v2 +and v3→v4 are registered but a record loads at v2 needing v3, the +engine raises `CheckpointStateMigrationMissing` at resume time +rather than silently using a partial schema. + +#### When this is the right pattern + +- A schema change lands while in-flight checkpoints exist. Without + migrations, those resume attempts would fail validation at the + state-merge boundary. +- The change is shape-altering (rename, type change, field + add/remove) rather than purely additive with a safe default. A + bare field add with a Pydantic default doesn't need migration; + Pydantic fills it in on load. +- You want resume to be transparent to node bodies. Migrations let + each node body assume the current schema unconditionally. + +#### When it isn't + +- Adding a field with a safe default and NOT bumping + `schema_version`. Pydantic's default handling resolves the missing + field at load. Bumping `schema_version` without a corresponding + migration is fail-loud: the engine raises + `CheckpointStateMigrationMissing` at resume rather than silently + skipping. If you bump the version, register an identity migration + (a callable that returns the dict unchanged) to make the additive + intent explicit. +- Migrations need to call the LLM or do other slow / fallible work. + The migration runs synchronously during resume; long-running work + belongs in a dedicated `recompute` node guarded by + [bypass-if-output-exists](#bypass-if-output-exists), not in a + migration callable. +- Schema changes are happening on every release. Migration + callables accumulate fast; if the cadence is high enough that + v1→v2→v3→…→v9 starts to feel like a chain, consider whether the + schema would benefit from being more open at the seams (e.g. a + `metadata: dict[str, Any]` field for evolving auxiliary data + instead of dedicated columns). + +#### Cross-references + +- [Checkpointing concept page](https://openarmature.ai/concepts/checkpointing/): + checkpointer backends and the resume contract. +- [`session-as-checkpoint-resume`](#session-as-checkpoint-resume): + multi-turn agent state via the same checkpointer machinery. +- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/), + the state-migration contract and `checkpoint_migrated` event. + ### Tool dispatch as node **Problem.** How do I run an agent tool-call loop? @@ -988,37 +1394,6 @@ Different classes, same OTel-Logs export path. If both are attached against the Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable. -### Reconcile `started` → `completed` pairs via a per-invocation dict keyed on `(namespace, branch_name, attempt_index, fan_out_index)` - -Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually. - -The pair identity is `(namespace, branch_name, attempt_index, fan_out_index)`: that tuple is unique within an invocation (per graph-engine §6 uniqueness invariants — `branch_name` and `fan_out_index` are independent slots, so a node inside a parallel-branches branch needs `branch_name` in the key to avoid colliding with the same-named node in a sibling branch). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`: - -```python -class StepTimingObserver: - def __init__(self) -> None: - # invocation_id -> {(namespace, branch_name, attempt_index, fan_out_index): start_ts} - self._pending: dict[str, dict[tuple[Any, ...], float]] = {} - - async def __call__(self, event: NodeEvent) -> None: - invocation_id = current_invocation_id() - if invocation_id is None: - return - key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index) - if event.phase == "started": - self._pending.setdefault(invocation_id, {})[key] = time.monotonic() - elif event.phase == "completed": - start = self._pending.get(invocation_id, {}).pop(key, None) - if start is not None: - duration = time.monotonic() - start - # … emit timing - # Sweep when the dict empties (last completed for this invocation). - if not self._pending.get(invocation_id): - self._pending.pop(invocation_id, None) -``` - -The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair. - ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise: @@ -1033,37 +1408,6 @@ async def __call__(self, event: NodeEvent) -> None: `event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None` — OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. -### A `with_state_migration` recipe — register migrations alongside the state class, run on resume - -`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape. - -Wire it up at compile time: - -```python -class PipelineState(State): - schema_version: ClassVar[str] = "2" - # … v2 fields - -def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict: - # Old field "step_count" renamed to "steps_completed" in v2. - state_dict["steps_completed"] = state_dict.pop("step_count", 0) - return state_dict - -compiled = ( - GraphBuilder(PipelineState) - .add_node("step", _step_body) - .add_edge("step", END) - .set_entry("step") - .with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2) - .compile() -) -compiled.attach_checkpointer(checkpointer) -``` - -Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies. - -When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping. - ### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`: @@ -1106,35 +1450,6 @@ Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotat If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path. -### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags) - -Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend: - -```python -await compiled.invoke( - initial_state, - metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"}, -) -``` - -The OTel observer emits each entry as an `openarmature.user.` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule. - -Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins. - -Mid-invocation augmentation via the public helper: - -```python -from openarmature.observability import set_invocation_metadata - -async def my_node(state: MyState) -> dict: - set_invocation_metadata(productId=state.product_id) - # subsequent spans (this node's completed, next node's started, - # any LLM call inside, etc.) carry productId - return {"score": await compute_score(state)} -``` - -The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary. - ## Example index _Runnable example programs shipped in the source tree at `examples/`. The full code is not bundled here (each example is 300+ lines); read the file at the listed path to see the canonical shape for that use case._ diff --git a/src/openarmature/_patterns/caller-supplied-trace-identifiers.md b/src/openarmature/_patterns/caller-supplied-trace-identifiers.md new file mode 100644 index 0000000..faac79d --- /dev/null +++ b/src/openarmature/_patterns/caller-supplied-trace-identifiers.md @@ -0,0 +1,138 @@ +# Caller-supplied trace identifiers + +**Problem.** A service runs the same graph for many tenants / +requests / feature flag cohorts. How do you tag every span and +trace so downstream observability (Honeycomb, Datadog, Langfuse, +HyperDX, Grafana Tempo) can filter by tenant or join across +services without each node having to thread the identifiers +through manually? + +## Approach + +Pass a `metadata` dict to `invoke()`. The framework propagates each +entry to every observability backend at once: the OTel observer +emits each entry as an `openarmature.user.` cross-cutting span +attribute on every span (invocation, node, subgraph wrapper, +fan-out instance, LLM provider), and the Langfuse observer merges +each entry as a top-level key into `trace.metadata` AND every +observation's metadata. Backends that consume OTel attributes pick +the entries up for free; backends with typed metadata fields get +them via per-backend propagation. + +For metadata that's only known mid-flight (an ID resolved by an +LLM-classification node, a derived feature flag), use +`set_invocation_metadata` from inside a node. The augmentation +respects fan-out / parallel-branches per-instance scoping per +proposal 0045, so each instance's update lives in its own +async-context copy and doesn't leak to siblings. + +## Snippet + +```python +import asyncio + +from openarmature.graph import END, GraphBuilder, State +from openarmature.observability import set_invocation_metadata + + +class RequestState(State): + query: str = "" + answer: str = "" + + +async def answer(s: RequestState) -> dict: + # An entry resolved mid-invocation propagates to subsequent spans + # in the same async-context: this node's `completed`, the LLM + # provider span if any, and onwards. Sibling fan-out instances + # and parallel-branches branches see their own copies. + set_invocation_metadata(modelTier="standard") + return {"answer": "Apollo 13 aborted due to an O2 tank failure."} + + +graph = ( + GraphBuilder(RequestState) + .add_node("answer", answer) + .add_edge("answer", END) + .set_entry("answer") + .compile() +) + + +async def main() -> None: + final = await graph.invoke( + RequestState(query="why did Apollo 13 abort?"), + metadata={ + "tenantId": "acme-corp", + "requestId": "req-12345", + "featureFlag": "v2-canary", + }, + ) + print(final.answer) + + +asyncio.run(main()) +``` + +Every span emitted during this `invoke()` carries +`openarmature.user.tenantId="acme-corp"`, +`openarmature.user.requestId="req-12345"`, and +`openarmature.user.featureFlag="v2-canary"`. Spans inside the +`answer` node (and any downstream nodes if the graph had more) +additionally carry `openarmature.user.modelTier="standard"` from +the `set_invocation_metadata` call. + +## Boundary validation + +Validation runs synchronously, before any node body fires. Both +`invoke(metadata=...)` and `set_invocation_metadata(...)` enforce +the same rules: + +- Keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved + namespaces per the spec). +- Keys MUST NOT collide with the spec's reserved per-trace metadata + keys (`correlation_id`, `entry_node`, `spec_version`, etc.). The + set is enforced at the `invoke()` and `set_invocation_metadata` + boundaries via the validator in + `openarmature.observability.metadata`; it grows per spec proposals + 0041 / 0042, with the canonical list in the spec's observability + §3.4. +- Values MUST be OTel-attribute-compatible scalars (`str` / `int` / + `float` / `bool`) or homogeneous arrays of those. + +Violations raise `ValueError` at the boundary. Failing loud at +construction is better than the bare-key silently clobbering a +spec-reserved key in flat Langfuse `trace.metadata`. + +## When this is the right pattern + +- One service runs the same graph for many distinct callers + (multi-tenant SaaS, per-customer feature flags, A/B test + cohorts). +- Downstream observability needs to filter or join on caller-side + identifiers (tenant ID for billing dashboards, request ID for + cross-service trace stitching, feature flag for experiment + analysis). +- You don't want each node to know about tenancy. The metadata + flows through the framework, not the node bodies. + +## When it isn't + +- The identifier is a per-node decision, not a per-invocation one. + If different nodes in the same invocation produce different + values, that's typed state, not invocation metadata. Put it on + the `State` schema with a clear reducer. +- The value isn't a scalar or homogeneous array. The boundary + validation rejects complex shapes; if you need to attach a nested + object, serialize it to a JSON string before passing. +- The value contains PII you don't want in every span. Metadata is + unconditionally emitted everywhere the observers run; filter at + the caller or skip the propagation for those keys. + +## Cross-references + +- [Observability concept page](https://openarmature.ai/concepts/observability/): how + OTel attributes and Langfuse metadata propagate. +- [`examples/10-langfuse-observability`](https://openarmature.ai/examples/10-langfuse-observability/): + runnable example exercising the metadata propagation path. +- Spec: [observability](https://openarmature.org/capabilities/observability/), + the propagation contract for caller-supplied metadata. diff --git a/src/openarmature/_patterns/observer-state-reconciliation.md b/src/openarmature/_patterns/observer-state-reconciliation.md new file mode 100644 index 0000000..2c95099 --- /dev/null +++ b/src/openarmature/_patterns/observer-state-reconciliation.md @@ -0,0 +1,135 @@ +# Custom observer: reconciling started → completed pairs + +**Problem.** A custom observer needs to thread per-call state +between a node's `started` and `completed` events: measure +duration, capture request/response payloads, attach a custom ID +that downstream uses. The engine doesn't carry a correlation field +across the pair (it doesn't need one for its own logic, since +events arrive serially per spec §6). How does the observer +reconcile which `completed` matches which `started`? + +## Approach + +The pair identity is the tuple +`(namespace, branch_name, attempt_index, fan_out_index)`. That +tuple is unique within an invocation: the namespace separates +subgraph wrappers from their parents, `branch_name` distinguishes +parallel-branches branches, `attempt_index` distinguishes retried +attempts of the same node, and `fan_out_index` distinguishes +per-instance fan-out copies. Carry per-invocation state in +`dict[invocation_id, dict[tuple, value]]`, look up on `completed`, +and sweep the outer entry when the per-invocation sub-dict +empties. + +Both `branch_name` and `fan_out_index` matter even for nodes that +look "the same" by name: a node `score` inside parallel-branches +`branch=fast` vs `branch=slow` produces two distinct pair +identities, and a per-instance fan-out copy at `fan_out_index=3` is +not the same as `fan_out_index=4`. + +## Snippet + +```python +import time +from typing import NamedTuple + +from openarmature.graph import NodeEvent +from openarmature.observability.correlation import current_invocation_id + + +PairKey = tuple[tuple[str, ...], str | None, int, int | None] + + +class StepTiming(NamedTuple): + node_name: str + namespace: tuple[str, ...] + branch_name: str | None + attempt_index: int + fan_out_index: int | None + duration_s: float + + +class StepTimingObserver: + """Custom observer that records wall-clock duration per node + attempt. Stitches started -> completed via the per-invocation + pair-identity dict. + """ + + def __init__(self) -> None: + # invocation_id -> {pair_key: start_monotonic} + self._pending: dict[str, dict[PairKey, float]] = {} + # Final per-call timings, surfaced to whatever consumes them + # (metrics exporter, log line, in-test assertion). + self.timings: list[StepTiming] = [] + + async def __call__(self, event: NodeEvent) -> None: + invocation_id = current_invocation_id() + if invocation_id is None: + return + + key: PairKey = ( + event.namespace, + event.branch_name, + event.attempt_index, + event.fan_out_index, + ) + + if event.phase == "started": + self._pending.setdefault(invocation_id, {})[key] = time.monotonic() + return + + if event.phase == "completed": + start = self._pending.get(invocation_id, {}).pop(key, None) + if start is not None: + self.timings.append( + StepTiming( + node_name=event.node_name, + namespace=event.namespace, + branch_name=event.branch_name, + attempt_index=event.attempt_index, + fan_out_index=event.fan_out_index, + duration_s=time.monotonic() - start, + ) + ) + # Sweep when the dict empties for this invocation. + if not self._pending.get(invocation_id): + self._pending.pop(invocation_id, None) +``` + +Attach with `graph.attach_observer(StepTimingObserver())`. Run +the invocation; the observer's `timings` list carries one entry +per node attempt with its duration and identifying tuple. + +## When this is the right pattern + +- A custom observer needs paired-event state that the spec doesn't + carry across the pair. +- The pair identity needs to be unique across fan-out instances or + parallel-branches branches; a key shape that omits `branch_name` + or `fan_out_index` would collide. +- Long-running services need the dict to drain naturally as + invocations complete. The "sweep when sub-dict empties" pattern + prevents the outer dict from growing per-invocation forever. + +## When it isn't + +- You only need a final-summary signal at invocation completion. + Subscribe to the invocation `completed` event and read the final + state directly; no per-call reconciliation needed. +- The `OTelObserver` or `LangfuseObserver` already provides what + you want. Both stitch `started` / `completed` internally to open + / close spans; you don't need a custom observer to track timings + if a span carries the duration already. +- The metric is cross-invocation. A pair-identity dict scoped to a + single invocation_id won't aggregate; use a global counter or + push to an external metrics backend instead. + +## Cross-references + +- [Observability concept page](https://openarmature.ai/concepts/observability/): the + `NodeEvent` shape, `started` / `completed` lifecycle. +- [Caller-supplied trace identifiers](https://openarmature.ai/patterns/caller-supplied-trace-identifiers/): + adjacent pattern for tagging the events your observer sees. +- Spec: [graph-engine](https://openarmature.org/capabilities/graph-engine/), + observer events and the uniqueness invariants for + `(namespace, branch_name, attempt_index, fan_out_index)`. diff --git a/src/openarmature/_patterns/state-migration-on-resume.md b/src/openarmature/_patterns/state-migration-on-resume.md new file mode 100644 index 0000000..2b1f45c --- /dev/null +++ b/src/openarmature/_patterns/state-migration-on-resume.md @@ -0,0 +1,130 @@ +# State migration on resume + +**Problem.** A long-running pipeline has saved checkpoints +mid-flight. You add a field to the state schema and rename another. +How do older checkpoints resume against the new schema without +each node body having to handle both shapes? + +## Approach + +Tag the state class with a `schema_version` and register migration +callables at compile time via `GraphBuilder.with_state_migration`. +On resume, the engine inspects the loaded record's `schema_version`, +walks the registered chain (v1 → v2 → v3 → …), and hands node +bodies a fully-migrated state object. Node code stays single-shape; +all version-aware logic lives in the migration functions. + +The migration callable's typed signature is `Callable[[Any], Any]`. +For JSON-backed checkpointers (the only kind that supports +migration; see [Checkpointing](https://openarmature.ai/concepts/checkpointing/)), +that resolves to `(state_dict: dict) -> dict`: the callable +receives the deserialized record and returns the new shape. The +`from_version` and `to_version` are registered alongside the +callable on `with_state_migration`; the callable itself stays +signature-light because migrations MUST be pure (no implicit +version-dispatch logic inside the function body). The engine +dispatches a `checkpoint_migrated` observer event after each +migration step so OTel / Langfuse spans can correlate the migration +with the resume. + +## Snippet + +```python +from typing import ClassVar + +from openarmature.checkpoint import SQLiteCheckpointer +from openarmature.graph import END, GraphBuilder, State + + +# v2 schema: renamed `step_count` -> `steps_completed` and added +# `last_node`. Old v1 checkpoints carry `step_count` and lack +# `last_node` entirely. +class PipelineState(State): + schema_version: ClassVar[str] = "2" + + query: str = "" + steps_completed: int = 0 + last_node: str | None = None + + +def _migrate_v1_to_v2(state_dict: dict) -> dict: + # Rename: step_count -> steps_completed. Default missing + # last_node to None (the v2 schema allows it). + state_dict["steps_completed"] = state_dict.pop("step_count", 0) + state_dict.setdefault("last_node", None) + return state_dict + + +async def _step(s: PipelineState) -> dict: + return {"steps_completed": s.steps_completed + 1, "last_node": "step"} + + +# ``serialization="json"`` is required for migration to operate on a +# dict; the default ``"pickle"`` mode round-trips through class +# identity and can't migrate across schemas. +compiled = ( + GraphBuilder(PipelineState) + .add_node("step", _step) + .add_edge("step", END) + .set_entry("step") + .with_checkpointer(SQLiteCheckpointer("ck.db", serialization="json")) + .with_state_migration("1", "2", _migrate_v1_to_v2) + .compile() +) + +# Later, on resume: +# final = await compiled.invoke( +# PipelineState(), # overwritten by the loaded checkpoint +# resume_invocation=prior_invocation_id, +# ) +``` + +When the chain spans multiple versions (v1 → v2 → v3), register +each step separately with repeated `with_state_migration` calls; +the engine walks them in version order. Gaps fail loudly: if v1→v2 +and v3→v4 are registered but a record loads at v2 needing v3, the +engine raises `CheckpointStateMigrationMissing` at resume time +rather than silently using a partial schema. + +## When this is the right pattern + +- A schema change lands while in-flight checkpoints exist. Without + migrations, those resume attempts would fail validation at the + state-merge boundary. +- The change is shape-altering (rename, type change, field + add/remove) rather than purely additive with a safe default. A + bare field add with a Pydantic default doesn't need migration; + Pydantic fills it in on load. +- You want resume to be transparent to node bodies. Migrations let + each node body assume the current schema unconditionally. + +## When it isn't + +- Adding a field with a safe default and NOT bumping + `schema_version`. Pydantic's default handling resolves the missing + field at load. Bumping `schema_version` without a corresponding + migration is fail-loud: the engine raises + `CheckpointStateMigrationMissing` at resume rather than silently + skipping. If you bump the version, register an identity migration + (a callable that returns the dict unchanged) to make the additive + intent explicit. +- Migrations need to call the LLM or do other slow / fallible work. + The migration runs synchronously during resume; long-running work + belongs in a dedicated `recompute` node guarded by + [bypass-if-output-exists](https://openarmature.ai/patterns/bypass-if-output-exists/), not in a + migration callable. +- Schema changes are happening on every release. Migration + callables accumulate fast; if the cadence is high enough that + v1→v2→v3→…→v9 starts to feel like a chain, consider whether the + schema would benefit from being more open at the seams (e.g. a + `metadata: dict[str, Any]` field for evolving auxiliary data + instead of dedicated columns). + +## Cross-references + +- [Checkpointing concept page](https://openarmature.ai/concepts/checkpointing/): + checkpointer backends and the resume contract. +- [`session-as-checkpoint-resume`](https://openarmature.ai/patterns/session-as-checkpoint-resume/): + multi-turn agent state via the same checkpointer machinery. +- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/), + the state-migration contract and `checkpoint_migrated` event. diff --git a/tests/unit/test_patterns_api.py b/tests/unit/test_patterns_api.py index 950fc77..44f2811 100644 --- a/tests/unit/test_patterns_api.py +++ b/tests/unit/test_patterns_api.py @@ -16,13 +16,16 @@ def test_list_returns_known_pattern_slugs() -> None: names = patterns.list() - # Exact set of seed patterns shipped from ``docs/patterns/``. + # Exact set of patterns shipped from ``docs/patterns/``. # If a new pattern lands, update this list deliberately — # silent additions mask scope expansion. assert names == [ "bypass-if-output-exists", + "caller-supplied-trace-identifiers", + "observer-state-reconciliation", "parameterized-entry-point", "session-as-checkpoint-resume", + "state-migration-on-resume", "tool-dispatch-as-node", ]