Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/adcp/decisioning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ def create_media_buy(
create_adcp_server_from_platform,
serve,
)
from adcp.decisioning.specialisms import (
AudiencePlatform,
SalesPlatform,
SignalsPlatform,
)
from adcp.decisioning.state import (
GovernanceContextJWS,
Proposal,
Expand All @@ -101,6 +106,7 @@ def create_media_buy(
"Account",
"AccountStore",
"AdcpError",
"AudiencePlatform",
"AuthInfo",
"CollectionList",
"DecisioningCapabilities",
Expand All @@ -118,7 +124,9 @@ def create_media_buy(
"PropertyListReference",
"RequestContext",
"ResourceResolver",
"SalesPlatform",
"SalesResult",
"SignalsPlatform",
"SingletonAccounts",
"StateReader",
"TaskHandoff",
Expand Down
23 changes: 23 additions & 0 deletions src/adcp/decisioning/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@
"sync_catalogs",
}
),
# Signals specialisms — third-party data brokers and first-party
# data providers share the same SignalsPlatform Protocol surface.
"signal-marketplace": frozenset(
{
"get_signals",
"activate_signal",
}
),
"signal-owned": frozenset(
{
"get_signals",
"activate_signal",
}
),
# Audience-sync — first-party CRM audience push with delta upsert.
# ``poll_audience_statuses`` is an adopter-internal helper not
# surfaced as a wire tool; ``sync_audiences`` is the only required
# method for spec coverage.
"audience-sync": frozenset(
{
"sync_audiences",
}
),
}


Expand Down
24 changes: 15 additions & 9 deletions src/adcp/decisioning/specialisms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@

Public surface re-exported from :mod:`adcp.decisioning.specialisms`:

* :class:`SalesPlatform` — covers all 9 ``sales-*`` specialisms
(non-guaranteed, guaranteed, broadcast-tv, streaming-tv, social,
exchange, proposal-mode, catalog-driven, retail-media) under one
unified hybrid shape.
* :class:`SalesPlatform` — covers the spec ``sales-*`` slugs
(non-guaranteed, guaranteed, broadcast-tv, social, proposal-mode,
catalog-driven) under one unified hybrid shape.
* :class:`SignalsPlatform` — covers ``signal-marketplace`` +
``signal-owned``. Two methods: ``get_signals`` (catalog discovery)
and ``activate_signal`` (provisioning onto destination platforms).
* :class:`AudiencePlatform` — covers ``audience-sync``. Two methods:
``sync_audiences`` (push first-party CRM audiences with delta
upsert) and ``poll_audience_statuses`` (batch state read).

Other specialism Protocols (audience, signals, creative-*, governance,
property-lists, etc.) are added as adopters need them — first
:class:`SalesPlatform` because that's the v6.0 vertical-slice the
foundation PR proves out.
Remaining specialism Protocols (creative-*, governance-*,
brand-rights, content-standards, property-lists, collection-lists)
are added in subsequent breadth-sprint PRs as adopters need them.
"""

from __future__ import annotations

from adcp.decisioning.specialisms.audience import AudiencePlatform
from adcp.decisioning.specialisms.sales import SalesPlatform
from adcp.decisioning.specialisms.signals import SignalsPlatform

__all__ = ["SalesPlatform"]
__all__ = ["AudiencePlatform", "SalesPlatform", "SignalsPlatform"]
124 changes: 124 additions & 0 deletions src/adcp/decisioning/specialisms/audience.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""AudiencePlatform Protocol — covers the ``audience-sync`` specialism.

Used standalone (LiveRamp, Oracle Data Cloud, Salesforce CDP) or
composed with ``sales-social`` (Snap/Meta/TikTok). The framework owns
cross-platform threading + idempotency + cross-tenant scoping; the
adopter answers "given this audience, what happened on my system?"

The slug mirrors ``schemas/cache/enums/specialism.json``.

Two methods:

* :meth:`sync_audiences` — push audiences to the platform (creates,
updates, deletes per the wire spec)
* :meth:`poll_audience_statuses` — batch-poll current status for one
or more audiences

Mirrors the JS-side ``AudiencePlatform`` interface at
``src/lib/server/decisioning/specialisms/audiences.ts``.
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Generic, Protocol, runtime_checkable

from typing_extensions import TypeVar

if TYPE_CHECKING:
from adcp.decisioning.context import RequestContext
from adcp.decisioning.types import MaybeAsync
from adcp.types import (
SyncAudiencesAudience,
SyncAudiencesSuccessResponse,
)

#: Per-platform metadata generic; matches ``RequestContext[TMeta]`` and
#: ``Account[TMeta]`` upstream.
TMeta = TypeVar("TMeta", default=dict[str, Any])

# Note on adopter-facing row types: the wire schema doesn't export a
# top-level ``Audience`` type — the row shape is defined inline on
# ``SyncAudiencesRequest.audiences[]``. Adopters import
# :class:`adcp.types.SyncAudiencesAudience` directly for typing.
# The wire success response is :class:`adcp.types.SyncAudiencesSuccessResponse`,
# which wraps per-audience result rows in ``{audiences: [...]}`` with
# the spec's status enum (``created`` / ``updated`` / ``unchanged`` /
# ``deleted`` / ``failed``; note ``rejected`` is NOT a valid wire
# status — use ``failed`` for buyer-rejected audiences).


@runtime_checkable
class AudiencePlatform(Protocol, Generic[TMeta]):
"""Sync first-party CRM audiences with delta upsert semantics.

