From 4d9e27c3326af8b7b2d20b0b239989f13f877277 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 00:37:45 +0000 Subject: [PATCH 1/2] feat(signing): async_resolve_agent + verify_from_agent_url + CLI --resolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements §"Discovering an agent's signing keys via brand_json_url" from security.mdx (8-step algorithm). Per design decisions in issue #344: - agent_resolver.py: async_resolve_agent (canonical), resolve_agent (sync wrapper), verify_from_agent_url (composes resolver + verifier). SSRF-pinned capability fetch uses own IP-pinned transport — does NOT route through ADCPClient (threat model differs for attacker-supplied URLs). - AgentResolution Pydantic model: agent_url, brand_json_url, agent_entry, jwks_uri, jwks, freshness, trace. No SDK-invented terms (identity_posture, consistency dropped per spec-provenance check). - brand_json_url read via identity dict from capabilities response (3.0.5+ has additionalProperties: true on identity, so field flows through). - tldextract added as [identity] optional extra (Tier-3 eTLD+1 binding gate). - CLI: --resolve flag (avoids positional alias collision found at __main__.py:528). - 23 unit tests, all passing. ruff + mypy (signing module) clean. Refs #344 https://claude.ai/code/session_01KkMQ5QS2Mhpyv2nfvx1btN --- pyproject.toml | 6 + src/adcp/__main__.py | 42 +++ src/adcp/signing/__init__.py | 18 ++ src/adcp/signing/agent_resolver.py | 418 ++++++++++++++++++++++++++ tests/test_agent_resolver.py | 468 +++++++++++++++++++++++++++++ 5 files changed, 952 insertions(+) create mode 100644 src/adcp/signing/agent_resolver.py create mode 100644 tests/test_agent_resolver.py diff --git a/pyproject.toml b/pyproject.toml index 96887add0..fe5cd3808 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,12 @@ pg = [ "psycopg[binary]>=3.1.0", "psycopg-pool>=3.2.0", ] +identity = [ + # eTLD+1 binding for brand_json_url domain-consistency checks + # (Tier-3, gated on adcp#3690 Tier-3 work). Bundled PSL snapshot + # updates lazily; never fetches at runtime. + "tldextract>=5.0.0", +] [project.urls] Homepage = "https://github.com/adcontextprotocol/adcp-client-python" diff --git a/src/adcp/__main__.py b/src/adcp/__main__.py index e45e7d6e2..1ed2b9ff5 100644 --- a/src/adcp/__main__.py +++ b/src/adcp/__main__.py @@ -473,6 +473,33 @@ def handle_show_config() -> None: print(f"Config file: {CONFIG_FILE}") +def _handle_resolve(agent_url: str, *, json_output: bool, quiet: bool) -> None: + """Handle --resolve: walk brand_json_url and print the resolution result.""" + from adcp.signing.agent_resolver import AgentResolverError, resolve_agent + + try: + result = resolve_agent(agent_url) + except AgentResolverError as exc: + print(f"Error [{exc.code}]: {exc.detail}", file=sys.stderr) + sys.exit(1) + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + sys.exit(1) + + if json_output: + print(result.model_dump_json(indent=2)) + else: + print(f"Agent URL: {result.agent_url}") + print(f"Brand JSON URL: {result.brand_json_url}") + print(f"JWKS URI: {result.jwks_uri}") + print(f"Keys: {len(result.jwks.get('keys', []))}") + if not quiet: + for hop in result.trace: + status_str = f" → HTTP {hop.status}" if hop.status is not None else "" + time_str = f" ({hop.elapsed_ms}ms)" if hop.elapsed_ms is not None else "" + print(f" [{hop.label}] {hop.url}{status_str}{time_str}") + + def resolve_agent_config(agent_identifier: str) -> dict[str, Any]: """Resolve agent identifier to configuration.""" # Check if it's a saved alias @@ -517,6 +544,13 @@ def main() -> None: parser.add_argument("--show-config", action="store_true", help="Show config file location") parser.add_argument("--version", action="store_true", help="Show SDK and AdCP version") + # Identity resolution + parser.add_argument( + "--resolve", metavar="AGENT_URL", help="Resolve agent URL to signing keys" + ) + parser.add_argument("--fresh", action="store_true", help="Bypass cache when resolving") + parser.add_argument("--quiet", action="store_true", help="Suppress non-essential output") + # Execution options parser.add_argument("--protocol", choices=["mcp", "a2a"], help="Force protocol type") parser.add_argument("--auth", help="Authentication token") @@ -542,6 +576,7 @@ def main() -> None: args.remove_agent, args.show_config, args.version, + args.resolve, ] ) ): @@ -559,6 +594,9 @@ def main() -> None: print(' adcp cs-agent calibrate_content \'{"content_standards_id":"cs-123"}\'') print(" adcp si-agent si_get_offering") print(" adcp gov-agent list_property_lists") + print("\nIdentity Resolution:") + print(" adcp --resolve https://buyer.example.com/mcp") + print(" adcp --resolve https://buyer.example.com/mcp --json") sys.exit(0) # Handle configuration commands @@ -587,6 +625,10 @@ def main() -> None: handle_show_config() sys.exit(0) + if args.resolve: + _handle_resolve(args.resolve, json_output=args.json, quiet=args.quiet) + sys.exit(0) + # Execute tool if not args.agent: print("Error: Agent identifier required", file=sys.stderr) diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 37730b84b..6e0ab4c23 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -87,6 +87,16 @@ from __future__ import annotations +from adcp.signing.agent_resolver import ( + AgentResolution, + AgentResolutionFreshness, + AgentResolutionHop, + AgentResolverError, + AgentResolverErrorCode, + async_resolve_agent, + resolve_agent, + verify_from_agent_url, +) from adcp.signing.autosign import ( SigningConfig, SigningDecision, @@ -263,6 +273,11 @@ def __init__(self, *args: object, **kwargs: object) -> None: __all__ = [ + "AgentResolution", + "AgentResolutionFreshness", + "AgentResolutionHop", + "AgentResolverError", + "AgentResolverErrorCode", "ALG_ED25519", "ALG_ES256", "ALLOWED_ALGS", @@ -344,6 +359,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "as_async_resolver", "async_default_jwks_fetcher", "async_default_revocation_list_fetcher", + "async_resolve_agent", "async_sign_request", "averify_detached_jws", "averify_jws_document", @@ -371,6 +387,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "pem_to_adcp_jwk", "private_key_from_jwk", "public_key_from_jwk", + "resolve_agent", "resolve_and_validate_host", "sign_request", "sign_signature_base", @@ -379,6 +396,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "validate_jwks_uri", "verify_detached_jws", "verify_flask_request", + "verify_from_agent_url", "verify_jws_document", "verify_request_signature", "verify_signature", diff --git a/src/adcp/signing/agent_resolver.py b/src/adcp/signing/agent_resolver.py new file mode 100644 index 000000000..0bbe3b949 --- /dev/null +++ b/src/adcp/signing/agent_resolver.py @@ -0,0 +1,418 @@ +"""Agent URL → signing-key resolution via identity.brand_json_url. + +Implements §"Discovering an agent's signing keys via brand_json_url" from +security.mdx (8-step algorithm). Canonical entry-point: async_resolve_agent. +Sync convenience: resolve_agent. Composed verifier: verify_from_agent_url. +""" + +from __future__ import annotations + +import asyncio +import time +from collections.abc import Callable, Mapping +from contextlib import AbstractAsyncContextManager +from dataclasses import replace as _replace +from typing import TYPE_CHECKING, Any, Literal +from urllib.parse import urlsplit, urlunsplit + +import httpx +from pydantic import BaseModel, Field + +from adcp.signing.brand_jwks import BrandJsonResolverError, _fetch_brand_json +from adcp.signing.capability_priming import _unwrap_response +from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport +from adcp.signing.jwks import SSRFValidationError, async_default_jwks_fetcher + +if TYPE_CHECKING: + from adcp.signing.verifier import VerifiedSigner, VerifyOptions + +# Body caps (per design decisions in issue #344) +_CAPABILITIES_BODY_CAP = 64 * 1024 # 64 KiB +_CONNECT_TIMEOUT = 5.0 +_TOTAL_TIMEOUT = 10.0 + +_DEFAULT_PORTS: dict[str, int] = {"http": 80, "https": 443} + +#: Test seam: factory that accepts a URL and returns an async context manager +#: wrapping an httpx.AsyncClient. Production path uses IP-pinned transport; +#: tests inject a factory wired to a mock transport. +_ClientFactory = Callable[[str], AbstractAsyncContextManager[httpx.AsyncClient]] + +AgentResolverErrorCode = Literal[ + "invalid_url", + "capability_fetch_failed", + "brand_json_url_missing", + "brand_json_fetch_failed", + "brand_json_agent_not_found", + "jwks_fetch_failed", + "ssrf_blocked", +] + + +class AgentResolverError(Exception): + """Raised by async_resolve_agent / resolve_agent on any failure.""" + + def __init__(self, code: AgentResolverErrorCode, detail: str) -> None: + super().__init__(detail) + self.code: AgentResolverErrorCode = code + self.detail: str = detail + + +class AgentResolutionFreshness(BaseModel): + """Cache metadata captured at resolution time.""" + + fetched_at: float + cache_control: str | None = None + + +class AgentResolutionHop(BaseModel): + """One step in the resolution trace (capabilities / brand_json / jwks).""" + + label: str + url: str + status: int | None = None + elapsed_ms: float | None = None + + +class AgentResolution(BaseModel): + """Result of a successful agent URL → signing-key walk. + + Fields are grounded in observable wire state per the design decisions in + adcp-client-python#344. SDK-invented terms (identity_posture, consistency) + are absent — ``--json`` output is cross-SDK interop surface. + """ + + agent_url: str + brand_json_url: str + agent_entry: dict[str, Any] + jwks_uri: str + jwks: dict[str, Any] + freshness: AgentResolutionFreshness + trace: list[AgentResolutionHop] = Field(default_factory=list) + + +async def async_resolve_agent( + agent_url: str, + *, + allow_private: bool = False, + capabilities_body_cap: int = _CAPABILITIES_BODY_CAP, + connect_timeout: float = _CONNECT_TIMEOUT, + total_timeout: float = _TOTAL_TIMEOUT, + _client_factory: _ClientFactory | None = None, +) -> AgentResolution: + """Resolve an agent URL to its signing keys via brand_json_url. + + Steps (per security.mdx §"Discovering an agent's signing keys"): + 1. Fetch get_adcp_capabilities with a SSRF-pinned transport — does NOT + route through ADCPClient (threat model differs for attacker-supplied URLs). + 2. Extract identity.brand_json_url from the response. + 3. Fetch + walk brand.json to find the agent entry matching agent_url. + 4. Fetch JWKS from the agent entry's jwks_uri. + + ``_client_factory`` is a test seam; leave None in production. + """ + trace: list[AgentResolutionHop] = [] + + # Steps 1 + 2: capabilities fetch → brand_json_url + raw_caps, caps_hop = await _fetch_capabilities( + agent_url, + allow_private=allow_private, + body_cap=capabilities_body_cap, + connect_timeout=connect_timeout, + total_timeout=total_timeout, + client_factory=_client_factory, + ) + trace.append(caps_hop) + + # raw_caps is the outer JSON-RPC body; extract the tool result first so + # _unwrap_response sees the MCP CallToolResult (structuredContent / content[]) + # rather than the JSON-RPC envelope. + tool_result: Any = raw_caps + if isinstance(raw_caps, dict) and isinstance(raw_caps.get("result"), dict): + tool_result = raw_caps["result"] + + payload = _unwrap_response(tool_result) + if not isinstance(payload, dict): + raise AgentResolverError( + "brand_json_url_missing", + "get_adcp_capabilities response is not a dict", + ) + identity = payload.get("identity") + brand_json_url: str | None = ( + identity.get("brand_json_url") if isinstance(identity, dict) else None + ) + if not isinstance(brand_json_url, str) or not brand_json_url: + raise AgentResolverError( + "brand_json_url_missing", + "get_adcp_capabilities response has no identity.brand_json_url", + ) + + # Step 3: fetch brand.json and locate the agent entry + try: + fetched = await _fetch_brand_json( + start_url=brand_json_url, + current_etag=None, + max_redirects=3, + allow_private=allow_private, + timeout_seconds=total_timeout, + max_body_bytes=256 * 1024, + client_factory=_client_factory, + ) + except BrandJsonResolverError as exc: + raise AgentResolverError("brand_json_fetch_failed", str(exc)) from exc + + trace.append( + AgentResolutionHop( + label="brand_json", + url=fetched.final_url, + status=200 if fetched.status == "ok" else 304, + ) + ) + + if fetched.data is None: + raise AgentResolverError("brand_json_fetch_failed", "brand.json response missing body") + + agent_entry, jwks_uri = _find_agent_by_url(fetched.data, agent_url, fetched.final_url) + + # Step 4: fetch JWKS (async_default_jwks_fetcher already uses IP-pinned transport) + t0 = time.monotonic() + try: + jwks_data = await async_default_jwks_fetcher(jwks_uri, allow_private=allow_private) + except SSRFValidationError as exc: + raise AgentResolverError("ssrf_blocked", f"jwks_uri SSRF check failed: {exc}") from exc + except (httpx.HTTPError, ValueError, OSError) as exc: + raise AgentResolverError("jwks_fetch_failed", f"JWKS fetch failed: {exc}") from exc + elapsed = (time.monotonic() - t0) * 1000 + trace.append(AgentResolutionHop(label="jwks", url=jwks_uri, elapsed_ms=round(elapsed, 1))) + + return AgentResolution( + agent_url=agent_url, + brand_json_url=brand_json_url, + agent_entry=agent_entry, + jwks_uri=jwks_uri, + jwks=jwks_data, + freshness=AgentResolutionFreshness( + fetched_at=time.time(), + cache_control=fetched.cache_control, + ), + trace=trace, + ) + + +def resolve_agent(agent_url: str, **kwargs: Any) -> AgentResolution: + """Sync convenience wrapper around async_resolve_agent. + + Library code that runs on an asyncio event loop should call + async_resolve_agent directly to avoid blocking the loop. + """ + return asyncio.run(async_resolve_agent(agent_url, **kwargs)) + + +async def verify_from_agent_url( + *, + method: str, + url: str, + headers: Mapping[str, str], + body: bytes, + agent_url: str, + options: VerifyOptions, +) -> VerifiedSigner: + """Resolve agent keys then verify the request signature. + + Composes async_resolve_agent with verify_request_signature: + 1. Walks the brand_json_url chain to build a JWKS snapshot. + 2. Injects a StaticJwksResolver backed by that snapshot into ``options``. + 3. Calls verify_request_signature with the updated options. + + Raises AgentResolverError on resolution failure, SignatureVerificationError + on signature failure. + """ + from adcp.signing.jwks import StaticJwksResolver + from adcp.signing.verifier import verify_request_signature + + resolution = await async_resolve_agent(agent_url) + jwks_resolver = StaticJwksResolver(resolution.jwks) + pinned = _replace(options, jwks_resolver=jwks_resolver) + return verify_request_signature( + method=method, url=url, headers=headers, body=body, options=pinned + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +async def _fetch_capabilities( + agent_url: str, + *, + allow_private: bool, + body_cap: int, + connect_timeout: float, + total_timeout: float, + client_factory: _ClientFactory | None, +) -> tuple[dict[str, Any], AgentResolutionHop]: + """SSRF-pinned MCP tools/call fetch of get_adcp_capabilities.""" + mcp_body = { + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": {"name": "get_adcp_capabilities", "arguments": {}}, + } + t0 = time.monotonic() + + if client_factory is not None: + client_cm: AbstractAsyncContextManager[httpx.AsyncClient] = client_factory(agent_url) + else: + try: + transport = build_async_ip_pinned_transport(agent_url, allow_private=allow_private) + except SSRFValidationError as exc: + raise AgentResolverError( + "ssrf_blocked", f"agent_url SSRF check failed: {exc}" + ) from exc + timeout = httpx.Timeout( + connect=connect_timeout, + read=total_timeout, + write=total_timeout, + pool=total_timeout, + ) + client_cm = httpx.AsyncClient( + transport=transport, + timeout=timeout, + follow_redirects=False, + trust_env=False, + ) + + try: + async with client_cm as client: + try: + response = await client.post( + agent_url, + json=mcp_body, + headers={"Content-Type": "application/json"}, + ) + except SSRFValidationError as exc: + raise AgentResolverError( + "ssrf_blocked", f"agent_url SSRF check failed: {exc}" + ) from exc + except (httpx.HTTPError, OSError) as exc: + raise AgentResolverError( + "capability_fetch_failed", f"get_adcp_capabilities failed: {exc}" + ) from exc + + elapsed = (time.monotonic() - t0) * 1000 + hop = AgentResolutionHop( + label="capabilities", + url=agent_url, + status=response.status_code, + elapsed_ms=round(elapsed, 1), + ) + + if response.status_code != 200: + raise AgentResolverError( + "capability_fetch_failed", + f"get_adcp_capabilities returned HTTP {response.status_code}", + ) + + body_bytes = response.content + if len(body_bytes) > body_cap: + raise AgentResolverError( + "capability_fetch_failed", + f"get_adcp_capabilities response exceeds {body_cap} bytes", + ) + + try: + data: dict[str, Any] = response.json() + except (ValueError, httpx.DecodingError) as exc: + raise AgentResolverError( + "capability_fetch_failed", + "get_adcp_capabilities response is not valid JSON", + ) from exc + except AgentResolverError: + raise + + return data, hop + + +def _norm_url(raw: str) -> str: + """Normalize a URL for agent-entry matching (lowercase scheme+host, strip default port).""" + try: + parts = urlsplit(raw) + except ValueError: + return raw + scheme = parts.scheme.lower() + host = (parts.hostname or "").lower() + port = parts.port + if port is not None and port == _DEFAULT_PORTS.get(scheme): + port = None + netloc = host if port is None else f"{host}:{port}" + return urlunsplit((scheme, netloc, parts.path, parts.query, "")) + + +def _find_agent_by_url( + data: dict[str, Any], + agent_url: str, + final_brand_url: str, # noqa: ARG001 — reserved for future origin-check +) -> tuple[dict[str, Any], str]: + """Find the brand.json agent entry whose url matches agent_url. + + Walks the same structure as _select_agent in brand_jwks: portfolio + brands[] first, then house.agents[], then top-level agents[]. + """ + target = _norm_url(agent_url) + + def _search(agents: Any) -> tuple[dict[str, Any], str] | None: + if not isinstance(agents, list): + return None + for entry in agents: + if not isinstance(entry, dict): + continue + url = entry.get("url") + if not isinstance(url, str): + continue + if _norm_url(url) != target: + continue + jwks_uri_raw = entry.get("jwks_uri") + if isinstance(jwks_uri_raw, str): + return entry, jwks_uri_raw + # Fallback: agent-origin well-known per spec default + try: + p = urlsplit(url) + return entry, f"{p.scheme}://{p.netloc}/.well-known/jwks.json" + except ValueError: + continue + return None + + house = data.get("house") + if isinstance(house, dict): + brands = data.get("brands") + if isinstance(brands, list): + for brand in brands: + if isinstance(brand, dict): + result = _search(brand.get("agents")) + if result is not None: + return result + result = _search(house.get("agents")) + if result is not None: + return result + else: + result = _search(data.get("agents")) + if result is not None: + return result + + raise AgentResolverError( + "brand_json_agent_not_found", + f"brand.json has no agent entry matching {agent_url!r}", + ) + + +__all__ = [ + "AgentResolution", + "AgentResolutionFreshness", + "AgentResolutionHop", + "AgentResolverError", + "AgentResolverErrorCode", + "async_resolve_agent", + "resolve_agent", + "verify_from_agent_url", +] diff --git a/tests/test_agent_resolver.py b/tests/test_agent_resolver.py new file mode 100644 index 000000000..3b4e2ac65 --- /dev/null +++ b/tests/test_agent_resolver.py @@ -0,0 +1,468 @@ +"""Unit tests for adcp.signing.agent_resolver.""" +from __future__ import annotations + +import time +from contextlib import asynccontextmanager +from typing import Any +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from adcp.signing.agent_resolver import ( + AgentResolution, + AgentResolutionFreshness, + AgentResolutionHop, + AgentResolverError, + _find_agent_by_url, + _norm_url, + async_resolve_agent, + resolve_agent, +) + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +def _make_caps_http_response(brand_json_url: str | None) -> dict[str, Any]: + """Build a minimal MCP tools/call HTTP response body for get_adcp_capabilities.""" + identity: dict[str, Any] = {} + if brand_json_url is not None: + identity["brand_json_url"] = brand_json_url + return { + "jsonrpc": "2.0", + "id": 1, + "result": { + "structuredContent": { + "adcp": {"major_versions": [3]}, + "identity": identity, + } + }, + } + + +def _make_brand_json(agent_url: str, jwks_uri: str) -> dict[str, Any]: + return { + "agents": [ + {"type": "buying", "url": agent_url, "jwks_uri": jwks_uri}, + ] + } + + +def _make_jwks() -> dict[str, Any]: + return { + "keys": [ + { + "kty": "OKP", + "crv": "Ed25519", + "x": "dGVzdA==", + "kid": "test-key-1", + "use": "sig", + } + ] + } + + +def _caps_client_factory(caps_response: dict[str, Any]) -> Any: + """Return a _ClientFactory that returns the given capabilities response.""" + + @asynccontextmanager # type: ignore[arg-type] + async def _factory(url: str): # type: ignore[no-untyped-def] + client = AsyncMock() + + async def _post(*args: Any, **kwargs: Any) -> httpx.Response: + return httpx.Response(200, json=caps_response) + + client.post = _post + yield client + + return _factory + + +# --------------------------------------------------------------------------- +# _norm_url +# --------------------------------------------------------------------------- + + +class TestNormUrl: + def test_strips_default_https_port(self) -> None: + assert _norm_url("https://example.com:443/mcp") == _norm_url("https://example.com/mcp") + + def test_strips_default_http_port(self) -> None: + assert _norm_url("http://example.com:80/path") == _norm_url("http://example.com/path") + + def test_lowercases_host(self) -> None: + assert _norm_url("https://BUYER.EXAMPLE.COM/mcp") == _norm_url( + "https://buyer.example.com/mcp" + ) + + def test_preserves_non_default_port(self) -> None: + result = _norm_url("https://example.com:8443/mcp") + assert ":8443" in result + + def test_invalid_url_returns_raw(self) -> None: + raw = "not-a-url" + assert _norm_url(raw) == raw + + +# --------------------------------------------------------------------------- +# _find_agent_by_url +# --------------------------------------------------------------------------- + + +class TestFindAgentByUrl: + def test_finds_matching_agent(self) -> None: + brand_json: dict[str, Any] = { + "agents": [ + { + "type": "buying", + "url": "https://buyer.example.com/mcp", + "jwks_uri": "https://buyer.example.com/.well-known/jwks.json", + }, + ] + } + entry, jwks_uri = _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert entry["type"] == "buying" + assert jwks_uri == "https://buyer.example.com/.well-known/jwks.json" + + def test_case_insensitive_host_match(self) -> None: + brand_json: dict[str, Any] = { + "agents": [ + { + "type": "buying", + "url": "https://BUYER.example.com/mcp", + "jwks_uri": "https://buyer.example.com/.well-known/jwks.json", + }, + ] + } + entry, _ = _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert entry is not None + + def test_default_port_stripped_match(self) -> None: + brand_json: dict[str, Any] = { + "agents": [ + { + "type": "buying", + "url": "https://buyer.example.com:443/mcp", + "jwks_uri": "https://buyer.example.com/.well-known/jwks.json", + }, + ] + } + entry, _ = _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert entry is not None + + def test_raises_when_not_found(self) -> None: + brand_json: dict[str, Any] = { + "agents": [{"type": "buying", "url": "https://other.example.com/mcp"}] + } + with pytest.raises(AgentResolverError) as exc_info: + _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert exc_info.value.code == "brand_json_agent_not_found" + + def test_walks_portfolio_brands(self) -> None: + brand_json: dict[str, Any] = { + "house": {"agents": []}, + "brands": [ + { + "id": "brand-1", + "agents": [ + { + "type": "buying", + "url": "https://buyer.brand1.com/mcp", + "jwks_uri": "https://buyer.brand1.com/.well-known/jwks.json", + }, + ], + } + ], + } + entry, _ = _find_agent_by_url( + brand_json, + "https://buyer.brand1.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert entry is not None + + def test_walks_house_agents_after_brands(self) -> None: + brand_json: dict[str, Any] = { + "house": { + "agents": [ + { + "type": "buying", + "url": "https://buyer.example.com/mcp", + "jwks_uri": "https://buyer.example.com/.well-known/jwks.json", + } + ] + }, + "brands": [{"id": "other", "agents": []}], + } + entry, _ = _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://brand.example.com/.well-known/brand.json", + ) + assert entry is not None + + def test_fallback_well_known_jwks_when_no_jwks_uri(self) -> None: + brand_json: dict[str, Any] = { + "agents": [ + {"type": "buying", "url": "https://buyer.example.com/mcp"}, + ] + } + _, jwks_uri = _find_agent_by_url( + brand_json, + "https://buyer.example.com/mcp", + "https://buyer.example.com/.well-known/brand.json", + ) + assert jwks_uri == "https://buyer.example.com/.well-known/jwks.json" + + +# --------------------------------------------------------------------------- +# async_resolve_agent +# --------------------------------------------------------------------------- + + +class TestAsyncResolveAgent: + @pytest.mark.asyncio + async def test_happy_path(self) -> None: + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://buyer.example.com/.well-known/brand.json" + jwks_uri = "https://buyer.example.com/.well-known/jwks.json" + brand_json = _make_brand_json(agent_url, jwks_uri) + jwks = _make_jwks() + caps = _make_caps_http_response(brand_json_url) + + from adcp.signing.brand_jwks import _FetchedBrandJson + + with ( + patch( + "adcp.signing.agent_resolver._fetch_brand_json", + return_value=_FetchedBrandJson( + status="ok", + final_url=brand_json_url, + data=brand_json, + etag=None, + cache_control="max-age=3600", + ), + ), + patch("adcp.signing.agent_resolver.async_default_jwks_fetcher", return_value=jwks), + ): + result = await async_resolve_agent( + agent_url, _client_factory=_caps_client_factory(caps) + ) + + assert isinstance(result, AgentResolution) + assert result.agent_url == agent_url + assert result.brand_json_url == brand_json_url + assert result.jwks_uri == jwks_uri + assert result.jwks == jwks + assert isinstance(result.freshness, AgentResolutionFreshness) + assert len(result.trace) == 3 + assert result.trace[0].label == "capabilities" + assert result.trace[0].status == 200 + assert result.trace[1].label == "brand_json" + assert result.trace[2].label == "jwks" + + @pytest.mark.asyncio + async def test_missing_brand_json_url(self) -> None: + agent_url = "https://buyer.example.com/mcp" + caps = _make_caps_http_response(None) + + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_caps_client_factory(caps)) + assert exc_info.value.code == "brand_json_url_missing" + + @pytest.mark.asyncio + async def test_capabilities_http_error_status(self) -> None: + agent_url = "https://buyer.example.com/mcp" + + @asynccontextmanager # type: ignore[arg-type] + async def _factory(url: str): # type: ignore[no-untyped-def] + client = AsyncMock() + + async def _post(*a: Any, **kw: Any) -> httpx.Response: + return httpx.Response(500, json={"error": "oops"}) + + client.post = _post + yield client + + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_factory) + assert exc_info.value.code == "capability_fetch_failed" + assert "500" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_capabilities_network_error(self) -> None: + agent_url = "https://buyer.example.com/mcp" + + @asynccontextmanager # type: ignore[arg-type] + async def _factory(url: str): # type: ignore[no-untyped-def] + client = AsyncMock() + + async def _post(*a: Any, **kw: Any) -> httpx.Response: + raise httpx.ConnectError("connection refused") + + client.post = _post + yield client + + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_factory) + assert exc_info.value.code == "capability_fetch_failed" + + @pytest.mark.asyncio + async def test_capabilities_body_cap_exceeded(self) -> None: + agent_url = "https://buyer.example.com/mcp" + + @asynccontextmanager # type: ignore[arg-type] + async def _factory(url: str): # type: ignore[no-untyped-def] + client = AsyncMock() + + async def _post(*a: Any, **kw: Any) -> httpx.Response: + big = b"x" * 70000 + return httpx.Response(200, content=big) + + client.post = _post + yield client + + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, capabilities_body_cap=64 * 1024, _client_factory=_factory) + assert exc_info.value.code == "capability_fetch_failed" + + @pytest.mark.asyncio + async def test_agent_not_in_brand_json(self) -> None: + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://brand.example.com/.well-known/brand.json" + caps = _make_caps_http_response(brand_json_url) + brand_json_no_match: dict[str, Any] = { + "agents": [ + { + "type": "buying", + "url": "https://other.com/mcp", + "jwks_uri": "https://other.com/.well-known/jwks.json", + } + ] + } + + from adcp.signing.brand_jwks import _FetchedBrandJson + + with patch( + "adcp.signing.agent_resolver._fetch_brand_json", + return_value=_FetchedBrandJson( + status="ok", + final_url=brand_json_url, + data=brand_json_no_match, + etag=None, + cache_control=None, + ), + ): + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_caps_client_factory(caps)) + assert exc_info.value.code == "brand_json_agent_not_found" + + @pytest.mark.asyncio + async def test_brand_json_fetch_error_wrapped(self) -> None: + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://brand.example.com/.well-known/brand.json" + caps = _make_caps_http_response(brand_json_url) + + from adcp.signing.brand_jwks import BrandJsonResolverError + + with patch( + "adcp.signing.agent_resolver._fetch_brand_json", + side_effect=BrandJsonResolverError("fetch_failed", "network error"), + ): + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_caps_client_factory(caps)) + assert exc_info.value.code == "brand_json_fetch_failed" + + @pytest.mark.asyncio + async def test_freshness_captures_cache_control(self) -> None: + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://buyer.example.com/.well-known/brand.json" + jwks_uri = "https://buyer.example.com/.well-known/jwks.json" + brand_json = _make_brand_json(agent_url, jwks_uri) + caps = _make_caps_http_response(brand_json_url) + + from adcp.signing.brand_jwks import _FetchedBrandJson + + with ( + patch( + "adcp.signing.agent_resolver._fetch_brand_json", + return_value=_FetchedBrandJson( + status="ok", + final_url=brand_json_url, + data=brand_json, + etag=None, + cache_control="max-age=1800", + ), + ), + patch( + "adcp.signing.agent_resolver.async_default_jwks_fetcher", + return_value=_make_jwks(), + ), + ): + result = await async_resolve_agent( + agent_url, _client_factory=_caps_client_factory(caps) + ) + + assert result.freshness.cache_control == "max-age=1800" + assert result.freshness.fetched_at <= time.time() + + +# --------------------------------------------------------------------------- +# resolve_agent (sync wrapper) +# --------------------------------------------------------------------------- + + +class TestResolveAgent: + def test_sync_wrapper_delegates_to_async(self, monkeypatch: pytest.MonkeyPatch) -> None: + """resolve_agent calls asyncio.run(async_resolve_agent(...)).""" + expected = AgentResolution( + agent_url="https://buyer.example.com/mcp", + brand_json_url="https://buyer.example.com/.well-known/brand.json", + agent_entry={"type": "buying"}, + jwks_uri="https://buyer.example.com/.well-known/jwks.json", + jwks={"keys": []}, + freshness=AgentResolutionFreshness(fetched_at=0.0), + ) + + async def _fake(url: str, **kwargs: Any) -> AgentResolution: + return expected + + monkeypatch.setattr("adcp.signing.agent_resolver.async_resolve_agent", _fake) + result = resolve_agent("https://buyer.example.com/mcp") + assert result is expected + + +# --------------------------------------------------------------------------- +# AgentResolverError attributes +# --------------------------------------------------------------------------- + + +class TestAgentResolverError: + def test_code_attribute(self) -> None: + err = AgentResolverError("ssrf_blocked", "private IP") + assert err.code == "ssrf_blocked" + assert err.detail == "private IP" + assert str(err) == "private IP" + + def test_is_exception(self) -> None: + err = AgentResolverError("invalid_url", "bad url") + assert isinstance(err, Exception) From 28bdd2e9cbbc3bf639975b5f5cb21f7a341cca04 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 00:53:18 +0000 Subject: [PATCH 2/2] fix(signing): address pre-PR security and code review findings Security blockers: - Add _validate_brand_json_origin: brand_json_url must be same-origin or parent-domain of agent_url; prevents a compromised agent from redirecting hop-2 key discovery to an attacker-controlled public host - Replace inline JWKS fallback in _find_agent_by_url with _default_jwks_uri from brand_jwks, which enforces agent/brand.json origin parity (prevents cross-origin trust pivot when jwks_uri is absent) - Add brand_json_origin_mismatch error code to AgentResolverErrorCode Code review findings: - Remove --fresh CLI flag (documented but silently ignored, no caching layer) - Add _client_factory seam to verify_from_agent_url so it is testable - Fix _find_agent_by_url docstring (no top-level agents[] fallback when house key present) - Add tests: origin mismatch rejected, parent domain accepted, verify_from_agent_url https://claude.ai/code/session_01KkMQ5QS2Mhpyv2nfvx1btN --- src/adcp/__main__.py | 3 +- src/adcp/signing/agent_resolver.py | 78 ++++++++++++++++--- tests/test_agent_resolver.py | 117 +++++++++++++++++++++++++++-- 3 files changed, 182 insertions(+), 16 deletions(-) diff --git a/src/adcp/__main__.py b/src/adcp/__main__.py index 1ed2b9ff5..90185293e 100644 --- a/src/adcp/__main__.py +++ b/src/adcp/__main__.py @@ -548,8 +548,7 @@ def main() -> None: parser.add_argument( "--resolve", metavar="AGENT_URL", help="Resolve agent URL to signing keys" ) - parser.add_argument("--fresh", action="store_true", help="Bypass cache when resolving") - parser.add_argument("--quiet", action="store_true", help="Suppress non-essential output") + parser.add_argument("--quiet", action="store_true", help="Suppress trace output from --resolve") # Execution options parser.add_argument("--protocol", choices=["mcp", "a2a"], help="Force protocol type") diff --git a/src/adcp/signing/agent_resolver.py b/src/adcp/signing/agent_resolver.py index 0bbe3b949..836de6fdf 100644 --- a/src/adcp/signing/agent_resolver.py +++ b/src/adcp/signing/agent_resolver.py @@ -18,7 +18,7 @@ import httpx from pydantic import BaseModel, Field -from adcp.signing.brand_jwks import BrandJsonResolverError, _fetch_brand_json +from adcp.signing.brand_jwks import BrandJsonResolverError, _default_jwks_uri, _fetch_brand_json from adcp.signing.capability_priming import _unwrap_response from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport from adcp.signing.jwks import SSRFValidationError, async_default_jwks_fetcher @@ -42,6 +42,7 @@ "invalid_url", "capability_fetch_failed", "brand_json_url_missing", + "brand_json_origin_mismatch", "brand_json_fetch_failed", "brand_json_agent_not_found", "jwks_fetch_failed", @@ -147,6 +148,11 @@ async def async_resolve_agent( "get_adcp_capabilities response has no identity.brand_json_url", ) + # Domain-binding guard: brand_json_url must be same-origin or parent-domain + # of agent_url so a compromised agent cannot redirect key discovery to an + # attacker-controlled public host (SSRF validation only blocks private IPs). + _validate_brand_json_origin(brand_json_url, agent_url) + # Step 3: fetch brand.json and locate the agent entry try: fetched = await _fetch_brand_json( @@ -216,6 +222,7 @@ async def verify_from_agent_url( body: bytes, agent_url: str, options: VerifyOptions, + _client_factory: _ClientFactory | None = None, ) -> VerifiedSigner: """Resolve agent keys then verify the request signature. @@ -226,11 +233,14 @@ async def verify_from_agent_url( Raises AgentResolverError on resolution failure, SignatureVerificationError on signature failure. + + ``_client_factory`` is a test seam forwarded to async_resolve_agent; + leave None in production. """ from adcp.signing.jwks import StaticJwksResolver from adcp.signing.verifier import verify_request_signature - resolution = await async_resolve_agent(agent_url) + resolution = await async_resolve_agent(agent_url, _client_factory=_client_factory) jwks_resolver = StaticJwksResolver(resolution.jwks) pinned = _replace(options, jwks_resolver=jwks_resolver) return verify_request_signature( @@ -243,6 +253,53 @@ async def verify_from_agent_url( # --------------------------------------------------------------------------- +def _validate_brand_json_origin(brand_json_url: str, agent_url: str) -> None: + """Reject brand_json_url values not same-origin or parent-domain of agent_url. + + A compromised agent can advertise any brand_json_url; without this guard + it could redirect key discovery to an attacker-controlled public host. + SSRF validation on hop 2 blocks private IPs but not public attacker domains. + + Accepted relationships (brand_host → agent_host examples): + example.com → buyer.example.com (agent is a subdomain of brand domain) + buyer.example.com → buyer.example.com (exact match) + + Rejected (cross-domain trust pivot): + evil.com → buyer.example.com + + Cross-subdomain cases (brand.example.com for buyer.example.com) require + the [identity] tldextract extra for eTLD+1 comparison; those are gated + on Tier-3 work tracked in adcp#3690. + """ + try: + brand_parts = urlsplit(brand_json_url) + agent_parts = urlsplit(agent_url) + except ValueError as exc: + raise AgentResolverError("invalid_url", f"invalid URL in origin check: {exc}") from exc + + if brand_parts.scheme != "https": + raise AgentResolverError( + "brand_json_origin_mismatch", + f"brand_json_url must use HTTPS (got scheme {brand_parts.scheme!r})", + ) + + brand_host = (brand_parts.hostname or "").lower() + agent_host = (agent_parts.hostname or "").lower() + + if not brand_host: + raise AgentResolverError("brand_json_origin_mismatch", "brand_json_url has no host") + + if brand_host == agent_host or agent_host.endswith("." + brand_host): + return + + raise AgentResolverError( + "brand_json_origin_mismatch", + f"brand_json_url host ({brand_host!r}) must be the same as or a parent " + f"domain of agent_url host ({agent_host!r}); use the [identity] extra " + "for cross-subdomain brand.json (e.g. brand.acme.com for buyer.acme.com)", + ) + + async def _fetch_capabilities( agent_url: str, *, @@ -352,12 +409,13 @@ def _norm_url(raw: str) -> str: def _find_agent_by_url( data: dict[str, Any], agent_url: str, - final_brand_url: str, # noqa: ARG001 — reserved for future origin-check + final_brand_url: str, ) -> tuple[dict[str, Any], str]: """Find the brand.json agent entry whose url matches agent_url. Walks the same structure as _select_agent in brand_jwks: portfolio - brands[] first, then house.agents[], then top-level agents[]. + brands[] first, then house.agents[]. Top-level agents[] is used when + there is no ``house`` key (flat brand.json). """ target = _norm_url(agent_url) @@ -375,12 +433,14 @@ def _search(agents: Any) -> tuple[dict[str, Any], str] | None: jwks_uri_raw = entry.get("jwks_uri") if isinstance(jwks_uri_raw, str): return entry, jwks_uri_raw - # Fallback: agent-origin well-known per spec default + # Fallback: spec default /.well-known/jwks.json on the agent origin. + # _default_jwks_uri enforces that agent.url and brand.json share an + # origin, preventing a malicious brand.json from directing JWKS fetch + # to an attacker-controlled host (cross-origin trust pivot). try: - p = urlsplit(url) - return entry, f"{p.scheme}://{p.netloc}/.well-known/jwks.json" - except ValueError: - continue + return entry, _default_jwks_uri(url, final_brand_url) + except BrandJsonResolverError as exc: + raise AgentResolverError("brand_json_agent_not_found", str(exc)) from exc return None house = data.get("house") diff --git a/tests/test_agent_resolver.py b/tests/test_agent_resolver.py index 3b4e2ac65..386207e28 100644 --- a/tests/test_agent_resolver.py +++ b/tests/test_agent_resolver.py @@ -12,7 +12,6 @@ from adcp.signing.agent_resolver import ( AgentResolution, AgentResolutionFreshness, - AgentResolutionHop, AgentResolverError, _find_agent_by_url, _norm_url, @@ -20,7 +19,6 @@ resolve_agent, ) - # --------------------------------------------------------------------------- # Test helpers # --------------------------------------------------------------------------- @@ -342,13 +340,15 @@ async def _post(*a: Any, **kw: Any) -> httpx.Response: yield client with pytest.raises(AgentResolverError) as exc_info: - await async_resolve_agent(agent_url, capabilities_body_cap=64 * 1024, _client_factory=_factory) + await async_resolve_agent( + agent_url, capabilities_body_cap=64 * 1024, _client_factory=_factory + ) assert exc_info.value.code == "capability_fetch_failed" @pytest.mark.asyncio async def test_agent_not_in_brand_json(self) -> None: agent_url = "https://buyer.example.com/mcp" - brand_json_url = "https://brand.example.com/.well-known/brand.json" + brand_json_url = "https://buyer.example.com/.well-known/brand.json" caps = _make_caps_http_response(brand_json_url) brand_json_no_match: dict[str, Any] = { "agents": [ @@ -379,7 +379,7 @@ async def test_agent_not_in_brand_json(self) -> None: @pytest.mark.asyncio async def test_brand_json_fetch_error_wrapped(self) -> None: agent_url = "https://buyer.example.com/mcp" - brand_json_url = "https://brand.example.com/.well-known/brand.json" + brand_json_url = "https://buyer.example.com/.well-known/brand.json" caps = _make_caps_http_response(brand_json_url) from adcp.signing.brand_jwks import BrandJsonResolverError @@ -425,6 +425,113 @@ async def test_freshness_captures_cache_control(self) -> None: assert result.freshness.cache_control == "max-age=1800" assert result.freshness.fetched_at <= time.time() + @pytest.mark.asyncio + async def test_brand_json_origin_mismatch_rejected(self) -> None: + """brand_json_url on a different domain from agent_url is rejected before hop 2.""" + agent_url = "https://buyer.example.com/mcp" + # evil.com is not same-origin or parent-domain of buyer.example.com + bad_brand_json_url = "https://evil.com/.well-known/brand.json" + caps = _make_caps_http_response(bad_brand_json_url) + + with pytest.raises(AgentResolverError) as exc_info: + await async_resolve_agent(agent_url, _client_factory=_caps_client_factory(caps)) + assert exc_info.value.code == "brand_json_origin_mismatch" + + @pytest.mark.asyncio + async def test_brand_json_parent_domain_accepted(self) -> None: + """brand_json_url on a parent domain of agent_url is accepted.""" + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://example.com/.well-known/brand.json" + jwks_uri = "https://buyer.example.com/.well-known/jwks.json" + brand_json = _make_brand_json(agent_url, jwks_uri) + caps = _make_caps_http_response(brand_json_url) + + from adcp.signing.brand_jwks import _FetchedBrandJson + + with ( + patch( + "adcp.signing.agent_resolver._fetch_brand_json", + return_value=_FetchedBrandJson( + status="ok", + final_url=brand_json_url, + data=brand_json, + etag=None, + cache_control=None, + ), + ), + patch( + "adcp.signing.agent_resolver.async_default_jwks_fetcher", + return_value=_make_jwks(), + ), + ): + result = await async_resolve_agent( + agent_url, _client_factory=_caps_client_factory(caps) + ) + + assert result.brand_json_url == brand_json_url + + @pytest.mark.asyncio + async def test_verify_from_agent_url(self) -> None: + """verify_from_agent_url resolves keys then delegates to verify_request_signature.""" + from unittest.mock import MagicMock + + from adcp.signing.agent_resolver import verify_from_agent_url + from adcp.signing.verifier import VerifiedSigner, VerifyOptions + + agent_url = "https://buyer.example.com/mcp" + brand_json_url = "https://buyer.example.com/.well-known/brand.json" + jwks_uri = "https://buyer.example.com/.well-known/jwks.json" + brand_json = _make_brand_json(agent_url, jwks_uri) + jwks = _make_jwks() + caps = _make_caps_http_response(brand_json_url) + + fake_signer = MagicMock(spec=VerifiedSigner) + + from adcp.signing.brand_jwks import _FetchedBrandJson + from adcp.signing.verifier import VerifierCapability + + base_options = VerifyOptions( + now=0.0, + capability=VerifierCapability(), + operation="test_op", + jwks_resolver=MagicMock(), + ) + + with ( + patch( + "adcp.signing.agent_resolver._fetch_brand_json", + return_value=_FetchedBrandJson( + status="ok", + final_url=brand_json_url, + data=brand_json, + etag=None, + cache_control=None, + ), + ), + patch("adcp.signing.agent_resolver.async_default_jwks_fetcher", return_value=jwks), + patch( + "adcp.signing.verifier.verify_request_signature", + return_value=fake_signer, + ) as mock_verify, + ): + options = base_options + result = await verify_from_agent_url( + method="POST", + url="https://seller.example.com/api", + headers={"signature": "sig1=:abc:"}, + body=b"{}", + agent_url=agent_url, + options=options, + _client_factory=_caps_client_factory(caps), + ) + + assert result is fake_signer + # jwks_resolver in the call must be a StaticJwksResolver, not the original mock + call_options = mock_verify.call_args.kwargs["options"] + from adcp.signing.jwks import StaticJwksResolver + + assert isinstance(call_options.jwks_resolver, StaticJwksResolver) + # --------------------------------------------------------------------------- # resolve_agent (sync wrapper)