diff --git a/crates/adaptive/src/acg/request_surfaces/mod.rs b/crates/adaptive/src/acg/request_surfaces/mod.rs index 94f3bc71..f8c85727 100644 --- a/crates/adaptive/src/acg/request_surfaces/mod.rs +++ b/crates/adaptive/src/acg/request_surfaces/mod.rs @@ -16,6 +16,7 @@ pub(crate) mod openai_responses; use std::collections::HashSet; use nemo_relay::api::llm::LlmRequest; +use nemo_relay::codec::resolve::{ProviderSurface, detect_request_surface}; use serde_json::Value; use crate::acg::prompt_ir::PromptIR; @@ -40,6 +41,17 @@ pub(crate) trait RequestSurfaceApplier: Send + Sync { } impl RequestSurface { + /// Map a core [`ProviderSurface`] to the adaptive request surface. + /// + /// Bridges the deliberate casing difference (`OpenAiChat` -> `OpenAIChat`). + fn from_provider_surface(surface: ProviderSurface) -> Self { + match surface { + ProviderSurface::OpenAiChat => Self::OpenAIChat, + ProviderSurface::OpenAiResponses => Self::OpenAIResponses, + ProviderSurface::AnthropicMessages => Self::AnthropicMessages, + } + } + pub(crate) fn supports_provider(self, provider: &str) -> bool { match provider { "anthropic" => matches!(self, Self::AnthropicMessages), @@ -71,17 +83,16 @@ impl RequestSurface { pub(crate) fn resolve_request_surface_from_request( request: &LlmRequest, ) -> crate::acg::Result { - if request.content.get("input").is_some() || request.content.get("instructions").is_some() { - Ok(RequestSurface::OpenAIResponses) - } else if request.content.get("system").is_some() { - Ok(RequestSurface::AnthropicMessages) - } else if request.content.get("messages").is_some() { - Ok(RequestSurface::OpenAIChat) - } else { - Err(crate::acg::AcgError::Internal( - "unable to resolve request surface from request shape".to_string(), - )) - } + // Delegate request-shape detection to the core codec layer (single source of + // truth, hoisted from this function's former body), then preserve the + // fail-closed contract: an unrecognized shape is an internal error. + detect_request_surface(&request.content) + .map(RequestSurface::from_provider_surface) + .ok_or_else(|| { + crate::acg::AcgError::Internal( + "unable to resolve request surface from request shape".to_string(), + ) + }) } #[cfg_attr(not(test), allow(dead_code))] diff --git a/crates/core/src/api/event.rs b/crates/core/src/api/event.rs index fb73eb8a..9acb4b7c 100644 --- a/crates/core/src/api/event.rs +++ b/crates/core/src/api/event.rs @@ -15,6 +15,7 @@ //! Event types for Agent Trajectory Observability Format (ATOF) runtime events. +use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::Arc; @@ -23,10 +24,11 @@ use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; -use crate::api::llm::LlmAttributes; +use crate::api::llm::{LlmAttributes, LlmRequest}; use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType}; use crate::api::tool::ToolAttributes; use crate::codec::request::AnnotatedLlmRequest; +use crate::codec::resolve; use crate::codec::response::AnnotatedLlmResponse; use crate::json::Json; @@ -609,6 +611,43 @@ impl Event { .and_then(|profile| profile.annotated_response.as_ref()) } + /// Return the normalized LLM request, annotation-first. + /// + /// Prefers a codec-attached [`AnnotatedLlmRequest`] (borrowed) when present; + /// otherwise best-effort decodes the start-event input payload through + /// [`crate::codec::resolve::normalize_request`] (owned). This is the single + /// preferred path for consumers that want normalized request data without + /// reimplementing provider parsing; only genuinely non-provider payloads + /// should fall through to a consumer-local lenient parse. + /// + /// # Returns + /// `Some` borrowed annotation, `Some` owned best-effort decode, or `None`. + #[must_use] + pub fn normalized_llm_request(&self) -> Option> { + if let Some(annotated) = self.annotated_request() { + return Some(Cow::Borrowed(annotated.as_ref())); + } + let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?; + resolve::normalize_request(&request, None).map(Cow::Owned) + } + + /// Return the normalized LLM response, annotation-first. + /// + /// Prefers a codec-attached [`AnnotatedLlmResponse`] (borrowed) when present; + /// otherwise best-effort decodes the end-event output payload through + /// [`crate::codec::resolve::normalize_response`] (owned). Counterpart to + /// [`Event::normalized_llm_request`]. + /// + /// # Returns + /// `Some` borrowed annotation, `Some` owned best-effort decode, or `None`. + #[must_use] + pub fn normalized_llm_response(&self) -> Option> { + if let Some(annotated) = self.annotated_response() { + return Some(Cow::Borrowed(annotated.as_ref())); + } + resolve::normalize_response(self.output()?, None).map(Cow::Owned) + } + /// Return true for scope-start events. /// /// # Returns diff --git a/crates/core/src/codec/mod.rs b/crates/core/src/codec/mod.rs index 20653c14..aa495343 100644 --- a/crates/core/src/codec/mod.rs +++ b/crates/core/src/codec/mod.rs @@ -16,6 +16,7 @@ pub mod openai_chat; pub mod openai_responses; pub mod pricing; pub mod request; +pub mod resolve; pub mod response; pub mod streaming; pub mod traits; diff --git a/crates/core/src/codec/openai_responses.rs b/crates/core/src/codec/openai_responses.rs index f2661eef..9890d553 100644 --- a/crates/core/src/codec/openai_responses.rs +++ b/crates/core/src/codec/openai_responses.rs @@ -192,6 +192,13 @@ fn collect_output_item( .unwrap_or("") { "message" => collect_message_text_parts(item, text_parts), + // Top-level `output_text` output items (some providers/proxies emit + // these as siblings of `message` items rather than nested blocks). + "output_text" => { + if let Some(text) = output_text_block(item) { + text_parts.push(text); + } + } "function_call" => tool_calls.push(parse_function_call(item)), _ => {} } @@ -244,6 +251,17 @@ fn message_from_text_parts(text_parts: Vec) -> Option { } } +/// Top-level `output_text` convenience field, a flattened aggregate of the +/// response text. Used only as a fallback when the structured `output` items +/// yield no message text, so structured content takes precedence. +fn top_level_output_text(response: &Json) -> Option { + response + .get("output_text") + .and_then(|value| value.as_str()) + .filter(|text| !text.is_empty()) + .map(|text| MessageContent::Text(text.to_string())) +} + fn optional_vec(items: Vec) -> Option> { (!items.is_empty()).then_some(items) } @@ -456,7 +474,8 @@ impl LlmResponseCodec for OpenAIResponsesCodec { let all_output_items = raw.output.clone(); let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref()); - let message = message_from_text_parts(text_parts); + let message = + message_from_text_parts(text_parts).or_else(|| top_level_output_text(response)); let tool_calls = optional_vec(tool_calls); // Map finish reason from status + incomplete_details. diff --git a/crates/core/src/codec/resolve.rs b/crates/core/src/codec/resolve.rs new file mode 100644 index 00000000..6262998d --- /dev/null +++ b/crates/core/src/codec/resolve.rs @@ -0,0 +1,171 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Provider-surface detection and best-effort normalization. +//! +//! This module is the single preferred entry point for turning raw provider +//! JSON into the normalized [`AnnotatedLlmRequest`] / [`AnnotatedLlmResponse`] +//! when a caller does not already hold a codec-attached annotation. It detects +//! which built-in provider surface a payload matches and decodes it through the +//! matching built-in codec ([`OpenAIChatCodec`], [`OpenAIResponsesCodec`], +//! [`AnthropicMessagesCodec`]). +//! +//! # Preferred path +//! +//! Exporters and plugins should prefer an annotation already attached to the +//! event (see [`crate::api::event::Event::annotated_response`]) and fall back to +//! [`normalize_response`] / [`normalize_request`] here, rather than re-parsing +//! provider JSON by hand. Only genuinely non-provider/manual payloads should use +//! a consumer-local lenient fallback. +//! +//! # Detection is conservative +//! +//! [`detect_response_surface`] classifies a payload only when exactly one +//! built-in shape matches; ambiguous or unrecognized payloads return `None` so +//! callers fall through to their own handling. [`detect_request_surface`] uses +//! the same priority order as the request codecs (Responses before Anthropic +//! before Chat), mirroring adaptive's historical resolver. + +use crate::api::llm::LlmRequest; +use crate::json::Json; + +use super::anthropic::AnthropicMessagesCodec; +use super::openai_chat::OpenAIChatCodec; +use super::openai_responses::OpenAIResponsesCodec; +use super::request::AnnotatedLlmRequest; +use super::response::AnnotatedLlmResponse; +use super::traits::{LlmCodec, LlmResponseCodec}; + +/// A built-in provider request/response surface. +/// +/// The string forms returned by [`ProviderSurface::codec_name`] match the +/// `ApiSpecificResponse` serde tags (`"openai_chat"`, `"openai_responses"`, +/// `"anthropic_messages"`). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProviderSurface { + /// OpenAI Chat Completions. + OpenAiChat, + /// OpenAI Responses. + OpenAiResponses, + /// Anthropic Messages. + AnthropicMessages, +} + +impl ProviderSurface { + /// Map a canonical codec name to a surface. + /// + /// Recognizes `"openai_chat"`, `"openai_responses"`, and + /// `"anthropic_messages"`; returns `None` for any other name. + #[must_use] + pub fn from_codec_name(name: &str) -> Option { + match name { + "openai_chat" => Some(Self::OpenAiChat), + "openai_responses" => Some(Self::OpenAiResponses), + "anthropic_messages" => Some(Self::AnthropicMessages), + _ => None, + } + } + + /// The canonical codec name for this surface. + #[must_use] + pub fn codec_name(self) -> &'static str { + match self { + Self::OpenAiChat => "openai_chat", + Self::OpenAiResponses => "openai_responses", + Self::AnthropicMessages => "anthropic_messages", + } + } +} + +/// Detect the request surface from a raw request body (the request `content`). +/// +/// Uses the same priority order as the request codecs: Responses +/// (`input`/`instructions`) before Anthropic (`system`) before Chat +/// (`messages`). Returns `None` when no key matches or `body` is not an object. +/// +/// This is the single source of truth for request-shape detection; adaptive's +/// surface resolver delegates here. +#[must_use] +pub fn detect_request_surface(body: &Json) -> Option { + let obj = body.as_object()?; + if obj.contains_key("input") || obj.contains_key("instructions") { + Some(ProviderSurface::OpenAiResponses) + } else if obj.contains_key("system") { + Some(ProviderSurface::AnthropicMessages) + } else if obj.contains_key("messages") { + Some(ProviderSurface::OpenAiChat) + } else { + None + } +} + +/// Detect the response surface from a raw provider response. +/// +/// Classifies strictly on top-level shape and returns `Some` only when exactly +/// one built-in surface matches: +/// - **Chat**: a `choices` array. +/// - **Responses**: an `output` array or an `output_text` field. +/// - **Anthropic**: `type == "message"` with a `content` array. +/// +/// Returns `None` for unrecognized payloads (e.g. `{}`), ambiguous payloads +/// (e.g. both `choices` and `output`), or non-objects. Strict gating is +/// deliberate: the built-in codecs accept minimal/empty objects, so decode +/// success alone is not a reliable classifier. +#[must_use] +pub fn detect_response_surface(raw: &Json) -> Option { + let obj = raw.as_object()?; + let is_chat = obj.get("choices").is_some_and(Json::is_array); + let is_responses = obj.get("output").is_some_and(Json::is_array) + || obj.get("output_text").is_some_and(Json::is_string); + let is_anthropic = obj.get("type").and_then(Json::as_str) == Some("message") + && obj.get("content").is_some_and(Json::is_array); + + match (is_chat, is_responses, is_anthropic) { + (true, false, false) => Some(ProviderSurface::OpenAiChat), + (false, true, false) => Some(ProviderSurface::OpenAiResponses), + (false, false, true) => Some(ProviderSurface::AnthropicMessages), + _ => None, + } +} + +/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`]. +/// +/// Uses `hint` when provided, otherwise [`detect_request_surface`] on +/// `request.content`. Fails open: returns `None` when the surface is unknown or +/// the matching codec cannot decode the request. +#[must_use] +pub fn normalize_request( + request: &LlmRequest, + hint: Option, +) -> Option { + let surface = hint.or_else(|| detect_request_surface(&request.content))?; + match surface { + ProviderSurface::OpenAiChat => OpenAIChatCodec.decode(request), + ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode(request), + ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode(request), + } + .ok() +} + +/// Best-effort decode of a raw response into [`AnnotatedLlmResponse`]. +/// +/// Uses `hint` when provided, otherwise [`detect_response_surface`]. Fails open: +/// returns `None` when the surface is unknown or the matching codec cannot +/// decode the response. +#[must_use] +pub fn normalize_response( + raw: &Json, + hint: Option, +) -> Option { + let surface = hint.or_else(|| detect_response_surface(raw))?; + match surface { + ProviderSurface::OpenAiChat => OpenAIChatCodec.decode_response(raw), + ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode_response(raw), + ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode_response(raw), + } + .ok() +} + +#[cfg(test)] +#[path = "../../tests/unit/codec/resolve_tests.rs"] +mod tests; diff --git a/crates/core/src/codec/traits.rs b/crates/core/src/codec/traits.rs index 6fe5d189..9da3f10a 100644 --- a/crates/core/src/codec/traits.rs +++ b/crates/core/src/codec/traits.rs @@ -18,8 +18,9 @@ use super::response::AnnotatedLlmResponse; /// structured [`AnnotatedLlmRequest`]. /// /// Codecs are implemented by integration patches (LangChain, LangChain-NVIDIA, -/// LangGraph, etc.) since each SDK has its own request format. They are -/// registered by name in the global codec registry. +/// LangGraph, etc.) since each SDK has its own request format. A codec is +/// supplied per call by the caller; the built-in provider codecs can also be +/// selected from a raw payload via [`crate::codec::resolve`]. /// /// # Design /// diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 4d4d1f31..ce4a9454 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -38,7 +38,8 @@ use uuid::Uuid; use crate::api::event::Event; use crate::api::runtime::EventSubscriberFn; use crate::api::subscriber::flush_subscribers; -use crate::codec::response::{Usage, estimate_cost_for_provider}; +use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; +use crate::codec::response::{AnnotatedLlmResponse, Usage, estimate_cost_for_provider}; use crate::error::Result; use crate::json::Json; @@ -1054,6 +1055,53 @@ fn extract_tool_calls(output: &Json) -> Option> { if calls.is_empty() { None } else { Some(calls) } } +// --------------------------------------------------------------------------- +// Annotation adapters +// +// When a managed/instrumented LLM call attaches a codec annotation to the +// event, ATIF reads the normalized message text from the +// `AnnotatedLlmRequest`/`AnnotatedLlmResponse` instead of re-parsing the raw +// provider payload. These adapters handle only what the normalized form models +// losslessly (plain text, and tool-only responses which render as `""`); for +// multimodal/content-part messages they return `None` so the caller falls back +// to the raw extractor, which preserves those shapes. Tool calls, metrics, and +// reasoning stay on the raw path, which preserves provider-specific extras +// (e.g. tool-call `provider_data`) that the normalized form does not model. +// --------------------------------------------------------------------------- + +/// ATIF user-step message from an annotated request. +/// +/// Returns the latest user message when it is plain text. Returns `None` for a +/// multimodal (content-part) message or when there is no user message, so the +/// caller falls back to the raw extractor (which preserves content parts). +fn atif_message_from_annotated_request(request: &AnnotatedLlmRequest) -> Option { + let content = request + .messages + .iter() + .rev() + .find_map(|message| match message { + Message::User { content, .. } => Some(content), + _ => None, + })?; + match content { + MessageContent::Text(text) => Some(Json::String(text.clone())), + MessageContent::Parts(_) => None, + } +} + +/// ATIF agent-step message from an annotated response. +/// +/// Plain text renders directly; a tool-only response (no message) renders as the +/// empty string, matching the raw extractor. Returns `None` for a multimodal +/// (content-part) message so the caller falls back to the raw extractor. +fn atif_message_from_annotated_response(response: &AnnotatedLlmResponse) -> Option { + match &response.message { + Some(MessageContent::Text(text)) => Some(Json::String(text.clone())), + Some(MessageContent::Parts(_)) => None, + None => Some(empty_message()), + } +} + fn tool_call_array(output: &Json) -> Option<&Vec> { output .as_object() @@ -2212,7 +2260,10 @@ impl StepConversionState { self.steps.push(AtifStep { step_id: 0, source: "user".to_string(), - message: extract_user_messages(&content), + message: event + .annotated_request() + .and_then(|request| atif_message_from_annotated_request(request)) + .unwrap_or_else(|| extract_user_messages(&content)), timestamp: Some(event.timestamp().to_rfc3339()), model_name: None, reasoning_effort: None, @@ -2232,6 +2283,8 @@ impl StepConversionState { let Some(output) = event.data() else { return; }; + // Tool calls stay on the raw path: the normalized form drops + // provider-specific extras that ATIF preserves (e.g. `provider_data`). let tool_calls = extract_tool_calls(output); let tool_call_order = refresh_tool_call_lookup(&mut self.last_tool_call_map, &tool_calls); let reasoning_effort = self.current_reasoning_effort.take(); @@ -2253,7 +2306,10 @@ impl StepConversionState { self.steps.push(AtifStep { step_id: 0, source: "agent".to_string(), - message: extract_llm_response_message(output), + message: event + .annotated_response() + .and_then(|response| atif_message_from_annotated_response(response)) + .unwrap_or_else(|| extract_llm_response_message(output)), timestamp: Some(event.timestamp().to_rfc3339()), model_name: event.model_name().map(ToOwned::to_owned), reasoning_effort, diff --git a/crates/core/src/observability/manual.rs b/crates/core/src/observability/manual.rs new file mode 100644 index 00000000..116223b7 --- /dev/null +++ b/crates/core/src/observability/manual.rs @@ -0,0 +1,229 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared best-effort extraction for **non-provider / manual** LLM output. +//! +//! Exporters prefer normalized provider data (a codec annotation, or +//! [`crate::codec::resolve`] detection). These helpers are the *fallback* for +//! payloads the codecs are not meant to parse — manually logged usage objects, +//! SDK aliases (`inputTokens`, `token_usage`, ...), and component cost objects. +//! They were previously duplicated in `otel.rs` and `openinference.rs`; this +//! module is the single shared copy. +//! +//! Currency policy differs by exporter and is therefore parameterized: +//! OpenTelemetry emits whatever currency it finds, OpenInference (and ATIF) keep +//! USD-only. Token totals use the strict policy (drop an inconsistent or absent +//! reported total) for both, which is safe because the only consumer of the +//! looser policy — OpenTelemetry cost estimation — uses prompt/completion +//! tokens, never the total. + +use serde_json::Map; + +use crate::codec::response::Usage; +use crate::json::Json; + +/// Extract the model name from a manually logged LLM output object. +pub(crate) fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { + output?.as_object()?.get("model").and_then(Json::as_str) +} + +/// Best-effort [`Usage`] from a manually logged LLM output object. +/// +/// Reads `usage` and `token_usage`, tolerating common token field-name aliases. +/// Returns `None` when neither object carries any recognized token count. +pub(crate) fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + if usage.is_none() && token_usage.is_none() { + return None; + } + + let prompt_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &["prompt_tokens", "input_tokens", "inputTokens", "input"], + ); + let completion_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "completion_tokens", + "output_tokens", + "completionTokens", + "outputTokens", + "output", + ], + ); + let reported_total_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &["total_tokens", "totalTokens", "total"], + ); + let cache_read_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "cache_read_tokens", + "cached_tokens", + "cache_read_input_tokens", + "cacheReadTokens", + "cachedTokens", + "cacheReadInputTokens", + "cacheRead", + ], + ) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "input_tokens_details", + "cached_tokens", + ) + }) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "prompt_tokens_details", + "cached_tokens", + ) + }); + let cache_write_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "cache_write_tokens", + "cache_creation_input_tokens", + "cacheWriteTokens", + "cacheCreationInputTokens", + "cacheWrite", + ], + ); + + if prompt_tokens.is_none() + && completion_tokens.is_none() + && reported_total_tokens.is_none() + && cache_read_tokens.is_none() + && cache_write_tokens.is_none() + { + return None; + } + let total_tokens = + normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens); + + Some(Usage { + prompt_tokens, + completion_tokens, + total_tokens, + cache_read_tokens, + cache_write_tokens, + cost: None, + }) +} + +/// Best-effort cost `(total, currency)` from a manually logged LLM output object. +/// +/// Reads `usage`/`token_usage` and applies [`cost_from_manual_usage`] per map so +/// that, under `usd_only`, a non-USD `usage` map is skipped and `token_usage` is +/// still tried. +pub(crate) fn cost_from_manual_llm_output( + output: Option<&Json>, + usd_only: bool, +) -> Option<(f64, String)> { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + usage + .and_then(|usage| cost_from_manual_usage(usage, usd_only)) + .or_else(|| token_usage.and_then(|usage| cost_from_manual_usage(usage, usd_only))) +} + +/// Cost `(total, currency)` from a single usage map. +/// +/// `cost_usd` is taken as USD directly. Otherwise a `cost` object yields its +/// `total` (or the sum of `input`/`output`/`cache_read`/`cache_write` +/// components), with currency from `cost.currency` (default `"USD"`). When +/// `usd_only` is set, a non-USD `cost` object returns `None`. +fn cost_from_manual_usage(usage: &Map, usd_only: bool) -> Option<(f64, String)> { + if let Some(total) = usage.get("cost_usd").and_then(Json::as_f64) { + return Some((total, "USD".to_string())); + } + let cost = usage.get("cost")?.as_object()?; + let currency = cost + .get("currency") + .and_then(Json::as_str) + .unwrap_or("USD") + .to_string(); + if usd_only && !currency.eq_ignore_ascii_case("USD") { + return None; + } + let total = cost.get("total").and_then(Json::as_f64).or_else(|| { + let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] + .iter() + .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) + .fold((false, 0.0), |(_, total), value| (true, total + value)); + has_component.then_some(component_total) + })?; + Some((total, currency)) +} + +/// Normalize the reported total-token count (strict). +/// +/// Returns `None` when no total is reported, or when the reported total is less +/// than `prompt + completion` (an inconsistent value). +fn normalize_total_tokens( + total_tokens: Option, + prompt_tokens: Option, + completion_tokens: Option, +) -> Option { + let total_tokens = total_tokens?; + let minimum_total = prompt_tokens + .unwrap_or(0) + .saturating_add(completion_tokens.unwrap_or(0)); + if minimum_total == 0 || total_tokens >= minimum_total { + Some(total_tokens) + } else { + None + } +} + +/// First matching `u64` across `usage` then `token_usage` (map-major): all keys +/// are tried against `usage` before falling back to `token_usage`. +fn first_u64_from_manual_usage( + usage: Option<&Map>, + token_usage: Option<&Map>, + keys: &[&str], +) -> Option { + usage + .and_then(|value| first_u64(value, keys)) + .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) +} + +fn first_nested_u64_from_manual_usage( + usage: Option<&Map>, + token_usage: Option<&Map>, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .and_then(|value| nested_u64(value, parent_key, child_key)) + .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) +} + +fn first_u64(usage: &Map, keys: &[&str]) -> Option { + keys.iter() + .find_map(|key| usage.get(*key).and_then(Json::as_u64)) +} + +fn nested_u64(usage: &Map, parent_key: &str, child_key: &str) -> Option { + usage + .get(parent_key) + .and_then(Json::as_object) + .and_then(|details| details.get(child_key)) + .and_then(Json::as_u64) +} + +#[cfg(test)] +#[path = "../../tests/unit/observability/manual_tests.rs"] +mod tests; diff --git a/crates/core/src/observability/mod.rs b/crates/core/src/observability/mod.rs index 08aa2de3..dd83c5cf 100644 --- a/crates/core/src/observability/mod.rs +++ b/crates/core/src/observability/mod.rs @@ -13,6 +13,8 @@ pub(crate) fn test_mutex() -> &'static Mutex<()> { pub mod atif; pub mod atof; +#[cfg(any(feature = "otel", feature = "openinference"))] +pub(crate) mod manual; #[cfg(feature = "openinference")] pub mod openinference; #[cfg(feature = "otel")] diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 54be164e..855483a6 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use super::manual; use crate::api::event::{Event, ScopeCategory}; use crate::api::runtime::EventSubscriberFn; use crate::api::scope::ScopeType; @@ -722,7 +723,7 @@ fn end_attributes(event: &Event) -> Vec { attributes.push(KeyValue::new(oi::output::MIME_TYPE, mime_type)); } let fallback_usage = if is_llm { - usage_from_manual_llm_output(event.output()) + manual::usage_from_manual_llm_output(event.output()) } else { None }; @@ -770,33 +771,49 @@ fn push_llm_usage_attributes(attributes: &mut Vec, usage: Option<&Usag } fn push_llm_request_attributes(attributes: &mut Vec, event: &Event) { + // Tier 1: a codec annotation attached at call time. if let Some(request) = event.annotated_request() { push_annotated_request_attributes(attributes, request); return; } - let Some(input) = event.input().and_then(replay_llm_payload) else { + // Tier 2: OpenClaw replay payloads. Handled before detection because replay + // content can itself look provider-shaped (e.g. carry `messages`), which + // would otherwise be misrouted to the codec path. + if let Some(input) = event.input().and_then(replay_llm_payload) { + if let Some(provider) = input.get("provider").and_then(Json::as_str) { + attributes.push(KeyValue::new(oi::llm::PROVIDER, provider.to_string())); + } + if let Some(system) = input.get("systemPrompt").and_then(display_text_from_json) { + attributes.push(KeyValue::new(oi::llm::SYSTEM, system)); + } + push_replay_input_messages(attributes, input); return; - }; - if let Some(provider) = input.get("provider").and_then(Json::as_str) { - attributes.push(KeyValue::new(oi::llm::PROVIDER, provider.to_string())); } - if let Some(system) = input.get("systemPrompt").and_then(display_text_from_json) { - attributes.push(KeyValue::new(oi::llm::SYSTEM, system)); + + // Tier 3: detect and decode a raw provider request through the codec layer. + if let Some(request) = event.normalized_llm_request() { + push_annotated_request_attributes(attributes, &request); } - push_replay_input_messages(attributes, input); } fn push_llm_response_attributes(attributes: &mut Vec, event: &Event) { + // Tier 1: a codec annotation attached at call time. if let Some(response) = event.annotated_response() { push_annotated_response_attributes(attributes, response); return; } - let Some(output) = event.output().and_then(replay_llm_response) else { + // Tier 2: OpenClaw replay payloads (top-level `openclaw` envelope). + if let Some(output) = event.output().and_then(replay_llm_response) { + push_replay_response_attributes(attributes, output); return; - }; - push_replay_response_attributes(attributes, output); + } + + // Tier 3: detect and decode a raw provider response through the codec layer. + if let Some(response) = event.normalized_llm_response() { + push_annotated_response_attributes(attributes, &response); + } } fn push_annotated_request_attributes( @@ -1097,17 +1114,10 @@ fn finish_reason_value(reason: &FinishReason) -> String { } } -fn cost_total_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_total_from_usage) - .or_else(|| token_usage.and_then(cost_total_from_usage)) -} - fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> Option { - if let Some(cost) = cost_total_from_manual_llm_output(event.output()) { + if let Some(cost) = + manual::cost_from_manual_llm_output(event.output(), true).map(|(total, _)| total) + { return Some(cost); } @@ -1126,178 +1136,11 @@ fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> O let usage = fallback_usage?; let model_name = event .model_name() - .or_else(|| model_name_from_manual_llm_output(event.output()))?; + .or_else(|| manual::model_name_from_manual_llm_output(event.output()))?; estimate_cost_for_provider(Some(event.name()), model_name, usage) .and_then(|cost| cost.total_for_currency("USD")) } -fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn cost_total_from_usage(usage: &serde_json::Map) -> Option { - usage.get("cost_usd").and_then(Json::as_f64).or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let currency = cost.get("currency").and_then(Json::as_str); - let is_usd_cost = currency.is_none_or(|currency| currency.eq_ignore_ascii_case("USD")); - if !is_usd_cost { - return None; - } - cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - }) - }) -} - -fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - let total_tokens = - normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens); - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens, - cache_read_tokens, - cache_write_tokens, - cost: None, - }) -} - -fn normalize_total_tokens( - total_tokens: Option, - prompt_tokens: Option, - completion_tokens: Option, -) -> Option { - let total_tokens = total_tokens?; - let minimum_total = prompt_tokens - .unwrap_or(0) - .saturating_add(completion_tokens.unwrap_or(0)); - if minimum_total == 0 || total_tokens >= minimum_total { - Some(total_tokens) - } else { - None - } -} - -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - usage - .and_then(|value| first_u64(value, keys)) - .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .and_then(|value| nested_u64(value, parent_key, child_key)) - .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) -} - -fn nested_u64( - usage: &serde_json::Map, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .get(parent_key) - .and_then(Json::as_object) - .and_then(|details| details.get(child_key)) - .and_then(Json::as_u64) -} - -fn first_u64(usage: &serde_json::Map, keys: &[&str]) -> Option { - keys.iter() - .find_map(|key| usage.get(*key).and_then(Json::as_u64)) -} - fn mark_attributes(event: &Event) -> Vec { let handle_attributes = event.attributes(); let mut attributes = vec![ diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 74fefe59..c3f8443b 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -20,14 +20,14 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use super::manual; use crate::api::event::Event; use crate::api::event::ScopeCategory; use crate::api::runtime::EventSubscriberFn; use crate::api::scope::ScopeType; use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; -use crate::codec::response::{CostEstimate, Usage, estimate_cost_for_provider}; +use crate::codec::response::{CostEstimate, estimate_cost_for_provider}; use crate::error::FlowError; -use crate::json::Json; use chrono::{DateTime, Utc}; use opentelemetry::trace::{ Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, @@ -681,7 +681,7 @@ fn end_attributes(event: &Event) -> Vec { } fn cost_from_llm_event(event: &Event) -> Option<(f64, String)> { - if let Some(cost) = cost_from_manual_llm_output(event.output()) { + if let Some(cost) = manual::cost_from_manual_llm_output(event.output(), false) { return Some(cost); } if let Some(response) = event.annotated_response() @@ -695,10 +695,10 @@ fn cost_from_llm_event(event: &Event) -> Option<(f64, String)> { .and_then(|cost| cost_total_and_currency(&cost)); } } - let usage = usage_from_manual_llm_output(event.output())?; + let usage = manual::usage_from_manual_llm_output(event.output())?; let model_name = event .model_name() - .or_else(|| model_name_from_manual_llm_output(event.output()))?; + .or_else(|| manual::model_name_from_manual_llm_output(event.output()))?; estimate_cost_for_provider(Some(event.name()), model_name, &usage) .and_then(|cost| cost_total_and_currency(&cost)) } @@ -707,184 +707,6 @@ fn cost_total_and_currency(cost: &CostEstimate) -> Option<(f64, String)> { Some((cost.total_or_component_sum()?, cost.currency.clone())) } -fn cost_from_manual_llm_output(output: Option<&Json>) -> Option<(f64, String)> { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_from_manual_usage) - .or_else(|| token_usage.and_then(cost_from_manual_usage)) -} - -fn cost_from_manual_usage(usage: &serde_json::Map) -> Option<(f64, String)> { - usage - .get("cost_usd") - .and_then(Json::as_f64) - .map(|total| (total, "USD".to_string())) - .or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let total = cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = - ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - })?; - Some(( - total, - cost.get("currency") - .and_then(Json::as_str) - .unwrap_or("USD") - .to_string(), - )) - }) -} - -fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens: normalize_total_tokens( - reported_total_tokens, - prompt_tokens, - completion_tokens, - ), - cache_read_tokens, - cache_write_tokens, - cost: None, - }) -} - -fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - keys.iter().find_map(|key| { - usage - .and_then(|usage| usage.get(*key).and_then(Json::as_u64)) - .or_else(|| token_usage.and_then(|usage| usage.get(*key).and_then(Json::as_u64))) - }) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent: &str, - key: &str, -) -> Option { - usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - .or_else(|| { - token_usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - }) -} - -fn normalize_total_tokens( - reported_total_tokens: Option, - prompt_tokens: Option, - completion_tokens: Option, -) -> Option { - let calculated_total = match (prompt_tokens, completion_tokens) { - (Some(prompt), Some(completion)) => Some(prompt + completion), - (Some(prompt), None) => Some(prompt), - (None, Some(completion)) => Some(completion), - (None, None) => None, - }; - match (reported_total_tokens, calculated_total) { - (Some(reported), Some(calculated)) if reported >= calculated => Some(reported), - (Some(_), Some(calculated)) => Some(calculated), - (Some(reported), None) => Some(reported), - (None, calculated) => calculated, - } -} - fn mark_attributes(event: &Event) -> Vec { let handle_attributes = event.attributes(); let mut attributes = vec![ diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index 1bfe2cdd..25e4bdcf 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -8,15 +8,20 @@ use crate::api::event::{ BaseEvent, CategoryProfile, Event, EventCategory, MarkEvent, ScopeCategory, ScopeEvent, llm_attributes_to_strings, scope_attributes_to_strings, tool_attributes_to_strings, }; -use crate::api::llm::LlmAttributes; +use crate::api::llm::{LlmAttributes, LlmRequest}; use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType}; use crate::api::tool::ToolAttributes; +use crate::codec::openai_chat::OpenAIChatCodec; use crate::codec::pricing::pricing_test_mutex; +use crate::codec::request::AnnotatedLlmRequest; use crate::codec::response::{ - PricingCatalog, PricingResolver, reset_active_pricing_resolver, set_active_pricing_resolver, + AnnotatedLlmResponse, PricingCatalog, PricingResolver, reset_active_pricing_resolver, + set_active_pricing_resolver, }; +use crate::codec::traits::{LlmCodec, LlmResponseCodec}; use serde_json::json; use std::collections::HashSet; +use std::sync::Arc; struct ResetPricingResolverGuard; @@ -73,6 +78,8 @@ struct TestEventBuilder { output: Option, model_name: Option, tool_call_id: Option, + annotated_request: Option>, + annotated_response: Option>, } impl TestEventBuilder { @@ -121,6 +128,16 @@ impl TestEventBuilder { self } + fn annotated_request(mut self, annotated: AnnotatedLlmRequest) -> Self { + self.annotated_request = Some(Arc::new(annotated)); + self + } + + fn annotated_response(mut self, annotated: AnnotatedLlmResponse) -> Self { + self.annotated_response = Some(Arc::new(annotated)); + self + } + fn build(self) -> Event { match (self.event_type, self.scope_type) { (EventType::Mark, _) => Event::Mark(MarkEvent::new( @@ -191,6 +208,7 @@ impl TestEventBuilder { Some( CategoryProfile::builder() .model_name_opt(self.model_name) + .annotated_request_opt(self.annotated_request) .build(), ), )), @@ -211,6 +229,7 @@ impl TestEventBuilder { Some( CategoryProfile::builder() .model_name_opt(self.model_name) + .annotated_response_opt(self.annotated_response) .build(), ), )), @@ -265,6 +284,8 @@ fn event_builder(uuid: Uuid, event_type: EventType) -> TestEventBuilder { output: None, model_name: None, tool_call_id: None, + annotated_request: None, + annotated_response: None, } } @@ -1691,6 +1712,164 @@ fn test_exporter_llm_tool_calls_promoted() { assert_eq!(extra.llm_response.unwrap()["role"], json!("assistant")); } +#[test] +fn test_exporter_uses_annotated_message_but_raw_tool_calls() { + // The annotation supplies the normalized message text (winning over the raw + // content), while tool calls stay on the raw path so provider-specific + // extras are preserved. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let annotated = OpenAIChatCodec + .decode_response(&json!({ + "choices": [{ + "message": {"role": "assistant", "content": "from annotation"}, + "finish_reason": "stop" + }] + })) + .unwrap(); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .output(json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": "from RAW", + "tool_calls": [{ + "id": "call_1", + "type": "function", + "provider_data": {"trace_id": "t-1"}, + "function": {"name": "search", "arguments": "{\"q\":\"x\"}"} + }] + } + }] + })) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + // Message comes from the annotation. + assert_eq!(step.message, json!("from annotation")); + // Tool calls come from the raw payload, with provider extras preserved. + let tool_calls = step.tool_calls.as_ref().unwrap(); + assert_eq!(tool_calls[0].function_name, "search"); + assert_eq!(tool_calls[0].arguments, json!({"q": "x"})); + assert_eq!( + tool_calls[0].extra.as_ref().unwrap()["provider_data"]["trace_id"], + json!("t-1") + ); +} + +#[test] +fn test_exporter_annotated_tool_only_response_renders_empty_message() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let annotated = OpenAIChatCodec + .decode_response(&json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": null, + "tool_calls": [{ + "id": "call_2", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"} + }] + }, + "finish_reason": "tool_calls" + }] + })) + .unwrap(); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .output(json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": null, + "tool_calls": [{ + "id": "call_2", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"} + }] + } + }] + })) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + assert_eq!(step.message, json!("")); + assert_eq!(step.tool_calls.as_ref().unwrap()[0].function_name, "lookup"); +} + +#[test] +fn test_exporter_prefers_annotated_request_message() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let request = LlmRequest { + headers: serde_json::Map::new(), + content: json!({ + "model": "gpt-4", + "messages": [{"role": "user", "content": "hello from annotation"}] + }), + }; + let annotated = OpenAIChatCodec.decode(&request).unwrap(); + + let start = event_builder(Uuid::now_v7(), EventType::Start) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .input(json!({"messages": [{"role": "user", "content": "from RAW"}]})) + .annotated_request(annotated) + .build(); + exporter.state.lock().unwrap().events.push(start); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + assert_eq!(step.source, "user"); + assert_eq!(step.message, json!("hello from annotation")); +} + +#[test] +fn test_exporter_annotated_multimodal_request_falls_back_to_raw() { + // A multimodal (content-part) request message must not be flattened to text: + // the annotation adapter returns None and ATIF preserves the raw content. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let multimodal = json!({ + "model": "gpt-4", + "messages": [{"role": "user", "content": [ + {"type": "text", "text": "describe"}, + {"type": "image_url", "image_url": {"url": "https://example/img.png"}} + ]}] + }); + let annotated = OpenAIChatCodec + .decode(&LlmRequest { + headers: serde_json::Map::new(), + content: multimodal.clone(), + }) + .unwrap(); + + let start = event_builder(Uuid::now_v7(), EventType::Start) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .input(multimodal) + .annotated_request(annotated) + .build(); + exporter.state.lock().unwrap().events.push(start); + + let trajectory = exporter.export().unwrap(); + let message = trajectory.steps[0].message.as_str().unwrap(); + // Raw fallback preserves the image part rather than flattening to "describe". + assert!( + message.contains("image_url"), + "multimodal content preserved: {message}" + ); +} + #[test] fn test_exporter_hermes_wrapper_payload_is_atif_v17_compatible() { let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); diff --git a/crates/core/tests/unit/codec/openai_responses_tests.rs b/crates/core/tests/unit/codec/openai_responses_tests.rs index 2576c8ba..3feafeda 100644 --- a/crates/core/tests/unit/codec/openai_responses_tests.rs +++ b/crates/core/tests/unit/codec/openai_responses_tests.rs @@ -311,6 +311,58 @@ fn test_decode_response_multiple_output_text_items() { ); } +#[test] +fn test_decode_response_item_level_output_text() { + // A top-level `output_text` output item (sibling of `message`/`function_call`). + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [ + { "type": "output_text", "text": "Item text." } + ] + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Item text.".into())) + ); +} + +#[test] +fn test_decode_response_top_level_output_text_fallback() { + // The flattened top-level `output_text` is used when `output` yields no text. + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [], + "output_text": "Aggregated text." + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Aggregated text.".into())) + ); +} + +#[test] +fn test_decode_response_output_items_take_precedence_over_top_level_output_text() { + // Structured `output` message text wins over the flattened `output_text`. + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ { "type": "output_text", "text": "Structured." } ] + } + ], + "output_text": "Aggregate that should be ignored." + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Structured.".into())) + ); +} + #[test] fn test_decode_response_only_reasoning_items() { let codec = OpenAIResponsesCodec; diff --git a/crates/core/tests/unit/codec/resolve_tests.rs b/crates/core/tests/unit/codec/resolve_tests.rs new file mode 100644 index 00000000..63ae804a --- /dev/null +++ b/crates/core/tests/unit/codec/resolve_tests.rs @@ -0,0 +1,223 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for provider-surface detection and best-effort normalization. + +use super::*; +use crate::api::llm::LlmRequest; +use serde_json::json; + +fn req(content: serde_json::Value) -> LlmRequest { + LlmRequest { + headers: serde_json::Map::new(), + content, + } +} + +// --------------------------------------------------------------------------- +// ProviderSurface name mapping +// --------------------------------------------------------------------------- + +#[test] +fn surface_codec_name_round_trip() { + for surface in [ + ProviderSurface::OpenAiChat, + ProviderSurface::OpenAiResponses, + ProviderSurface::AnthropicMessages, + ] { + assert_eq!( + ProviderSurface::from_codec_name(surface.codec_name()), + Some(surface) + ); + } +} + +#[test] +fn surface_from_unknown_name_is_none() { + assert_eq!(ProviderSurface::from_codec_name("not_a_codec"), None); +} + +// --------------------------------------------------------------------------- +// detect_request_surface (priority order, hoisted from adaptive) +// --------------------------------------------------------------------------- + +#[test] +fn detect_request_responses_by_input_or_instructions() { + assert_eq!( + detect_request_surface(&json!({"input": []})), + Some(ProviderSurface::OpenAiResponses) + ); + assert_eq!( + detect_request_surface(&json!({"instructions": "x"})), + Some(ProviderSurface::OpenAiResponses) + ); +} + +#[test] +fn detect_request_anthropic_by_system() { + assert_eq!( + detect_request_surface(&json!({"system": "x", "messages": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_request_chat_by_messages() { + assert_eq!( + detect_request_surface(&json!({"messages": []})), + Some(ProviderSurface::OpenAiChat) + ); +} + +#[test] +fn detect_request_priority_responses_then_anthropic_then_chat() { + // `input` wins even alongside `system` and `messages`. + assert_eq!( + detect_request_surface(&json!({"input": [], "system": "x", "messages": []})), + Some(ProviderSurface::OpenAiResponses) + ); + // `system` wins over `messages` (Anthropic carries both). + assert_eq!( + detect_request_surface(&json!({"system": "x", "messages": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_request_none_for_unknown_or_non_object() { + assert_eq!(detect_request_surface(&json!({})), None); + assert_eq!(detect_request_surface(&json!({"foo": 1})), None); + assert_eq!(detect_request_surface(&json!([1, 2, 3])), None); + assert_eq!(detect_request_surface(&json!("string")), None); +} + +// --------------------------------------------------------------------------- +// detect_response_surface (strict; ambiguity -> None) +// --------------------------------------------------------------------------- + +#[test] +fn detect_response_chat_by_choices() { + assert_eq!( + detect_response_surface(&json!({"choices": []})), + Some(ProviderSurface::OpenAiChat) + ); +} + +#[test] +fn detect_response_responses_by_output_or_output_text() { + assert_eq!( + detect_response_surface(&json!({"output": []})), + Some(ProviderSurface::OpenAiResponses) + ); + assert_eq!( + detect_response_surface(&json!({"output_text": "hi"})), + Some(ProviderSurface::OpenAiResponses) + ); +} + +#[test] +fn detect_response_output_text_must_be_string() { + // A non-string `output_text` (null/object) is not a Responses match. + assert_eq!(detect_response_surface(&json!({"output_text": null})), None); + assert_eq!( + detect_response_surface(&json!({"output_text": {"nested": 1}})), + None + ); +} + +#[test] +fn detect_response_anthropic_by_type_message_and_content() { + assert_eq!( + detect_response_surface(&json!({"type": "message", "content": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_response_none_for_empty_object_the_decode_trap() { + // The built-in codecs decode `{}` successfully, so detection must NOT rely + // on decode success: an empty object classifies to None. + assert_eq!(detect_response_surface(&json!({})), None); +} + +#[test] +fn detect_response_none_for_ambiguous_choices_and_output() { + assert_eq!( + detect_response_surface(&json!({"choices": [], "output": []})), + None + ); +} + +#[test] +fn detect_response_none_for_partial_anthropic() { + // `type == "message"` without a content array does not classify. + assert_eq!(detect_response_surface(&json!({"type": "message"})), None); + // A content array without `type == "message"` does not classify. + assert_eq!(detect_response_surface(&json!({"content": []})), None); +} + +#[test] +fn detect_response_none_for_non_object() { + assert_eq!(detect_response_surface(&json!([1, 2])), None); +} + +// --------------------------------------------------------------------------- +// normalize_response (detect -> decode, fail-open) +// --------------------------------------------------------------------------- + +#[test] +fn normalize_response_decodes_detected_chat() { + let raw = json!({ + "id": "r1", + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "hello"}, + "finish_reason": "stop" + }] + }); + let decoded = normalize_response(&raw, None).expect("chat response decodes"); + assert_eq!(decoded.response_text(), Some("hello")); +} + +#[test] +fn normalize_response_decodes_detected_anthropic() { + let raw = json!({ + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet", + "content": [{"type": "text", "text": "hi"}], + "stop_reason": "end_turn" + }); + let decoded = normalize_response(&raw, None).expect("anthropic response decodes"); + assert_eq!(decoded.response_text(), Some("hi")); +} + +#[test] +fn normalize_response_none_for_unrecognized_shape() { + assert!(normalize_response(&json!({"foo": 1}), None).is_none()); +} + +#[test] +fn normalize_response_hint_bypasses_detection() { + // `{}` detects to None, but an explicit hint decodes via the named codec. + assert!(normalize_response(&json!({}), Some(ProviderSurface::OpenAiChat)).is_some()); +} + +// --------------------------------------------------------------------------- +// normalize_request (detect -> decode, fail-open) +// --------------------------------------------------------------------------- + +#[test] +fn normalize_request_decodes_detected_chat() { + let request = req(json!({ + "model": "gpt-4o", + "messages": [{"role": "user", "content": "hi"}] + })); + let decoded = normalize_request(&request, None).expect("chat request decodes"); + assert!(!decoded.messages.is_empty()); +} + +#[test] +fn normalize_request_none_for_unknown_shape() { + assert!(normalize_request(&req(json!({"foo": 1})), None).is_none()); +} diff --git a/crates/core/tests/unit/observability/manual_tests.rs b/crates/core/tests/unit/observability/manual_tests.rs new file mode 100644 index 00000000..4e11c54c --- /dev/null +++ b/crates/core/tests/unit/observability/manual_tests.rs @@ -0,0 +1,105 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for the shared manual/non-provider fallback helpers, focused on +//! the points where the OpenTelemetry and OpenInference copies diverged. + +use super::*; +use serde_json::json; + +#[test] +fn cost_otel_policy_emits_any_currency() { + let output = json!({"usage": {"cost": {"total": 0.5, "currency": "EUR"}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), false), + Some((0.5, "EUR".to_string())) + ); +} + +#[test] +fn cost_openinference_policy_drops_non_usd() { + let output = json!({"usage": {"cost": {"total": 0.5, "currency": "EUR"}}}); + assert_eq!(cost_from_manual_llm_output(Some(&output), true), None); +} + +#[test] +fn cost_component_sum_emits_currency_for_otel() { + let output = json!({"usage": {"cost": {"input": 0.5, "output": 0.375, "currency": "EUR"}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), false), + Some((0.875, "EUR".to_string())) + ); +} + +#[test] +fn cost_usd_field_passes_usd_only() { + let output = json!({"usage": {"cost_usd": 1.25}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((1.25, "USD".to_string())) + ); +} + +#[test] +fn cost_absent_currency_treated_as_usd() { + let output = json!({"usage": {"cost": {"total": 0.9}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((0.9, "USD".to_string())) + ); +} + +#[test] +fn cost_per_map_fallthrough_under_usd_only() { + // A non-USD `usage` cost is skipped under usd_only; `token_usage` USD wins. + let output = json!({ + "usage": {"cost": {"total": 0.5, "currency": "EUR"}}, + "token_usage": {"cost_usd": 0.2} + }); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((0.2, "USD".to_string())) + ); +} + +#[test] +fn first_u64_is_map_major() { + // `usage`'s `total` (5) wins over `token_usage`'s `total_tokens` (10): + // all keys are tried against `usage` before `token_usage`. + let usage = json!({"total": 5}); + let token_usage = json!({"total_tokens": 10}); + let got = first_u64_from_manual_usage( + usage.as_object(), + token_usage.as_object(), + &["total_tokens", "totalTokens", "total"], + ); + assert_eq!(got, Some(5)); +} + +#[test] +fn normalize_total_strict_drops_absent_and_inconsistent() { + assert_eq!(normalize_total_tokens(None, Some(5), Some(5)), None); + assert_eq!(normalize_total_tokens(Some(3), Some(5), Some(5)), None); // 3 < 10 + assert_eq!(normalize_total_tokens(Some(12), Some(5), Some(5)), Some(12)); + assert_eq!(normalize_total_tokens(Some(7), None, None), Some(7)); // minimum 0 +} + +#[test] +fn usage_extracts_aliases_and_returns_none_without_tokens() { + let output = json!({"usage": {"inputTokens": 3, "outputTokens": 4}}); + let usage = usage_from_manual_llm_output(Some(&output)).expect("has tokens"); + assert_eq!(usage.prompt_tokens, Some(3)); + assert_eq!(usage.completion_tokens, Some(4)); + assert!(usage_from_manual_llm_output(Some(&json!({"usage": {"foo": 1}}))).is_none()); + assert!(usage_from_manual_llm_output(Some(&json!({}))).is_none()); +} + +#[test] +fn model_name_extraction() { + assert_eq!( + model_name_from_manual_llm_output(Some(&json!({"model": "m"}))), + Some("m") + ); + assert_eq!(model_name_from_manual_llm_output(Some(&json!({}))), None); + assert_eq!(model_name_from_manual_llm_output(None), None); +} diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 22d0698f..7145a151 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -954,11 +954,64 @@ fn llm_input_value_omits_request_headers() { assert!(!attributes.contains_key("nemo_relay.start.input_json")); assert!(!attributes["input.value"].contains("authorization")); assert!(!attributes["input.value"].contains("secret-token")); - assert!(!attributes.contains_key("llm.input_messages.0.message.role")); + // The provider-shaped request is decoded through the codec layer, so + // structured messages are emitted — without leaking transport headers. + assert_attr(&attributes, "llm.input_messages.0.message.role", "user"); + assert_attr(&attributes, "llm.input_messages.0.message.content", "hi"); assert_no_attr_contains(&attributes, "headers"); assert_no_attr_contains(&attributes, "secret-token"); } +#[test] +fn un_annotated_provider_response_decoded_through_codec() { + // No annotation and no OpenClaw envelope: the raw provider response is + // detected and decoded through the codec layer (tier 3), so OpenInference + // emits structured output messages instead of nothing. + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "headers": {}, + "content": {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"} + })), + )); + processor.process(&make_end_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "choices": [{ + "message": {"role": "assistant", "content": "hello there"}, + "finish_reason": "stop" + }] + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_attr( + &attributes, + "llm.output_messages.0.message.role", + "assistant", + ); + assert_attr( + &attributes, + "llm.output_messages.0.message.content", + "hello there", + ); +} + #[test] fn openclaw_replay_payloads_emit_flattened_openinference_llm_attributes() { let (provider, exporter) = make_provider(); @@ -2077,9 +2130,7 @@ fn helper_functions_cover_additional_openinference_branches() { ), Some("Requested tools: read".to_string()) ); - assert_eq!(normalize_total_tokens(Some(5), None, None), Some(5)); - - let alias_usage = usage_from_manual_llm_output(Some(&json!({ + let alias_usage = crate::observability::manual::usage_from_manual_llm_output(Some(&json!({ "usage": {"inputTokens": 11, "outputTokens": 7, "totalTokens": 18, "cacheReadInputTokens": 5} }))) .unwrap(); diff --git a/crates/core/tests/unit/types_tests.rs b/crates/core/tests/unit/types_tests.rs index f524c30c..46b6e4b7 100644 --- a/crates/core/tests/unit/types_tests.rs +++ b/crates/core/tests/unit/types_tests.rs @@ -601,6 +601,93 @@ fn event_json_value_uses_canonical_subscriber_shape() { assert_eq!(decoded, value); } +fn llm_end_event(data: serde_json::Value, profile: CategoryProfile) -> Event { + Event::Scope(ScopeEvent::new( + BaseEvent::builder().name("llm").data(data).build(), + ScopeCategory::End, + llm_attributes_to_strings(LlmAttributes::empty()), + EventCategory::llm(), + Some(profile), + )) +} + +fn llm_start_event(data: serde_json::Value, profile: CategoryProfile) -> Event { + Event::Scope(ScopeEvent::new( + BaseEvent::builder().name("llm").data(data).build(), + ScopeCategory::Start, + llm_attributes_to_strings(LlmAttributes::empty()), + EventCategory::llm(), + Some(profile), + )) +} + +#[test] +fn normalized_llm_response_prefers_annotation_over_raw_output() { + // Annotation present: returned (borrowed), ignoring the conflicting raw output. + let response = annotated_response("resp-1", "demo-model", "from-annotation"); + let event = llm_end_event( + json!({"choices": [{"message": {"role": "assistant", "content": "from-raw"}}]}), + CategoryProfile::builder() + .annotated_response(Arc::new(response)) + .build(), + ); + let normalized = event.normalized_llm_response().expect("annotation present"); + assert_eq!(normalized.response_text(), Some("from-annotation")); +} + +#[test] +fn normalized_llm_response_falls_back_to_codec_decode() { + // No annotation: best-effort decode of the raw provider output. + let event = llm_end_event( + json!({ + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "from-raw"}, + "finish_reason": "stop" + }] + }), + CategoryProfile::default(), + ); + let normalized = event + .normalized_llm_response() + .expect("decodes raw chat output"); + assert_eq!(normalized.response_text(), Some("from-raw")); +} + +#[test] +fn normalized_llm_response_none_for_non_provider_output() { + let event = llm_end_event(json!({"answer": "x"}), CategoryProfile::default()); + assert!(event.normalized_llm_response().is_none()); +} + +#[test] +fn normalized_llm_request_decodes_wrapped_request_when_unannotated() { + // No annotation: decode the wrapped LlmRequest from the start-event input. + let event = llm_start_event( + json!({ + "headers": {}, + "content": {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]} + }), + CategoryProfile::default(), + ); + let normalized = event + .normalized_llm_request() + .expect("decodes wrapped chat request"); + assert!(!normalized.messages.is_empty()); +} + +#[test] +fn normalized_llm_request_prefers_annotation() { + let request = annotated_request("demo-model", "annotated"); + let event = llm_start_event( + json!({"headers": {}, "content": {"messages": []}}), + CategoryProfile::builder() + .annotated_request(Arc::new(request)) + .build(), + ); + assert!(event.normalized_llm_request().is_some()); +} + #[test] fn category_profile_wire_empty_accounts_for_annotations() { assert!(CategoryProfile::default().is_wire_empty()); diff --git a/docs/integrate-into-frameworks/provider-response-codecs.mdx b/docs/integrate-into-frameworks/provider-response-codecs.mdx index e70ed3d7..8502fdf4 100644 --- a/docs/integrate-into-frameworks/provider-response-codecs.mdx +++ b/docs/integrate-into-frameworks/provider-response-codecs.mdx @@ -479,6 +479,12 @@ registerSubscriber('response-debugger', (event) => { +## When No Annotation Is Present + +An `annotated_response` is attached only when a response codec was supplied for the managed call and decoding succeeded. Prefer the annotation whenever it is present — it is the single normalized view of the response. + +For events without one — manual lifecycle spans, replayed payloads, or unrecognized shapes — consumers fall back to the raw end-event payload. Built-in exporters follow this rule rather than re-parsing provider schemas by hand: they read `annotated_response` first, and otherwise recover a provider-neutral view (model, tokens, cost) from the raw payload. Reserve raw-payload extraction for these no-annotation cases; use the annotation for normal provider responses. + ## Custom Response Codecs Use a custom response codec when the provider or framework response does not match a built-in shape.