Methods may be sync (return ``T`` directly) or async (return
``Awaitable[T]``); the dispatch adapter detects via
:func:`asyncio.iscoroutinefunction` and runs sync methods on a
thread pool.

Throw :class:`adcp.decisioning.AdcpError` for buyer-fixable
rejection (``AUDIENCE_TOO_SMALL``, ``REFERENCE_NOT_FOUND``, etc.);
the framework projects to the wire structured-error envelope.
"""

def sync_audiences(
self,
audiences: Sequence[SyncAudiencesAudience],
ctx: RequestContext[TMeta],
) -> MaybeAsync[SyncAudiencesSuccessResponse]:
"""Push audiences to the platform.

Framework handles batching, idempotency, and cross-tenant
scoping; the adopter handles match-rate computation and
activation lifecycle.

Sync acknowledgment with status changes via
``ctx.publish_status_change``: return per-audience result rows
immediately (``'pending'`` / ``'matching'`` are valid sync
outcomes). The match-rate computation and activation pipeline
run in the background — call
``ctx.publish_status_change(resource_type='audience', ...)``
from the platform's webhook handler / job queue / cron when
each audience reaches a terminal state.

:param audiences: List of audience rows projected from the
wire ``SyncAudiencesRequest.audiences[]`` field. Adopter
ergonomic — receives the list directly rather than the
full request.
:raises adcp.decisioning.AdcpError: for buyer-fixable
rejection (e.g., ``AUDIENCE_TOO_SMALL``).
"""
...

def poll_audience_statuses(
self,
audience_ids: Sequence[str],
ctx: RequestContext[TMeta],
) -> MaybeAsync[Mapping[str, str]]:
"""Batch-poll current status for one or more audiences.

Sync — this is a state-read, not a mutating operation. Useful
for buyer-side polling outside the framework's task envelope
(e.g., querying long-lived audiences) and for adapter code
that needs to check N audiences at once.

Returns a ``dict[audience_id, AudienceStatus]``. Audiences not
found are omitted from the map (callers handle missing keys);
raise ``AdcpError(code='REFERENCE_NOT_FOUND')`` only when the
entire batch is unresolvable for the tenant.

Single-audience polling is
``poll_audience_statuses([id], ctx).get(id)``. The batch shape
composes with upstream identity-graph APIs that natively
return per-audience-id arrays — adopters do NOT need to wrap
a single-id lookup over an N-call loop.

Adopter-internal helper — not surfaced as a wire tool. Used
by adopter code orchestrating cross-platform audience flows
and by the framework's optional bulk-status middleware.
"""
...


__all__ = ["AudiencePlatform"]
114 changes: 114 additions & 0 deletions src/adcp/decisioning/specialisms/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""SignalsPlatform Protocol — covers ``signal-marketplace`` + ``signal-owned``.

A platform claiming either ``signal-marketplace`` (third-party data
brokers — LiveRamp, Oracle Data Cloud, third-party DMPs) or
``signal-owned`` (first-party data providers — publisher first-party
data, retailer customer-graph) implements the methods on this Protocol.
The slugs mirror ``schemas/cache/enums/specialism.json``.

Two methods:

* :meth:`get_signals` — sync catalog discovery
* :meth:`activate_signal` — sync provisioning onto destination platforms

Async story: ``activate_signal`` is sync at the wire level — its
response union has no ``Submitted`` arm. Long-running activation
pipelines (identity-graph match: 5–30 min, destination provisioning:
hours) return :class:`ActivateSignalSuccessResponse` immediately with
``deployments`` rows in ``pending`` state, then emit
``ctx.publish_status_change(resource_type='signal', ...)`` events as
each deployment reaches ``activating`` / ``deployed`` / ``failed``.

Mirrors the JS-side ``SignalsPlatform`` interface at
``src/lib/server/decisioning/specialisms/signals.ts``.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Generic, Protocol, runtime_checkable

from typing_extensions import TypeVar

if TYPE_CHECKING:
from adcp.decisioning.context import RequestContext
from adcp.decisioning.types import MaybeAsync
from adcp.types import (
ActivateSignalRequest,
ActivateSignalSuccessResponse,
GetSignalsRequest,
GetSignalsResponse,
)

#: Per-platform metadata generic; matches ``RequestContext[TMeta]`` and
#: ``Account[TMeta]`` upstream so a platform parameterizing
#: ``SignalsPlatform[TenantMeta]`` gets ``ctx.account.metadata``-style
#: typed access inside method bodies.
TMeta = TypeVar("TMeta", default=dict[str, Any])


@runtime_checkable
class SignalsPlatform(Protocol, Generic[TMeta]):
"""Catalog discovery + activation for marketplace / owned signals.

