Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions crates/adaptive/src/acg/request_surfaces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod openai_responses;
use std::collections::HashSet;

use nemo_relay::api::llm::LlmRequest;
use nemo_relay::codec::resolve::{ProviderSurface, detect_request_surface};
use serde_json::Value;

use crate::acg::prompt_ir::PromptIR;
Expand All @@ -40,6 +41,17 @@ pub(crate) trait RequestSurfaceApplier: Send + Sync {
}

impl RequestSurface {
/// Map a core [`ProviderSurface`] to the adaptive request surface.
///
/// Bridges the deliberate casing difference (`OpenAiChat` -> `OpenAIChat`).
fn from_provider_surface(surface: ProviderSurface) -> Self {
match surface {
ProviderSurface::OpenAiChat => Self::OpenAIChat,
ProviderSurface::OpenAiResponses => Self::OpenAIResponses,
ProviderSurface::AnthropicMessages => Self::AnthropicMessages,
}
}

pub(crate) fn supports_provider(self, provider: &str) -> bool {
match provider {
"anthropic" => matches!(self, Self::AnthropicMessages),
Expand Down Expand Up @@ -71,17 +83,16 @@ impl RequestSurface {
pub(crate) fn resolve_request_surface_from_request(
request: &LlmRequest,
) -> crate::acg::Result<RequestSurface> {
if request.content.get("input").is_some() || request.content.get("instructions").is_some() {
Ok(RequestSurface::OpenAIResponses)
} else if request.content.get("system").is_some() {
Ok(RequestSurface::AnthropicMessages)
} else if request.content.get("messages").is_some() {
Ok(RequestSurface::OpenAIChat)
} else {
Err(crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
))
}
// Delegate request-shape detection to the core codec layer (single source of
// truth, hoisted from this function's former body), then preserve the
// fail-closed contract: an unrecognized shape is an internal error.
detect_request_surface(&request.content)
.map(RequestSurface::from_provider_surface)
.ok_or_else(|| {
crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
)
})
}

#[cfg_attr(not(test), allow(dead_code))]
Expand Down
41 changes: 40 additions & 1 deletion crates/core/src/api/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Event types for Agent Trajectory Observability Format (ATOF) runtime events.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;

Expand All @@ -23,10 +24,11 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::api::llm::LlmAttributes;
use crate::api::llm::{LlmAttributes, LlmRequest};
use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
use crate::api::tool::ToolAttributes;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::resolve;
use crate::codec::response::AnnotatedLlmResponse;
use crate::json::Json;

Expand Down Expand Up @@ -609,6 +611,43 @@ impl Event {
.and_then(|profile| profile.annotated_response.as_ref())
}

/// Return the normalized LLM request, annotation-first.
///
/// Prefers a codec-attached [`AnnotatedLlmRequest`] (borrowed) when present;
/// otherwise best-effort decodes the start-event input payload through
/// [`crate::codec::resolve::normalize_request`] (owned). This is the single
/// preferred path for consumers that want normalized request data without
/// reimplementing provider parsing; only genuinely non-provider payloads
/// should fall through to a consumer-local lenient parse.
///
/// # Returns
/// `Some` borrowed annotation, `Some` owned best-effort decode, or `None`.
#[must_use]
pub fn normalized_llm_request(&self) -> Option<Cow<'_, AnnotatedLlmRequest>> {
if let Some(annotated) = self.annotated_request() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?;
resolve::normalize_request(&request, None).map(Cow::Owned)
}

/// Return the normalized LLM response, annotation-first.
///
/// Prefers a codec-attached [`AnnotatedLlmResponse`] (borrowed) when present;
/// otherwise best-effort decodes the end-event output payload through
/// [`crate::codec::resolve::normalize_response`] (owned). Counterpart to
/// [`Event::normalized_llm_request`].
///
/// # Returns
/// `Some` borrowed annotation, `Some` owned best-effort decode, or `None`.
#[must_use]
pub fn normalized_llm_response(&self) -> Option<Cow<'_, AnnotatedLlmResponse>> {
if let Some(annotated) = self.annotated_response() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
resolve::normalize_response(self.output()?, None).map(Cow::Owned)
}

