Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 71 additions & 31 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
_build_request_context,
_invoke_platform_method,
)
from adcp.decisioning.webhook_emit import maybe_emit_sync_completion
from adcp.server.base import ADCPHandler, ToolContext

if TYPE_CHECKING:
Expand Down Expand Up @@ -70,6 +71,7 @@
UpdateMediaBuyRequest,
UpdateMediaBuySuccessResponse,
)
from adcp.webhook_sender import WebhookSender


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -141,13 +143,17 @@ def __init__(
registry: TaskRegistry,
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
auto_emit_completion_webhooks: bool = True,
) -> None:
super().__init__()
self._platform = platform
self._executor = executor
self._registry = registry
self._state_reader = state_reader
self._resource_resolver = resource_resolver
self._webhook_sender = webhook_sender
self._auto_emit_completion_webhooks = auto_emit_completion_webhooks

# ----- account resolution helper -----

Expand Down Expand Up @@ -211,6 +217,43 @@ def _extract_auth_info(ctx: ToolContext) -> AuthInfo | None:
)
return None

def _maybe_auto_emit_sync_completion(
self,
method_name: str,
params: Any,
result: Any,
) -> None:
"""Fire the F12 sync-completion webhook if applicable.

Skips TaskHandoff projections — those go through the registry
completion path which emits its own webhook on terminal state.
The auto-emit fires on the sync-success arm only, mirroring the
JS-side ``routeIfHandoff`` logic at
``src/lib/server/decisioning/runtime/from-platform.ts``.

TaskHandoff projection returns the exact 2-key dict ``{"task_id":
..., "status": "submitted"}`` from ``_project_handoff``; we
match the full key set rather than the loose ``status ==
"submitted"`` predicate so an adopter who legitimately returns a
sync ``{"status": "submitted", ...}`` (e.g., synchronous queue
acceptance with extra metadata) still gets the auto-emit.
"""
if (
isinstance(result, dict)
and set(result.keys()) == {"task_id", "status"}
and result.get("status") == "submitted"
):
# TaskHandoff projection — registry completion path emits
# its own webhook on terminal state.
return
maybe_emit_sync_completion(
sender=self._webhook_sender,
enabled=self._auto_emit_completion_webhooks,
method_name=method_name,
params=params,
result=result,
)