Methods may be sync (return ``T`` directly) or async (return
``Awaitable[T]``); the dispatch adapter detects via
:func:`asyncio.iscoroutinefunction` and runs sync methods on a
thread pool so a blocking sync handler doesn't serialize the event
loop.

Throw :class:`adcp.decisioning.AdcpError` for buyer-fixable
rejection (``SIGNAL_NOT_FOUND``, ``POLICY_VIOLATION``,
``INVALID_REQUEST``, etc.); the framework projects to the wire
structured-error envelope.
"""

def get_signals(
self,
req: GetSignalsRequest,
ctx: RequestContext[TMeta],
) -> MaybeAsync[GetSignalsResponse]:
"""Catalog discovery — query your signal index, return signals
matching the buyer's filters (industry, intent type, audience
size, etc.).

Sync at the wire level — :class:`GetSignalsResponse` has no
async envelope. Platforms with slow catalog stores need
internal caches.

:raises adcp.decisioning.AdcpError: ``code='POLICY_VIOLATION'``
when the buyer doesn't have rights to the requested data
category.
"""
...

def activate_signal(
self,
req: ActivateSignalRequest,
ctx: RequestContext[TMeta],
) -> MaybeAsync[ActivateSignalSuccessResponse]:
"""Provision a signal onto one or more destination platforms
(Snap, Meta, TikTok, etc.).

Returns the success-arm shape immediately with ``deployments``
rows in their current state — ``'pending'`` is a valid sync
return for slow activation pipelines.

Subsequent state changes (per-deployment ``activating`` /
``deployed`` / ``failed``) flow via
``ctx.publish_status_change(resource_type='signal',
resource_id=signal_agent_segment_id, payload=...)`` as each
destination's identity-graph match completes.

Use ``req.action='deactivate'`` for GDPR/CCPA-compliant
teardown when campaigns end.

:raises adcp.decisioning.AdcpError: ``code='SIGNAL_NOT_FOUND'``
(unknown ``signal_agent_segment_id``),
``code='POLICY_VIOLATION'`` (buyer lacks rights to activate
this data), or ``code='INVALID_REQUEST'`` (missing or
unrecognized destination).
"""
...


__all__ = ["SignalsPlatform"]
12 changes: 8 additions & 4 deletions tests/test_decisioning_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,22 @@ def test_spec_specialism_enum_matches_schema_cache() -> None:

def test_validate_platform_warns_on_unenforced_spec_specialism() -> None:
"""Spec-recognized specialism that the v6.0 framework doesn't yet
enforce (e.g. ``signal-marketplace``) emits an "unenforced
enforce (e.g. ``creative-ad-server``) emits an "unenforced
specialism" UserWarning — distinct from the "novel" warning, since
it's a real claim, just not method-checked."""
it's a real claim, just not method-checked.

Use ``creative-ad-server`` here because ``signal-marketplace`` /
``audience-sync`` got method-coverage rules in the breadth-sprint
Batch 1; ``creative-*`` are still pending until Batch 2."""

class _UnenforcedSpecPlatform(DecisioningPlatform):
capabilities = DecisioningCapabilities(specialisms=["signal-marketplace"])
capabilities = DecisioningCapabilities(specialisms=["creative-ad-server"])
accounts = SingletonAccounts(account_id="hello")

with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", UserWarning)
validate_platform(_UnenforcedSpecPlatform())
matched = [w for w in caught if "signal-marketplace" in str(w.message)]
matched = [w for w in caught if "creative-ad-server" in str(w.message)]
assert len(matched) == 1
assert "spec-recognized" in str(matched[0].message)

Expand Down
Loading
Loading