diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 1554ab956..90d8dc72c 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -39,6 +39,7 @@ _build_request_context, _invoke_platform_method, ) +from adcp.decisioning.webhook_emit import maybe_emit_sync_completion from adcp.server.base import ADCPHandler, ToolContext if TYPE_CHECKING: @@ -70,6 +71,7 @@ UpdateMediaBuyRequest, UpdateMediaBuySuccessResponse, ) + from adcp.webhook_sender import WebhookSender # --------------------------------------------------------------------------- @@ -141,6 +143,8 @@ def __init__( registry: TaskRegistry, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, ) -> None: super().__init__() self._platform = platform @@ -148,6 +152,8 @@ def __init__( self._registry = registry self._state_reader = state_reader self._resource_resolver = resource_resolver + self._webhook_sender = webhook_sender + self._auto_emit_completion_webhooks = auto_emit_completion_webhooks # ----- account resolution helper ----- @@ -211,6 +217,43 @@ def _extract_auth_info(ctx: ToolContext) -> AuthInfo | None: ) return None + def _maybe_auto_emit_sync_completion( + self, + method_name: str, + params: Any, + result: Any, + ) -> None: + """Fire the F12 sync-completion webhook if applicable. + + Skips TaskHandoff projections — those go through the registry + completion path which emits its own webhook on terminal state. + The auto-emit fires on the sync-success arm only, mirroring the + JS-side ``routeIfHandoff`` logic at + ``src/lib/server/decisioning/runtime/from-platform.ts``. + + TaskHandoff projection returns the exact 2-key dict ``{"task_id": + ..., "status": "submitted"}`` from ``_project_handoff``; we + match the full key set rather than the loose ``status == + "submitted"`` predicate so an adopter who legitimately returns a + sync ``{"status": "submitted", ...}`` (e.g., synchronous queue + acceptance with extra metadata) still gets the auto-emit. + """ + if ( + isinstance(result, dict) + and set(result.keys()) == {"task_id", "status"} + and result.get("status") == "submitted" + ): + # TaskHandoff projection — registry completion path emits + # its own webhook on terminal state. + return + maybe_emit_sync_completion( + sender=self._webhook_sender, + enabled=self._auto_emit_completion_webhooks, + method_name=method_name, + params=params, + result=result, + ) + def _build_ctx( self, tool_ctx: ToolContext, @@ -260,17 +303,16 @@ async def create_media_buy( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "CreateMediaBuySuccessResponse", - await _invoke_platform_method( - self._platform, - "create_media_buy", - params, - ctx, - executor=self._executor, - registry=self._registry, - ), + result = await _invoke_platform_method( + self._platform, + "create_media_buy", + params, + ctx, + executor=self._executor, + registry=self._registry, ) + self._maybe_auto_emit_sync_completion("create_media_buy", params, result) + return cast("CreateMediaBuySuccessResponse", result) async def update_media_buy( # type: ignore[override] self, @@ -285,18 +327,17 @@ async def update_media_buy( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "UpdateMediaBuySuccessResponse", - await _invoke_platform_method( - self._platform, - "update_media_buy", - params, - ctx, - executor=self._executor, - registry=self._registry, - arg_projector={"media_buy_id": params.media_buy_id, "patch": params}, - ), + result = await _invoke_platform_method( + self._platform, + "update_media_buy", + params, + ctx, + executor=self._executor, + registry=self._registry, + arg_projector={"media_buy_id": params.media_buy_id, "patch": params}, ) + self._maybe_auto_emit_sync_completion("update_media_buy", params, result) + return cast("UpdateMediaBuySuccessResponse", result) async def sync_creatives( # type: ignore[override] self, @@ -306,17 +347,16 @@ async def sync_creatives( # type: ignore[override] tool_ctx = context or ToolContext() account = await self._resolve_account(params.account, tool_ctx) ctx = self._build_ctx(tool_ctx, account) - return cast( - "SyncCreativesSuccessResponse", - await _invoke_platform_method( - self._platform, - "sync_creatives", - params, - ctx, - executor=self._executor, - registry=self._registry, - ), + result = await _invoke_platform_method( + self._platform, + "sync_creatives", + params, + ctx, + executor=self._executor, + registry=self._registry, ) + self._maybe_auto_emit_sync_completion("sync_creatives", params, result) + return cast("SyncCreativesSuccessResponse", result) async def get_media_buy_delivery( # type: ignore[override] self, diff --git a/src/adcp/decisioning/serve.py b/src/adcp/decisioning/serve.py index ddf998085..fb86afb4e 100644 --- a/src/adcp/decisioning/serve.py +++ b/src/adcp/decisioning/serve.py @@ -43,6 +43,7 @@ from adcp.decisioning.resolve import ResourceResolver from adcp.decisioning.state import StateReader from adcp.decisioning.task_registry import TaskRegistry + from adcp.webhook_sender import WebhookSender def _is_production_env() -> bool: @@ -75,6 +76,8 @@ def create_adcp_server_from_platform( registry: TaskRegistry | None = None, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, ) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]: """Build the :class:`PlatformHandler` + supporting wiring from a :class:`DecisioningPlatform`. @@ -117,6 +120,24 @@ def create_adcp_server_from_platform( (D15 — async framework-mediated fetches). Default is the v6.0 stub (raises ``NotImplementedError`` with a pointer to v6.1). + :param webhook_sender: Bring-your-own + :class:`adcp.webhook_sender.WebhookSender` for sync-completion + and HITL-completion webhook delivery. Default ``None`` — when + unset, sync-completion auto-emit is a silent no-op (no URL to + deliver to, framework can't synthesize a sender). Adopters + wiring webhook delivery pass a configured sender (with their + signing key, IP-pinned transport, etc.). + :param auto_emit_completion_webhooks: F12 feature gate. When + ``True`` (default), the framework auto-fires a completion + webhook on the sync-success arm of mutating tools whenever the + request supplied ``push_notification_config.url`` AND the tool + is in :data:`adcp.decisioning.webhook_emit.SPEC_WEBHOOK_TASK_TYPES`. + Buyers passing the URL expect notification regardless of + whether the seller routed sync vs HITL. Set ``False`` for + adopters who emit webhooks manually inside their handlers + (avoid duplicate delivery; idempotency-key dedup at the + receiver would handle it but explicit suppression matches the + v5 manual-emit posture for adopters mid-migration). :raises ValueError: when ``executor`` and ``thread_pool_size`` are both supplied (D5 mutually-exclusive validation). @@ -213,6 +234,8 @@ def create_adcp_server_from_platform( registry=registry, state_reader=state_reader, resource_resolver=resource_resolver, + webhook_sender=webhook_sender, + auto_emit_completion_webhooks=auto_emit_completion_webhooks, ) return handler, executor, registry @@ -226,6 +249,8 @@ def serve( registry: TaskRegistry | None = None, state_reader: StateReader | None = None, resource_resolver: ResourceResolver | None = None, + webhook_sender: WebhookSender | None = None, + auto_emit_completion_webhooks: bool = True, advertise_all: bool = False, **serve_kwargs: Any, ) -> None: @@ -246,6 +271,14 @@ def serve( :class:`InMemoryTaskRegistry` (gated for production). :param state_reader: Custom :class:`StateReader` impl (D15). :param resource_resolver: Custom :class:`ResourceResolver` impl (D15). + :param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender` + for completion webhook delivery (sync auto-emit + HITL terminal). + ``None`` disables auto-emit silently. + :param auto_emit_completion_webhooks: F12 — auto-fire a completion + webhook on the sync-success arm of mutating tools when the + request supplied ``push_notification_config.url``. Default + ``True``. Set ``False`` for adopters who emit webhooks + manually inside their handlers. :param advertise_all: Forwarded to :func:`adcp.server.serve`. When ``True``, ``tools/list`` advertises every method on the handler regardless of override status. Default ``False`` — @@ -267,6 +300,8 @@ def serve( registry=registry, state_reader=state_reader, resource_resolver=resource_resolver, + webhook_sender=webhook_sender, + auto_emit_completion_webhooks=auto_emit_completion_webhooks, ) server_name = name or type(platform).__name__ diff --git a/src/adcp/decisioning/webhook_emit.py b/src/adcp/decisioning/webhook_emit.py new file mode 100644 index 000000000..3cedda76f --- /dev/null +++ b/src/adcp/decisioning/webhook_emit.py @@ -0,0 +1,256 @@ +"""Auto-emit completion webhook on sync-success arm of mutating tools. + +When a buyer supplies ``push_notification_config.url`` on a request and +the seller answers via the sync fast path (NOT a :class:`TaskHandoff`), +the framework fires a completion webhook to that URL after the response +so buyers get consistent notification regardless of how the seller +routed the call. Without this, a buyer registering a webhook URL would +get notifications only on the HITL path — sync responses would leave +them polling. + +Mirrors the JS-side ``emitSyncCompletionWebhook`` at +``src/lib/server/decisioning/runtime/from-platform.ts`` (commits +``8dc427f9`` and ``7a887dfa``). Wire-format is identical: same +``task_type``, ``status: 'completed'``, ``result`` field carrying the +projected sync response, and an echoed ``token`` if the buyer +registered one. ``task_id`` is synthesized as ``f"sync-{uuid4()}"`` +since sync responses don't allocate a registry task; buyers correlate +via the resource ids embedded in ``result``. + +**Fire-and-forget.** Webhook delivery runs in a background asyncio +task; the sync response returns inline immediately. A buyer-supplied +slowloris webhook URL must not be able to hold the seller's request +worker for the full retry budget — the JS round-2 fix (``7a887dfa``) +addressed this DoS vector and Python preserves the same posture. +``_BACKGROUND_WEBHOOK_TASKS`` strong-refs in-flight emissions so the +asyncio loop's weak-ref behavior doesn't garbage-collect them +mid-flight. + +**Spec gate.** Only tools in :data:`SPEC_WEBHOOK_TASK_TYPES` (the +closed 20-value enum from ``schemas/cache/enums/task-type.json``) +emit. Spec-validating webhook receivers reject envelopes with +non-spec ``task_type`` values; tools the framework dispatches that +aren't in the spec enum (adopter-only specialism methods) skip +delivery and rely on ``publishStatusChange`` for state updates. + +Adopters who emit webhooks manually inside their handlers pass +``auto_emit_completion_webhooks=False`` to +:func:`adcp.decisioning.serve` to avoid duplicate delivery. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from adcp.webhook_sender import WebhookSender + +logger = logging.getLogger(__name__) + + +#: Tools eligible for sync-completion webhook auto-emit. Mirrors the +#: closed enum in ``schemas/cache/enums/task-type.json`` verbatim. The +#: framework dispatches a wider tool surface than this set; the JS side +#: maintains the same set at +#: ``src/lib/server/decisioning/runtime/protocol-for-tool.ts``. +#: +#: Drift policy: bump this constant AND the JS +#: ``SPEC_WEBHOOK_TASK_TYPES`` in lockstep when the spec enum widens. +#: A unit test pins this to the on-disk enum so out-of-band drift +#: surfaces in CI. +SPEC_WEBHOOK_TASK_TYPES: frozenset[str] = frozenset( + { + "create_media_buy", + "update_media_buy", + "sync_creatives", + "activate_signal", + "get_signals", + "create_property_list", + "update_property_list", + "get_property_list", + "list_property_lists", + "delete_property_list", + "sync_accounts", + "get_account_financials", + "get_creative_delivery", + "sync_event_sources", + "sync_audiences", + "sync_catalogs", + "log_event", + "get_brand_identity", + "get_rights", + "acquire_rights", + } +) + + +#: Strong-ref the in-flight auto-emit tasks so the asyncio loop's +#: weak-ref behavior doesn't garbage-collect them mid-flight. +#: Module-level so the set survives across requests; framework-internal, +#: never exported. Mirrors ``_BACKGROUND_HANDOFF_TASKS`` in +#: ``dispatch.py``. +_BACKGROUND_WEBHOOK_TASKS: set[asyncio.Task[None]] = set() + + +def _extract_push_notification_url_and_token( + params: Any, +) -> tuple[str, str | None] | None: + """Pull ``(url, token)`` from ``params.push_notification_config``. + + Returns ``None`` when the request didn't carry the field, the field + is None, or the URL is empty. Tolerates both Pydantic models and + plain dicts on ``params`` since handler shims and test fixtures + both call in. The URL is unwrapped via ``str()`` so the webhook + sender sees a plain string (Pydantic AnyUrl stringifies to canonical + form). + """ + config = getattr(params, "push_notification_config", None) + if config is None and isinstance(params, dict): + config = params.get("push_notification_config") + if config is None: + return None + url = getattr(config, "url", None) + if url is None and isinstance(config, dict): + url = config.get("url") + if not url: + return None + token = getattr(config, "token", None) + if token is None and isinstance(config, dict): + token = config.get("token") + return (str(url), token) + + +async def _emit_sync_completion_webhook( + *, + sender: WebhookSender, + url: str, + token: str | None, + method_name: str, + result: Any, +) -> None: + """Fire one sync-completion webhook. Logged-and-swallowed on failure. + + Wrapped by the caller in :func:`asyncio.create_task` so the sync + response returns to the buyer immediately. Truncated to 16 hex + chars (~64 bits) — adequate for buyer correlation. Buyers + correlate primarily via resource ids on ``result`` + (``media_buy_id``, ``creative_id``, etc.); ``task_id`` here is + informational for the spec's required-field shape. + """ + task_id = f"sync-{uuid.uuid4().hex[:16]}" + try: + await sender.send_mcp( + url=url, + task_id=task_id, + status="completed", + task_type=method_name, + result=result, + token=token, + ) + except Exception: + # Logged-and-swallowed: the sync response has already returned + # to the buyer with the result inline. + logger.warning( + "[adcp.decisioning] sync completion webhook for %s " + "task_id=%s failed; sync response already returned to buyer", + method_name, + task_id, + exc_info=True, + ) + + +def maybe_emit_sync_completion( + *, + sender: WebhookSender | None, + enabled: bool, + method_name: str, + params: Any, + result: Any, +) -> None: + """Fire-and-forget auto-emit gate. Called by handler shims after + the sync-success arm of mutating tools. + + Skips silently when: + + * ``enabled`` is False (operator opted out). + * ``sender`` is None (no emitter wired). + * The request didn't carry ``push_notification_config.url``. + * ``method_name`` isn't in :data:`SPEC_WEBHOOK_TASK_TYPES` (logged + as a warning so adopters notice they extended the tool surface + beyond the spec enum). + + Schedules the actual delivery via the running event loop's + ``create_task`` so the sync response path is non-blocking. + + **Exception isolation.** The gate runs AFTER the platform method's + successful return. ANY exception in here — extraction quirk on a + weird ``params`` shape, ``loop.create_task`` failure — must NOT + propagate to the handler shim, which would lose the buyer's sync + response. The whole body is wrapped in ``try/except Exception``; + logged-and-swallowed. + """ + try: + if not enabled or sender is None: + return + extracted = _extract_push_notification_url_and_token(params) + if extracted is None: + return + url, token = extracted + if method_name not in SPEC_WEBHOOK_TASK_TYPES: + logger.warning( + "[adcp.decisioning] sync completion webhook for %s skipped — " + "tool not in spec task-type enum (closed 20-value set per " + "schemas/cache/enums/task-type.json). Use " + "publishStatusChange for long-running %s state.", + method_name, + method_name, + ) + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Production code that lands here is mis-wired (handler + # shim called outside an event loop); bump to warning so + # it's visible. Cost of one warning per misuse is + # negligible vs. the cost of silent webhook loss. + logger.warning( + "[adcp.decisioning] sync completion webhook for %s " + "skipped — no running event loop. The handler shim is " + "expected to run inside an asyncio task; this branch " + "fires when sync test code calls into the handler " + "outside ``asyncio.run`` or ``pytest.mark.asyncio``.", + method_name, + ) + return + bg = loop.create_task( + _emit_sync_completion_webhook( + sender=sender, + url=url, + token=token, + method_name=method_name, + result=result, + ), + name=f"adcp-sync-completion-{method_name}", + ) + _BACKGROUND_WEBHOOK_TASKS.add(bg) + bg.add_done_callback(_BACKGROUND_WEBHOOK_TASKS.discard) + except Exception: + # Last-line defense: an unexpected exception in the gate + # itself (extraction quirk, scheduler error) must never + # propagate to the handler shim, which has already produced + # a successful sync response for the buyer. + logger.warning( + "[adcp.decisioning] sync completion webhook gate raised " + "for %s; sync response unaffected", + method_name, + exc_info=True, + ) + + +__all__ = [ + "SPEC_WEBHOOK_TASK_TYPES", + "maybe_emit_sync_completion", +] diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index 9d4f0b6b8..77bac692c 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -300,6 +300,7 @@ async def send_mcp( context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, + token: str | None = None, extra_headers: Mapping[str, str] | None = None, ) -> WebhookDeliveryResult: """POST a signed MCP-style task-status webhook. @@ -309,6 +310,13 @@ async def send_mcp( the "same" args would produce a fresh ``timestamp`` and potentially a different serialized body, which the receiver would dedupe but with different observed payload data. + + :param token: Buyer-supplied token from + ``push_notification_config.token`` echoed back on the + payload's ``token`` field per spec + (``schemas/cache/core/push_notification_config.json``: "Echoed + back in webhook payload to validate request authenticity"). + Cross-language wire-parity with the JS implementation. """ payload = create_mcp_webhook_payload( task_id=task_id, @@ -321,6 +329,7 @@ async def send_mcp( context_id=context_id, domain=domain, idempotency_key=idempotency_key, + token=token, ) return await self.send_raw( url=url, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index cf7993406..47e71a81b 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -94,6 +94,7 @@ def create_mcp_webhook_payload( context_id: str | None = None, domain: str | None = None, idempotency_key: str | None = None, + token: str | None = None, ) -> dict[str, Any]: """ Create MCP webhook payload dictionary. @@ -191,6 +192,15 @@ def create_mcp_webhook_payload( if domain is not None: payload["domain"] = domain + if token is not None: + # Buyer-supplied token from push_notification_config.token, + # echoed back per push-notification-config.json spec text: + # "Echoed back in webhook payload to validate request authenticity." + # Cross-language wire-parity with the JS implementation + # (``buildTaskWebhookPayload`` in ``from-platform.ts``) — buyers + # validating against the spec read body.token, not headers. + payload["token"] = token + return payload diff --git a/tests/test_decisioning_webhook_emit.py b/tests/test_decisioning_webhook_emit.py new file mode 100644 index 000000000..8f5a04e17 --- /dev/null +++ b/tests/test_decisioning_webhook_emit.py @@ -0,0 +1,725 @@ +"""F12: auto-emit completion webhook on sync-success arm. + +Mirrors the JS test file +``test/server-decisioning-auto-emit-completion.test.js`` (commits +``8dc427f9`` + ``7a887dfa``) plus Python-specific concerns: + +* TaskHandoff projection path doesn't double-fire (registry completion + emits its own webhook on terminal state). +* Fire-and-forget non-blocking — sync response returns before webhook + delivery. +* Tools outside ``SPEC_WEBHOOK_TASK_TYPES`` skip with a warning. +* No-running-loop branch is silent (sync test paths). +* SPEC_WEBHOOK_TASK_TYPES drift-guard against the on-disk schema cache. +""" + +from __future__ import annotations + +import asyncio +import json +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock + +import pytest + +from adcp.decisioning import ( + DecisioningCapabilities, + DecisioningPlatform, + SingletonAccounts, +) +from adcp.decisioning.handler import PlatformHandler +from adcp.decisioning.task_registry import InMemoryTaskRegistry +from adcp.decisioning.webhook_emit import ( + _BACKGROUND_WEBHOOK_TASKS, + SPEC_WEBHOOK_TASK_TYPES, + _extract_push_notification_url_and_token, + maybe_emit_sync_completion, +) +from adcp.server.base import ToolContext +from adcp.types import ( + CreateMediaBuyRequest, + CreateMediaBuySuccessResponse, + SyncCreativesRequest, +) + +# ---- SPEC_WEBHOOK_TASK_TYPES drift-guard ---- + + +def test_spec_webhook_task_types_matches_schema_cache() -> None: + """Pin the constant to the on-disk spec enum. CI catches + out-of-band drift when the schema cache refreshes from upstream.""" + schema_path = Path(__file__).parent.parent / "schemas" / "cache" / "enums" / "task-type.json" + with schema_path.open() as f: + on_disk = frozenset(json.load(f)["enum"]) + assert SPEC_WEBHOOK_TASK_TYPES == on_disk, ( + f"SPEC_WEBHOOK_TASK_TYPES drifted from on-disk task-type enum. " + f"Missing from constant: {sorted(on_disk - SPEC_WEBHOOK_TASK_TYPES)}; " + f"extra in constant: {sorted(SPEC_WEBHOOK_TASK_TYPES - on_disk)}." + ) + + +# ---- _extract_push_notification_url_and_token ---- + + +def test_extract_returns_none_when_config_missing() -> None: + """No ``push_notification_config`` field → no auto-emit.""" + + class _Bare: + pass + + assert _extract_push_notification_url_and_token(_Bare()) is None + + +def test_extract_returns_none_when_config_is_none() -> None: + """Field present but ``None`` → no auto-emit.""" + + class _NullConfig: + push_notification_config = None + + assert _extract_push_notification_url_and_token(_NullConfig()) is None + + +def test_extract_returns_url_and_token_when_present() -> None: + """Field with URL + token → both pulled out.""" + + class _Config: + url = "https://buyer.example.com/webhooks" + token = "echo-back-this-token" + + class _Params: + push_notification_config = _Config() + + extracted = _extract_push_notification_url_and_token(_Params()) + assert extracted == ("https://buyer.example.com/webhooks", "echo-back-this-token") + + +def test_extract_returns_url_with_none_token() -> None: + """Field with URL only → token is None.""" + + class _Config: + url = "https://buyer.example.com/webhooks" + token = None + + class _Params: + push_notification_config = _Config() + + extracted = _extract_push_notification_url_and_token(_Params()) + assert extracted == ("https://buyer.example.com/webhooks", None) + + +def test_extract_handles_dict_params() -> None: + """Test fixtures using plain-dict params still work.""" + params = { + "push_notification_config": { + "url": "https://buyer.example.com/webhooks", + "token": "tok", + } + } + extracted = _extract_push_notification_url_and_token(params) + assert extracted == ("https://buyer.example.com/webhooks", "tok") + + +# ---- maybe_emit_sync_completion gate ---- + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_disabled() -> None: + """``enabled=False`` → no delivery, no background task.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=False, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_sender_none() -> None: + """``sender=None`` → silent skip (no emitter wired).""" + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + # Smoke — must not raise. + maybe_emit_sync_completion( + sender=None, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_when_no_push_url() -> None: + """Request without ``push_notification_config.url`` → no delivery.""" + sender = AsyncMock() + + class _Params: + push_notification_config = None + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_maybe_emit_skips_tool_outside_spec_enum(caplog) -> None: + """Tool not in ``SPEC_WEBHOOK_TASK_TYPES`` → skip + warn. + + Spec-validating receivers reject envelopes with non-spec + ``task_type`` values; the framework logs once per skip so adopters + notice they extended the tool surface beyond the spec enum.""" + import logging + + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="custom_adopter_tool", # Not in spec enum + params=_Params(), + result={"x": 1}, + ) + await asyncio.sleep(0) + sender.send_mcp.assert_not_called() + assert any("not in spec task-type enum" in rec.message for rec in caplog.records) + + +@pytest.mark.asyncio +async def test_maybe_emit_fires_when_url_set() -> None: + """Happy path — URL set + tool in enum + enabled → background + delivery via ``WebhookSender.send_mcp``.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + # Drain background task. + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["url"] == "https://buyer.example.com/wh" + assert call_kwargs["task_type"] == "create_media_buy" + assert call_kwargs["status"] == "completed" + assert call_kwargs["result"] == {"media_buy_id": "mb_1"} + assert call_kwargs["task_id"].startswith("sync-") + + +@pytest.mark.asyncio +async def test_maybe_emit_echoes_token_via_payload_field() -> None: + """Buyer-supplied ``push_notification_config.token`` round-trips + on the payload's ``token`` field per spec + (``schemas/cache/core/push_notification_config.json``: "Echoed + back in webhook payload to validate request authenticity"). + Cross-language wire-parity with the JS reference impl + (``buildTaskWebhookPayload`` in ``from-platform.ts``).""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = "echo-this-back-1234567890" + + class _Params: + push_notification_config = _Config() + + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + # Token is on the payload via the ``token`` kwarg, NOT on a + # custom header. Receivers reading body.token per spec find it. + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["token"] == "echo-this-back-1234567890" + + +@pytest.mark.asyncio +async def test_maybe_emit_swallows_delivery_failure(caplog) -> None: + """Webhook delivery failure must NOT propagate — sync response + has already returned to the buyer.""" + import logging + + sender = AsyncMock() + sender.send_mcp.side_effect = RuntimeError("receiver down") + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert any( + "sync completion webhook" in rec.message and "failed" in rec.message + for rec in caplog.records + ) + + +def test_maybe_emit_skips_silently_with_no_running_loop() -> None: + """Sync test paths that call the gate outside an event loop get a + silent skip — surfacing this would be strictly worse than the + quiet best-effort behavior.""" + sender = AsyncMock() + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + # No asyncio.run wrapping this — must not raise. + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + sender.send_mcp.assert_not_called() + + +# ---- PlatformHandler integration: sync-success fires, handoff doesn't ---- + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-f12-") + yield pool + pool.shutdown(wait=True) + + +def _make_request(*, with_url: bool = True, idem_suffix: str = "x") -> CreateMediaBuyRequest: + """Build a minimal CreateMediaBuyRequest with optional push config.""" + payload: dict[str, Any] = { + "account": {"account_id": "acct_a"}, + "brand": {"domain": "example.com"}, + "idempotency_key": f"idem_aaaa12345678{idem_suffix}", + "start_time": "2026-05-01T00:00:00Z", + "end_time": "2026-05-31T23:59:59Z", + } + if with_url: + payload["push_notification_config"] = { + "url": "https://buyer.example.com/wh", + "token": "echo-back-xxxxxxxxxxxxx", + } + return CreateMediaBuyRequest(**payload) + + +class _SyncSuccessPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + return CreateMediaBuySuccessResponse(media_buy_id="mb_1", packages=[], status="active") + + def update_media_buy(self, media_buy_id, patch, ctx): + return {"media_buy_id": media_buy_id, "status": "active"} + + def sync_creatives(self, req, ctx): + return {"creatives": []} + + def get_media_buy_delivery(self, req, ctx): + return {"deliveries": []} + + +class _HandoffPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + async def _review(task_ctx): + return CreateMediaBuySuccessResponse( + media_buy_id="mb_after_review", packages=[], status="active" + ) + + return ctx.handoff_to_task(_review) + + def update_media_buy(self, media_buy_id, patch, ctx): + return {"media_buy_id": media_buy_id, "status": "active"} + + def sync_creatives(self, req, ctx): + return {"creatives": []} + + def get_media_buy_delivery(self, req, ctx): + return {"deliveries": []} + + +@pytest.mark.asyncio +async def test_handler_fires_auto_emit_on_sync_success(executor) -> None: + """End-to-end: sync mutating tool with push URL → auto-emit fires.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "create_media_buy" + + +@pytest.mark.asyncio +async def test_handler_does_not_double_fire_on_handoff_path(executor) -> None: + """TaskHandoff projection returns the Submitted envelope; the + registry completion path emits its own webhook on terminal state. + The auto-emit MUST NOT fire on this arm — buyer would receive + duplicate webhooks.""" + sender = AsyncMock() + handler = PlatformHandler( + _HandoffPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + result = await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + # Drain any background tasks (handoff fn runs in background). + for _ in range(20): + await asyncio.sleep(0.05) + # The auto-emit must NOT have fired — handoff path is responsible + # for its own webhook. + sender.send_mcp.assert_not_called() + # Sanity check: result is the Submitted envelope. + assert isinstance(result, dict) + assert result["status"] == "submitted" + + +@pytest.mark.asyncio +async def test_handler_opt_out_suppresses_auto_emit(executor) -> None: + """``auto_emit_completion_webhooks=False`` → no delivery on sync + success, even with URL set. Adopter middleware emits manually.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=False, + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + await asyncio.sleep(0.05) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_handler_no_url_no_emit(executor) -> None: + """Request without ``push_notification_config`` → no auto-emit.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=False), ToolContext()) + await asyncio.sleep(0.05) + sender.send_mcp.assert_not_called() + + +@pytest.mark.asyncio +async def test_handler_default_is_enabled(executor) -> None: + """``auto_emit_completion_webhooks`` defaults to True — adopter + not setting the flag still gets webhook delivery.""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + # NOT passing auto_emit_completion_webhooks — testing default. + ) + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_handler_no_sender_no_emit(executor) -> None: + """No webhook_sender wired (the default for ``serve()``) → silent + skip. Adopters who don't want webhooks just don't pass one.""" + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=None, + auto_emit_completion_webhooks=True, + ) + # Smoke — must not raise. + await handler.create_media_buy(_make_request(with_url=True), ToolContext()) + + +@pytest.mark.asyncio +async def test_handler_sync_creatives_also_fires(executor) -> None: + """The auto-emit isn't create_media_buy-only — sync_creatives is + also a mutating tool in the spec enum and triggers identically. + + Uses ``model_construct`` to bypass creative-payload validation + (the F12 behavior is what's under test, not the request shape).""" + sender = AsyncMock() + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + from adcp.types import PushNotificationConfig + + req = SyncCreativesRequest.model_construct( + account={"account_id": "acct_a"}, # type: ignore[arg-type] + creatives=[], + idempotency_key="idem_aaaa1234567890", + push_notification_config=PushNotificationConfig.model_construct( + url="https://buyer.example.com/wh", + token="echo-back-xxxxxxxxxxxxx", + ), + ) + await handler.sync_creatives(req, ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["task_type"] == "sync_creatives" + + +# ---- Round-2 expert review: non-blocking + concurrency + adopter-loose-shape ---- + + +@pytest.mark.asyncio +async def test_handler_returns_before_webhook_delivers(executor) -> None: + """The PR's load-bearing invariant: sync response returns BEFORE + webhook delivery completes. A future refactor that awaits the + webhook before returning would be a documented DoS vector + (slowloris webhook receiver holds the seller's request worker). + Block ``send_mcp`` on an asyncio.Event and assert the handler's + ``create_media_buy`` returns first.""" + webhook_started = asyncio.Event() + webhook_can_finish = asyncio.Event() + + async def _slow_send_mcp(*args, **kwargs): + webhook_started.set() + await webhook_can_finish.wait() + + sender = AsyncMock() + sender.send_mcp.side_effect = _slow_send_mcp + + handler = PlatformHandler( + _SyncSuccessPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + + # Sync response returns even though the webhook is still blocked. + response = await handler.create_media_buy( + _make_request(with_url=True, idem_suffix="nb"), ToolContext() + ) + # Handler returned its sync result. + assert response.media_buy_id == "mb_1" + + # Background task started but is blocked. The handler already + # returned its sync response above; the webhook receiver is still + # holding the delivery, proving the response path is non-blocking. + await asyncio.wait_for(webhook_started.wait(), timeout=1.0) + assert len(_BACKGROUND_WEBHOOK_TASKS) >= 1 + + # Release the webhook receiver and let the background task drain. + webhook_can_finish.set() + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_concurrent_emissions_dont_corrupt_strong_ref_set(executor) -> None: + """100 concurrent ``maybe_emit_sync_completion`` calls — each + schedules a background task; ``_BACKGROUND_WEBHOOK_TASKS`` add / + discard pattern must remain consistent. A future regression + swapping ``set`` for a list, or using ``clear()`` instead of + ``discard``, would break this test.""" + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + sender = AsyncMock() + sender.send_mcp.return_value = None + + for _ in range(100): + maybe_emit_sync_completion( + sender=sender, + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + assert sender.send_mcp.await_count == 100 + # Set drained completely — done callbacks discarded each task. + assert len(_BACKGROUND_WEBHOOK_TASKS) == 0 + + +@pytest.mark.asyncio +async def test_handler_does_not_skip_loose_submitted_shape(executor) -> None: + """Round-2 expert review (P1): an adopter that legitimately returns + a sync ``{"status": "submitted", ...}`` (queue-acceptance with + extra metadata) must NOT have the auto-emit suppressed. The + framework's TaskHandoff projection is the EXACT 2-key shape + ``{"task_id", "status"}``; only that exact shape skips.""" + + class _LooseSubmittedPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="hello") + + def get_products(self, req, ctx): + return {"products": []} + + def create_media_buy(self, req, ctx): + # Adopter returns a dict with "status": "submitted" PLUS + # extra fields — NOT a TaskHandoff projection. + return { + "task_id": "mb_xyz", + "status": "submitted", + "media_buy_id": "mb_xyz", + "queued_at": "2026-04-30T23:00:00Z", + } + + def update_media_buy(self, media_buy_id, patch, ctx): + return {} + + def sync_creatives(self, req, ctx): + return {} + + def get_media_buy_delivery(self, req, ctx): + return {} + + sender = AsyncMock() + handler = PlatformHandler( + _LooseSubmittedPlatform(), + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + auto_emit_completion_webhooks=True, + ) + await handler.create_media_buy(_make_request(with_url=True, idem_suffix="ls"), ToolContext()) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + # Auto-emit MUST fire — the response had extra fields, so it's + # not a TaskHandoff projection. + sender.send_mcp.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_gate_swallows_unexpected_exceptions(caplog) -> None: + """Round-2 expert review (P0): the gate's body MUST never propagate + an exception to the handler shim. Test by passing a sender whose + method-resolution raises (simulating a broken duck-typed sender). + The handler returns successfully and the gate logs the failure.""" + import logging + + # Sender that raises on attribute access — simulates a misconfigured + # duck-typed object that passes the ``sender is None`` check but + # explodes inside ``send_mcp`` lookup. + class _ExplodingSender: + @property + def send_mcp(self): + raise RuntimeError("synthetic sender access failure") + + class _Config: + url = "https://buyer.example.com/wh" + token = None + + class _Params: + push_notification_config = _Config() + + with caplog.at_level(logging.WARNING, logger="adcp.decisioning.webhook_emit"): + # Must NOT raise — the gate's outer try/except swallows. + maybe_emit_sync_completion( + sender=_ExplodingSender(), # type: ignore[arg-type] + enabled=True, + method_name="create_media_buy", + params=_Params(), + result={"media_buy_id": "mb_1"}, + ) + while _BACKGROUND_WEBHOOK_TASKS: + await asyncio.sleep(0) + # The logged failure surfaces via the framework logger so + # operators see it without breaking the buyer's sync response. + assert any("sync completion webhook" in rec.message for rec in caplog.records)