/// Return true for scope-start events.
///
/// # Returns
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod openai_chat;
pub mod openai_responses;
pub mod pricing;
pub mod request;
pub mod resolve;
pub mod response;
pub mod streaming;
pub mod traits;
21 changes: 20 additions & 1 deletion crates/core/src/codec/openai_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ fn collect_output_item(
.unwrap_or("")
{
"message" => collect_message_text_parts(item, text_parts),
// Top-level `output_text` output items (some providers/proxies emit
// these as siblings of `message` items rather than nested blocks).
"output_text" => {
if let Some(text) = output_text_block(item) {
text_parts.push(text);
}
}
"function_call" => tool_calls.push(parse_function_call(item)),
_ => {}
}
Expand Down Expand Up @@ -244,6 +251,17 @@ fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
}
}

/// Top-level `output_text` convenience field, a flattened aggregate of the
/// response text. Used only as a fallback when the structured `output` items
/// yield no message text, so structured content takes precedence.
fn top_level_output_text(response: &Json) -> Option<MessageContent> {
response
.get("output_text")
.and_then(|value| value.as_str())
.filter(|text| !text.is_empty())
.map(|text| MessageContent::Text(text.to_string()))
}

fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
(!items.is_empty()).then_some(items)
}
Expand Down Expand Up @@ -456,7 +474,8 @@ impl LlmResponseCodec for OpenAIResponsesCodec {

let all_output_items = raw.output.clone();
let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
let message = message_from_text_parts(text_parts);
let message =
message_from_text_parts(text_parts).or_else(|| top_level_output_text(response));
let tool_calls = optional_vec(tool_calls);

// Map finish reason from status + incomplete_details.
Expand Down
171 changes: 171 additions & 0 deletions crates/core/src/codec/resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Provider-surface detection and best-effort normalization.
//!
//! This module is the single preferred entry point for turning raw provider
//! JSON into the normalized [`AnnotatedLlmRequest`] / [`AnnotatedLlmResponse`]
//! when a caller does not already hold a codec-attached annotation. It detects
//! which built-in provider surface a payload matches and decodes it through the
//! matching built-in codec ([`OpenAIChatCodec`], [`OpenAIResponsesCodec`],
//! [`AnthropicMessagesCodec`]).
//!
//! # Preferred path
//!
//! Exporters and plugins should prefer an annotation already attached to the
//! event (see [`crate::api::event::Event::annotated_response`]) and fall back to
//! [`normalize_response`] / [`normalize_request`] here, rather than re-parsing
//! provider JSON by hand. Only genuinely non-provider/manual payloads should use
//! a consumer-local lenient fallback.
//!
//! # Detection is conservative
//!
//! [`detect_response_surface`] classifies a payload only when exactly one
//! built-in shape matches; ambiguous or unrecognized payloads return `None` so
//! callers fall through to their own handling. [`detect_request_surface`] uses
//! the same priority order as the request codecs (Responses before Anthropic
//! before Chat), mirroring adaptive's historical resolver.

use crate::api::llm::LlmRequest;
use crate::json::Json;

use super::anthropic::AnthropicMessagesCodec;
use super::openai_chat::OpenAIChatCodec;
use super::openai_responses::OpenAIResponsesCodec;
use super::request::AnnotatedLlmRequest;
use super::response::AnnotatedLlmResponse;
use super::traits::{LlmCodec, LlmResponseCodec};

/// A built-in provider request/response surface.
///
/// The string forms returned by [`ProviderSurface::codec_name`] match the
/// `ApiSpecificResponse` serde tags (`"openai_chat"`, `"openai_responses"`,
/// `"anthropic_messages"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderSurface {
/// OpenAI Chat Completions.
OpenAiChat,
/// OpenAI Responses.
OpenAiResponses,
/// Anthropic Messages.
AnthropicMessages,
}

impl ProviderSurface {
/// Map a canonical codec name to a surface.
///
/// Recognizes `"openai_chat"`, `"openai_responses"`, and
/// `"anthropic_messages"`; returns `None` for any other name.
#[must_use]
pub fn from_codec_name(name: &str) -> Option<Self> {
match name {
"openai_chat" => Some(Self::OpenAiChat),
"openai_responses" => Some(Self::OpenAiResponses),
"anthropic_messages" => Some(Self::AnthropicMessages),
_ => None,
}
}

/// The canonical codec name for this surface.
#[must_use]
pub fn codec_name(self) -> &'static str {
match self {
Self::OpenAiChat => "openai_chat",
Self::OpenAiResponses => "openai_responses",
Self::AnthropicMessages => "anthropic_messages",
}
}
}

