Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Added

- **Three new patterns docs.** `docs/patterns/state-migration-on-resume.md`, `docs/patterns/caller-supplied-trace-identifiers.md`, and `docs/patterns/observer-state-reconciliation.md` graduate the corresponding entries from `docs/agent/non-obvious-shapes.md` into full pattern recipes with code snippets and "when this is right / when it isn't" guidance. The programmatic patterns API (`openarmature.patterns.list()` / `get(name)`) grows from 4 to 7 entries.
- **HyperDX OTel integration test path and "Production swap" docs in example 03.** `examples/03-observer-hooks/main.py`'s module docstring grows a "Production swap" section showing how to substitute the demo's `SimpleSpanProcessor` + `ConsoleSpanExporter` for `BatchSpanProcessor` + `OTLPSpanExporter` pointed at HyperDX (or any other OTLP-HTTP collector). A new opt-in integration test (`tests/integration/test_otel_hyperdx_export.py`, gated by `HYPERDX_API_KEY` + `HYPERDX_OTLP_ENDPOINT` env vars and `@pytest.mark.integration`) drives the same production export path end-to-end against a live endpoint. `opentelemetry-exporter-otlp-proto-http` lands as a dev-only dep; not promoted to a public extras group yet.

### Changed (breaking)
Expand Down
91 changes: 0 additions & 91 deletions docs/agent/non-obvious-shapes.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,37 +125,6 @@ Different classes, same OTel-Logs export path. If both are attached against the

Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable.

### Reconcile `started` → `completed` pairs via a per-invocation dict keyed on `(namespace, branch_name, attempt_index, fan_out_index)`

Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually.

The pair identity is `(namespace, branch_name, attempt_index, fan_out_index)`: that tuple is unique within an invocation (per graph-engine §6 uniqueness invariants — `branch_name` and `fan_out_index` are independent slots, so a node inside a parallel-branches branch needs `branch_name` in the key to avoid colliding with the same-named node in a sibling branch). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`:

```python
class StepTimingObserver:
def __init__(self) -> None:
# invocation_id -> {(namespace, branch_name, attempt_index, fan_out_index): start_ts}
self._pending: dict[str, dict[tuple[Any, ...], float]] = {}

async def __call__(self, event: NodeEvent) -> None:
invocation_id = current_invocation_id()
if invocation_id is None:
return
key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index)
if event.phase == "started":
self._pending.setdefault(invocation_id, {})[key] = time.monotonic()
elif event.phase == "completed":
start = self._pending.get(invocation_id, {}).pop(key, None)
if start is not None:
duration = time.monotonic() - start
# … emit timing
# Sweep when the dict empties (last completed for this invocation).
if not self._pending.get(invocation_id):
self._pending.pop(invocation_id, None)
```

The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair.

### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes

OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise:
Expand All @@ -170,37 +139,6 @@ async def __call__(self, event: NodeEvent) -> None:

`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None` — OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract.

### A `with_state_migration` recipe — register migrations alongside the state class, run on resume

`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape.

Wire it up at compile time:

```python
class PipelineState(State):
schema_version: ClassVar[str] = "2"
# … v2 fields

def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict:
# Old field "step_count" renamed to "steps_completed" in v2.
state_dict["steps_completed"] = state_dict.pop("step_count", 0)
return state_dict

compiled = (
GraphBuilder(PipelineState)
.add_node("step", _step_body)
.add_edge("step", END)
.set_entry("step")
.with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2)
.compile()
)
compiled.attach_checkpointer(checkpointer)
```

Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies.

When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping.

### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field`

When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`:
Expand Down Expand Up @@ -242,32 +180,3 @@ class PipelineState(State):
Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotated[list[X], append]`) don't hit this — the engine still wraps each instance's value as one element, but `append` flattens it correctly since each element is already an `X`. The two non-flat shapes emerge only when the per-instance value is itself a container: a `list[X]` per instance lands `list[list[X]]` (use `concat_flatten`), and a `dict[str, X]` per instance lands `list[dict]` (use `merge_all`).

If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path.

### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags)

Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend:

```python
await compiled.invoke(
initial_state,
metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"},
)
```

The OTel observer emits each entry as an `openarmature.user.<key>` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule.

Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins.

Mid-invocation augmentation via the public helper:

```python
from openarmature.observability import set_invocation_metadata

async def my_node(state: MyState) -> dict:
set_invocation_metadata(productId=state.product_id)
# subsequent spans (this node's completed, next node's started,
# any LLM call inside, etc.) carry productId
return {"score": await compute_score(state)}
```

The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary.
138 changes: 138 additions & 0 deletions docs/patterns/caller-supplied-trace-identifiers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Caller-supplied trace identifiers

**Problem.** A service runs the same graph for many tenants /
requests / feature flag cohorts. How do you tag every span and
trace so downstream observability (Honeycomb, Datadog, Langfuse,
HyperDX, Grafana Tempo) can filter by tenant or join across
services without each node having to thread the identifiers
through manually?

## Approach

