Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 227 additions & 2 deletions examples/v3_reference_seller/src/platform.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""DecisioningPlatform impl for the v3 reference seller.

Sales-non-guaranteed specialism with the five required Sales methods:
Sales-non-guaranteed specialism with seven implemented methods:

* :meth:`get_products` — read inventory catalog
* :meth:`create_media_buy` — terminal artifact insert; idempotency-keyed
* :meth:`update_media_buy` — patch (status / pause / spend cap)
* :meth:`sync_creatives` — accept creative manifests
* :meth:`get_media_buy_delivery` — read delivery actuals
* :meth:`sync_accounts` — upsert accounts; billing_entity stored with bank, stripped on response
* :meth:`list_accounts` — list accounts projected through the write-only guard

All five run against the SQLAlchemy models in :mod:`models`. The
All seven run against the SQLAlchemy models in :mod:`models`. The
platform reads the resolved :class:`adcp.decisioning.BuyerAgent`
from :attr:`RequestContext.buyer_agent` (set by the framework's
dispatch gate) and the :class:`adcp.decisioning.Account` from
Expand Down Expand Up @@ -36,16 +38,24 @@
DecisioningCapabilities,
DecisioningPlatform,
ExplicitAccounts,
project_account_for_response,
project_business_entity_for_response,
)
from adcp.decisioning.specialisms import SalesPlatform
from adcp.server import current_tenant
from adcp.types import (
Account as AdcpAccount,
CreateMediaBuyRequest,
CreateMediaBuySuccessResponse,
GetMediaBuyDeliveryRequest,
GetMediaBuyDeliveryResponse,
GetProductsRequest,
GetProductsResponse,
ListAccountsRequest,
ListAccountsResponse,
Product,
SyncAccountsRequest,
SyncAccountsSuccessResponse,
SyncCreativesRequest,
SyncCreativesSuccessResponse,
UpdateMediaBuyRequest,
Expand All @@ -58,6 +68,7 @@
from adcp.decisioning import RequestContext

from .models import Account as AccountRow
from .models import BuyerAgent as BuyerAgentRow
from .models import MediaBuy as MediaBuyRow

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -311,6 +322,220 @@ async def get_media_buy_delivery(
del req, ctx
return GetMediaBuyDeliveryResponse(media_buys=[])

# ----- sync_accounts ---------------------------------------------------

async def sync_accounts(
self, req: SyncAccountsRequest, ctx: RequestContext
) -> SyncAccountsSuccessResponse:
"""Upsert advertiser accounts and return results with bank details stripped.

Persists the full :attr:`billing_entity` (including bank coordinates) to the
``accounts`` table for invoicing, then echoes it back with ``bank`` cleared —
the AdCP 3.1 write-only guard. Production adopters add approval workflows,
credit-limit checks, and dry_run rollback here.

Note: ``dry_run`` is intentionally ignored in this reference impl — the
session always commits. Production adopters should branch on ``req.dry_run``
before ``session.begin()`` and call ``await session.rollback()`` (or avoid
``session.begin()`` entirely) when it is ``True``.
"""
if ctx.buyer_agent is None:
raise AdcpError(
"INTERNAL_ERROR",
message="Dispatch should have populated buyer_agent.",
recovery="terminal",
)
tenant = current_tenant()
if tenant is None:
raise AdcpError(
"AUTH_INVALID",
message="sync_accounts called without a tenant context.",
recovery="terminal",
)

results: list[dict[str, Any]] = []
async with self._sessionmaker() as session, session.begin():
# ctx.buyer_agent.ext doesn't carry the DB surrogate id — _row_to_agent
# in buyer_registry.py discards row.id. Re-query to get the FK.
ba_result = await session.execute(
select(BuyerAgentRow).where(
BuyerAgentRow.tenant_id == tenant.id,
BuyerAgentRow.agent_url == ctx.buyer_agent.agent_url,
)
)
ba_row = ba_result.scalar_one_or_none()
if ba_row is None:
raise AdcpError(
"INTERNAL_ERROR",
message=(
f"BuyerAgent row for {ctx.buyer_agent.agent_url!r} "
f"not found under tenant {tenant.id!r}."
),
recovery="terminal",
)

for acct in req.accounts:
# Natural upsert key — brand.domain and operator both forbid colons
# (domain-pattern validation), so the colon separator is safe.
# Adopters with an existing account-ID scheme should replace this
# derivation with their own stable key (e.g. a UUID assigned on
# first creation and stored in a separate column).
wire_acct_id = f"{acct.brand.domain}:{acct.operator}"

acct_result = await session.execute(
select(AccountRow).where(
AccountRow.tenant_id == tenant.id,
AccountRow.buyer_agent_id == ba_row.id,
AccountRow.account_id == wire_acct_id,
)
)
row = acct_result.scalar_one_or_none()

billing_json = (
acct.billing_entity.model_dump(mode="json", exclude_none=True)
if acct.billing_entity
else None
)
# Prefer the legal entity name for the human-readable account label;
# fall back to the brand domain when billing_entity is absent.
acct_name = (
acct.billing_entity.legal_name
if acct.billing_entity and acct.billing_entity.legal_name
else acct.brand.domain
)
# Normalise None → False for the NOT NULL column; preserve the
# normalised value in the response so DB and wire stay consistent.
sandbox_val = acct.sandbox if acct.sandbox is not None else False

if row is None:
action = "created"
row = AccountRow(
tenant_id=tenant.id,
buyer_agent_id=ba_row.id,
account_id=wire_acct_id,
name=acct_name,
status="active",
billing=acct.billing.value if acct.billing else None,
billing_entity=billing_json,
sandbox=sandbox_val,
)
session.add(row)
status_str = "active"
else:
action = "updated"
row.billing_entity = billing_json
row.name = acct_name
if acct.billing:
row.billing = acct.billing.value
row.sandbox = sandbox_val
row.updated_at = datetime.now(timezone.utc)
status_str = row.status

# Strip write-only bank details before echoing in the response.
# Entity-level helper used here because acct.billing_entity is a
# raw BusinessEntity from the wire — not a full stored Account.
# Use project_account_for_response when you have an Account object
# from storage (as in list_accounts below).
safe_be = (
project_business_entity_for_response(acct.billing_entity)
if acct.billing_entity is not None
else None
)
results.append(
{
"account_id": wire_acct_id,
"name": acct_name,
"brand": acct.brand.model_dump(mode="json"),
"operator": acct.operator,
"action": action,
"status": status_str,
"billing": acct.billing.value if acct.billing else None,
"billing_entity": (
safe_be.model_dump(mode="json", exclude_none=True)
if safe_be is not None
else None
),
"sandbox": sandbox_val,
}
)

# dry_run is intentionally always False here — we always commit.
# Echoing req.dry_run=True would lie to the caller about what was persisted.
return SyncAccountsSuccessResponse(accounts=results, dry_run=False)

# ----- list_accounts ---------------------------------------------------

async def list_accounts(
self, req: ListAccountsRequest, ctx: RequestContext
) -> ListAccountsResponse:
"""List accounts for the authenticated agent, bank details stripped (write-only).

Applies optional ``status`` and ``sandbox`` filters from the request.
Production adopters add pagination, ownership scoping, and field-level
access control here.
"""
if ctx.buyer_agent is None:
raise AdcpError(
"INTERNAL_ERROR",
message="Dispatch should have populated buyer_agent.",
recovery="terminal",
)
tenant = current_tenant()
if tenant is None:
raise AdcpError(
"AUTH_INVALID",
message="list_accounts called without a tenant context.",
recovery="terminal",
)

async with self._sessionmaker() as session:
# ctx.buyer_agent.ext doesn't carry the DB surrogate id — re-query.
ba_result = await session.execute(
select(BuyerAgentRow).where(
BuyerAgentRow.tenant_id == tenant.id,
BuyerAgentRow.agent_url == ctx.buyer_agent.agent_url,
)
)
ba_row = ba_result.scalar_one_or_none()
if ba_row is None:
return ListAccountsResponse(accounts=[])

query = select(AccountRow).where(
AccountRow.tenant_id == tenant.id,
AccountRow.buyer_agent_id == ba_row.id,
)
if req.status is not None:
query = query.where(AccountRow.status == req.status.value)
if req.sandbox is not None:
query = query.where(AccountRow.sandbox == req.sandbox)

acct_result = await session.execute(query)
rows = list(acct_result.scalars().all())

accounts = []
for row in rows:
# model_validate coerces the billing_entity JSON dict → BusinessEntity
# so project_account_for_response receives a real Pydantic object and
# can call model_copy() safely.
wire_acct = AdcpAccount.model_validate(
{
"account_id": row.account_id,
"name": row.name,
"status": row.status,
"billing": row.billing,
"billing_entity": row.billing_entity,
"rate_card": row.rate_card,
"payment_terms": row.payment_terms,
"credit_limit": row.credit_limit,
"sandbox": row.sandbox,
"ext": row.ext,
"reporting_bucket": row.reporting_bucket,
}
)
accounts.append(project_account_for_response(wire_acct))

return ListAccountsResponse(accounts=accounts)


def _project_start_time(value: Any) -> datetime:
"""Project :class:`StartTiming` (root: ``'asap'`` | :class:`AwareDatetime`)
Expand Down
Loading
Loading