def _build_ctx(
self,
tool_ctx: ToolContext,
Expand Down Expand Up @@ -260,17 +303,16 @@ async def create_media_buy( # type: ignore[override]
tool_ctx = context or ToolContext()
account = await self._resolve_account(params.account, tool_ctx)
ctx = self._build_ctx(tool_ctx, account)
return cast(
"CreateMediaBuySuccessResponse",
await _invoke_platform_method(
self._platform,
"create_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
),
result = await _invoke_platform_method(
self._platform,
"create_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
)
self._maybe_auto_emit_sync_completion("create_media_buy", params, result)
return cast("CreateMediaBuySuccessResponse", result)

async def update_media_buy( # type: ignore[override]
self,
Expand All @@ -285,18 +327,17 @@ async def update_media_buy( # type: ignore[override]
tool_ctx = context or ToolContext()
account = await self._resolve_account(params.account, tool_ctx)
ctx = self._build_ctx(tool_ctx, account)
return cast(
"UpdateMediaBuySuccessResponse",
await _invoke_platform_method(
self._platform,
"update_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
arg_projector={"media_buy_id": params.media_buy_id, "patch": params},
),
result = await _invoke_platform_method(
self._platform,
"update_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
arg_projector={"media_buy_id": params.media_buy_id, "patch": params},
)
self._maybe_auto_emit_sync_completion("update_media_buy", params, result)
return cast("UpdateMediaBuySuccessResponse", result)

async def sync_creatives( # type: ignore[override]
self,
Expand All @@ -306,17 +347,16 @@ async def sync_creatives( # type: ignore[override]
tool_ctx = context or ToolContext()
account = await self._resolve_account(params.account, tool_ctx)
ctx = self._build_ctx(tool_ctx, account)
return cast(
"SyncCreativesSuccessResponse",
await _invoke_platform_method(
self._platform,
"sync_creatives",
params,
ctx,
executor=self._executor,
registry=self._registry,
),
result = await _invoke_platform_method(
self._platform,
"sync_creatives",
params,
ctx,
executor=self._executor,
registry=self._registry,
)
self._maybe_auto_emit_sync_completion("sync_creatives", params, result)
return cast("SyncCreativesSuccessResponse", result)

async def get_media_buy_delivery( # type: ignore[override]
self,
Expand Down
35 changes: 35 additions & 0 deletions src/adcp/decisioning/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from adcp.decisioning.resolve import ResourceResolver
from adcp.decisioning.state import StateReader
from adcp.decisioning.task_registry import TaskRegistry
from adcp.webhook_sender import WebhookSender


def _is_production_env() -> bool:
Expand Down Expand Up @@ -75,6 +76,8 @@ def create_adcp_server_from_platform(
registry: TaskRegistry | None = None,
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
auto_emit_completion_webhooks: bool = True,
) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
"""Build the :class:`PlatformHandler` + supporting wiring from a
:class:`DecisioningPlatform`.
Expand Down Expand Up @@ -117,6 +120,24 @@ def create_adcp_server_from_platform(
(D15 — async framework-mediated fetches). Default is the
v6.0 stub (raises ``NotImplementedError`` with a pointer to
v6.1).
:param webhook_sender: Bring-your-own
:class:`adcp.webhook_sender.WebhookSender` for sync-completion
and HITL-completion webhook delivery. Default ``None`` — when
unset, sync-completion auto-emit is a silent no-op (no URL to
deliver to, framework can't synthesize a sender). Adopters
wiring webhook delivery pass a configured sender (with their
signing key, IP-pinned transport, etc.).
:param auto_emit_completion_webhooks: F12 feature gate. When
``True`` (default), the framework auto-fires a completion
webhook on the sync-success arm of mutating tools whenever the
request supplied ``push_notification_config.url`` AND the tool
is in :data:`adcp.decisioning.webhook_emit.SPEC_WEBHOOK_TASK_TYPES`.
Buyers passing the URL expect notification regardless of
whether the seller routed sync vs HITL. Set ``False`` for
adopters who emit webhooks manually inside their handlers
(avoid duplicate delivery; idempotency-key dedup at the
receiver would handle it but explicit suppression matches the
v5 manual-emit posture for adopters mid-migration).

:raises ValueError: when ``executor`` and ``thread_pool_size`` are
both supplied (D5 mutually-exclusive validation).
Expand Down Expand Up @@ -213,6 +234,8 @@ def create_adcp_server_from_platform(
registry=registry,
state_reader=state_reader,
resource_resolver=resource_resolver,
webhook_sender=webhook_sender,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
)
return handler, executor, registry

Expand All @@ -226,6 +249,8 @@ def serve(
registry: TaskRegistry | None = None,
state_reader: StateReader | None = None,
resource_resolver: ResourceResolver | None = None,
webhook_sender: WebhookSender | None = None,
auto_emit_completion_webhooks: bool = True,
advertise_all: bool = False,
**serve_kwargs: Any,
) -> None:
Expand All @@ -246,6 +271,14 @@ def serve(
:class:`InMemoryTaskRegistry` (gated for production).
:param state_reader: Custom :class:`StateReader` impl (D15).
:param resource_resolver: Custom :class:`ResourceResolver` impl (D15).
:param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender`
for completion webhook delivery (sync auto-emit + HITL terminal).
``None`` disables auto-emit silently.
:param auto_emit_completion_webhooks: F12 — auto-fire a completion
webhook on the sync-success arm of mutating tools when the
request supplied ``push_notification_config.url``. Default
``True``. Set ``False`` for adopters who emit webhooks
manually inside their handlers.
:param advertise_all: Forwarded to :func:`adcp.server.serve`. When
``True``, ``tools/list`` advertises every method on the
handler regardless of override status. Default ``False`` —
Expand All @@ -267,6 +300,8 @@ def serve(
registry=registry,
state_reader=state_reader,
resource_resolver=resource_resolver,
webhook_sender=webhook_sender,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
)

server_name = name or type(platform).__name__
Expand Down
Loading
Loading