/// Detect the request surface from a raw request body (the request `content`).
///
/// Uses the same priority order as the request codecs: Responses
/// (`input`/`instructions`) before Anthropic (`system`) before Chat
/// (`messages`). Returns `None` when no key matches or `body` is not an object.
///
/// This is the single source of truth for request-shape detection; adaptive's
/// surface resolver delegates here.
#[must_use]
pub fn detect_request_surface(body: &Json) -> Option<ProviderSurface> {
let obj = body.as_object()?;
if obj.contains_key("input") || obj.contains_key("instructions") {
Some(ProviderSurface::OpenAiResponses)
} else if obj.contains_key("system") {
Some(ProviderSurface::AnthropicMessages)
} else if obj.contains_key("messages") {
Some(ProviderSurface::OpenAiChat)
} else {
None
}
}

/// Detect the response surface from a raw provider response.
///
/// Classifies strictly on top-level shape and returns `Some` only when exactly
/// one built-in surface matches:
/// - **Chat**: a `choices` array.
/// - **Responses**: an `output` array or an `output_text` field.
/// - **Anthropic**: `type == "message"` with a `content` array.
///
/// Returns `None` for unrecognized payloads (e.g. `{}`), ambiguous payloads
/// (e.g. both `choices` and `output`), or non-objects. Strict gating is
/// deliberate: the built-in codecs accept minimal/empty objects, so decode
/// success alone is not a reliable classifier.
#[must_use]
pub fn detect_response_surface(raw: &Json) -> Option<ProviderSurface> {
let obj = raw.as_object()?;
let is_chat = obj.get("choices").is_some_and(Json::is_array);
let is_responses = obj.get("output").is_some_and(Json::is_array)
|| obj.get("output_text").is_some_and(Json::is_string);
let is_anthropic = obj.get("type").and_then(Json::as_str) == Some("message")
&& obj.get("content").is_some_and(Json::is_array);

match (is_chat, is_responses, is_anthropic) {
(true, false, false) => Some(ProviderSurface::OpenAiChat),
(false, true, false) => Some(ProviderSurface::OpenAiResponses),
(false, false, true) => Some(ProviderSurface::AnthropicMessages),
_ => None,
}
}

/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`].
///
/// Uses `hint` when provided, otherwise [`detect_request_surface`] on
/// `request.content`. Fails open: returns `None` when the surface is unknown or
/// the matching codec cannot decode the request.
#[must_use]
pub fn normalize_request(
request: &LlmRequest,
hint: Option<ProviderSurface>,
) -> Option<AnnotatedLlmRequest> {
let surface = hint.or_else(|| detect_request_surface(&request.content))?;
match surface {
ProviderSurface::OpenAiChat => OpenAIChatCodec.decode(request),
ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode(request),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode(request),
}
.ok()
}

/// Best-effort decode of a raw response into [`AnnotatedLlmResponse`].
///
/// Uses `hint` when provided, otherwise [`detect_response_surface`]. Fails open:
/// returns `None` when the surface is unknown or the matching codec cannot
/// decode the response.
#[must_use]
pub fn normalize_response(
raw: &Json,
hint: Option<ProviderSurface>,
) -> Option<AnnotatedLlmResponse> {
let surface = hint.or_else(|| detect_response_surface(raw))?;
match surface {
ProviderSurface::OpenAiChat => OpenAIChatCodec.decode_response(raw),
ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode_response(raw),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode_response(raw),
}
.ok()
}

#[cfg(test)]
#[path = "../../tests/unit/codec/resolve_tests.rs"]
mod tests;
5 changes: 3 additions & 2 deletions crates/core/src/codec/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use super::response::AnnotatedLlmResponse;
/// structured [`AnnotatedLlmRequest`].
///
/// Codecs are implemented by integration patches (LangChain, LangChain-NVIDIA,
/// LangGraph, etc.) since each SDK has its own request format. They are
/// registered by name in the global codec registry.
/// LangGraph, etc.) since each SDK has its own request format. A codec is
/// supplied per call by the caller; the built-in provider codecs can also be
/// selected from a raw payload via [`crate::codec::resolve`].
///
/// # Design
///
Expand Down
Loading
Loading