Pass a `metadata` dict to `invoke()`. The framework propagates each
entry to every observability backend at once: the OTel observer
emits each entry as an `openarmature.user.<key>` cross-cutting span
attribute on every span (invocation, node, subgraph wrapper,
fan-out instance, LLM provider), and the Langfuse observer merges
each entry as a top-level key into `trace.metadata` AND every
observation's metadata. Backends that consume OTel attributes pick
the entries up for free; backends with typed metadata fields get
them via per-backend propagation.

For metadata that's only known mid-flight (an ID resolved by an
LLM-classification node, a derived feature flag), use
`set_invocation_metadata` from inside a node. The augmentation
respects fan-out / parallel-branches per-instance scoping per
proposal 0045, so each instance's update lives in its own
async-context copy and doesn't leak to siblings.

## Snippet

```python
import asyncio

from openarmature.graph import END, GraphBuilder, State
from openarmature.observability import set_invocation_metadata


class RequestState(State):
query: str = ""
answer: str = ""


async def answer(s: RequestState) -> dict:
# An entry resolved mid-invocation propagates to subsequent spans
# in the same async-context: this node's `completed`, the LLM
# provider span if any, and onwards. Sibling fan-out instances
# and parallel-branches branches see their own copies.
set_invocation_metadata(modelTier="standard")
return {"answer": "Apollo 13 aborted due to an O2 tank failure."}


graph = (
GraphBuilder(RequestState)
.add_node("answer", answer)
.add_edge("answer", END)
.set_entry("answer")
.compile()
)


async def main() -> None:
final = await graph.invoke(
RequestState(query="why did Apollo 13 abort?"),
metadata={
"tenantId": "acme-corp",
"requestId": "req-12345",
"featureFlag": "v2-canary",
},
)
print(final.answer)


asyncio.run(main())
```

Every span emitted during this `invoke()` carries
`openarmature.user.tenantId="acme-corp"`,
`openarmature.user.requestId="req-12345"`, and
`openarmature.user.featureFlag="v2-canary"`. Spans inside the
`answer` node (and any downstream nodes if the graph had more)
additionally carry `openarmature.user.modelTier="standard"` from
the `set_invocation_metadata` call.

## Boundary validation

Validation runs synchronously, before any node body fires. Both
`invoke(metadata=...)` and `set_invocation_metadata(...)` enforce
the same rules:

- Keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved
namespaces per the spec).
- Keys MUST NOT collide with the spec's reserved per-trace metadata
keys (`correlation_id`, `entry_node`, `spec_version`, etc.). The
set is enforced at the `invoke()` and `set_invocation_metadata`
boundaries via the validator in
`openarmature.observability.metadata`; it grows per spec proposals
0041 / 0042, with the canonical list in the spec's observability
§3.4.
- Values MUST be OTel-attribute-compatible scalars (`str` / `int` /
Comment thread
chris-colinsky marked this conversation as resolved.
`float` / `bool`) or homogeneous arrays of those.

Violations raise `ValueError` at the boundary. Failing loud at
construction is better than the bare-key silently clobbering a
spec-reserved key in flat Langfuse `trace.metadata`.

## When this is the right pattern

- One service runs the same graph for many distinct callers
(multi-tenant SaaS, per-customer feature flags, A/B test
cohorts).
- Downstream observability needs to filter or join on caller-side
identifiers (tenant ID for billing dashboards, request ID for
cross-service trace stitching, feature flag for experiment
analysis).
- You don't want each node to know about tenancy. The metadata
flows through the framework, not the node bodies.

## When it isn't

- The identifier is a per-node decision, not a per-invocation one.
If different nodes in the same invocation produce different
values, that's typed state, not invocation metadata. Put it on
the `State` schema with a clear reducer.
- The value isn't a scalar or homogeneous array. The boundary
validation rejects complex shapes; if you need to attach a nested
object, serialize it to a JSON string before passing.
- The value contains PII you don't want in every span. Metadata is
unconditionally emitted everywhere the observers run; filter at
the caller or skip the propagation for those keys.

## Cross-references

- [Observability concept page](../concepts/observability.md): how
OTel attributes and Langfuse metadata propagate.
- [`examples/10-langfuse-observability`](../examples/10-langfuse-observability.md):
runnable example exercising the metadata propagation path.
- Spec: [observability](https://openarmature.org/capabilities/observability/),
the propagation contract for caller-supplied metadata.
9 changes: 9 additions & 0 deletions docs/patterns/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,12 @@ docs composing existing primitives.
- [Bypass if output exists](bypass-if-output-exists.md) —
short-circuit a node whose external output already exists, via
middleware.
- [State migration on resume](state-migration-on-resume.md) — let
older in-flight checkpoints resume against an evolved state
schema without each node body having to handle multiple shapes.
- [Caller-supplied trace identifiers](caller-supplied-trace-identifiers.md)
— propagate tenant ID / request ID / feature flags into every
observability span via `invoke(metadata=...)`.
- [Custom observer: reconciling started → completed pairs](observer-state-reconciliation.md)
— thread per-call state between paired events using a per-
invocation dict keyed on the spec's uniqueness tuple.
Loading