Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/decisioning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def create_media_buy(
MaybeAsync,
SalesResult,
TaskHandoff,
WorkflowHandoff,
)

__all__ = [
Expand Down Expand Up @@ -147,6 +148,7 @@ def create_media_buy(
"TaskHandoffContext",
"TaskRegistry",
"TaskState",
"WorkflowHandoff",
"create_adcp_server_from_platform",
"serve",
"WorkflowObjectType",
Expand Down
82 changes: 81 additions & 1 deletion src/adcp/decisioning/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from adcp.decisioning.resolve import ResourceResolver, _make_default_resolver
from adcp.decisioning.state import StateReader, _make_default_state_reader
from adcp.decisioning.types import Account, TaskHandoff
from adcp.decisioning.types import Account, TaskHandoff, WorkflowHandoff
from adcp.server.base import ToolContext

if TYPE_CHECKING:
Expand Down Expand Up @@ -195,5 +195,85 @@ def handoff_to_task(
Adopter code passes either a coroutine function (``async def
review_async(task_ctx): ...``) or a sync callable; the
dispatcher detects which and runs it appropriately.

For external workflows that complete on their own schedule
(human queue review, batch jobs, Airflow DAGs, ML pipelines)
— use :meth:`handoff_to_workflow` instead. The split is purely
about where the work runs (in-process / framework-managed vs.
adopter-owned external system).
"""
return TaskHandoff(fn)

def handoff_to_workflow(
self,
fn: Callable[[Any], Awaitable[None] | None],
) -> WorkflowHandoff:
"""Promote this call to an externally-completed task.

For workflows that run OUTSIDE the framework's process —
human queue review (trafficker UI), nightly batch jobs,
Airflow DAGs, ML pipelines, scheduled cron. The framework
allocates a ``task_id``, calls ``fn`` ONCE synchronously
(or awaits it if a coroutine) to register the work into the
adopter's external system, persists ``submitted`` state, and
returns the wire envelope. NO background coroutine runs in
the framework.

``fn`` receives a :class:`TaskHandoffContext` carrying
``id`` (framework-allocated task_id) and ``_registry``
(adopter can stash a reference for later completion). The
adopter's external workflow later calls
``registry.complete(task_id, result)`` or
``registry.fail(task_id, error)`` directly when the work
finishes — minutes, hours, or days later.

Buyer experience is identical to :meth:`handoff_to_task` —
same ``{task_id, status: 'submitted'}`` wire envelope, same
``tasks/get`` polling, same push-notification webhook on
terminal state.

**Rollback.** If ``fn`` raises during enqueue, the framework
discards the just-allocated task_id from the registry and
propagates the exception (wrapped to ``AdcpError`` per the
dispatch contract). Adopter enqueue fns that need
transactional persistence wrap their own DB write in their
own transaction; the framework's rollback is registry-side
only.

Example::

class TraffickerSeller(DecisioningPlatform):
def __init__(self, review_queue, task_registry):
self.review_queue = review_queue
# Stash for later completion when human acts
self.task_registry = task_registry

def create_media_buy(self, req, ctx):
if self._needs_human_approval(req):
return ctx.handoff_to_workflow(
lambda task_ctx: self._enqueue(task_ctx, req)
)
return CreateMediaBuySuccess(media_buy_id="mb_1", ...)

def _enqueue(self, task_ctx, req):
self.review_queue.add(
task_id=task_ctx.id,
request_snapshot=req.model_dump(),
)

# Elsewhere — Flask handler for the trafficker UI:
async def on_decision(self, task_id, decision):
if decision.approved:
await self.task_registry.complete(
task_id,
CreateMediaBuySuccess(...).model_dump(),
)
else:
await self.task_registry.fail(
task_id, AdcpError(...).to_wire(),
)

See :class:`adcp.decisioning.WorkflowHandoff` for the full
semantics.
"""
return WorkflowHandoff(fn)
93 changes: 92 additions & 1 deletion src/adcp/decisioning/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@
TaskHandoffContext,
TaskRegistry,
)
from adcp.decisioning.types import AdcpError, TaskHandoff, is_task_handoff
from adcp.decisioning.types import (
AdcpError,
TaskHandoff,
WorkflowHandoff,
is_task_handoff,
is_workflow_handoff,
)

if TYPE_CHECKING:
from pydantic import BaseModel
Expand Down Expand Up @@ -806,6 +812,14 @@ async def _invoke_platform_method(
registry=registry,
executor=executor,
)
if is_workflow_handoff(result):
return await _project_workflow_handoff(
result,
ctx,
method_name=method_name,
registry=registry,
executor=executor,
)
return result


Expand Down Expand Up @@ -939,6 +953,83 @@ async def _run() -> None:
_BACKGROUND_HANDOFF_TASKS: set[asyncio.Task[None]] = set()


async def _project_workflow_handoff(
handoff: WorkflowHandoff,
ctx: RequestContext[Any],
*,
method_name: str,
registry: TaskRegistry,
executor: ThreadPoolExecutor,
) -> dict[str, Any]:
"""Project a :class:`WorkflowHandoff` to the wire Submitted envelope.

Distinct from :func:`_project_handoff`: NO background coroutine
runs. The framework allocates a ``task_id`` via
:meth:`TaskRegistry.issue` and calls the adopter's enqueue fn
ONCE — synchronously if it's a sync callable, awaited if it's a
coroutine. The enqueue fn registers the work into the adopter's
external system (trafficker UI queue, batch DB, Airflow trigger,
etc.) and returns; the framework then returns the Submitted
envelope to the buyer.

The adopter's external workflow later calls
``registry.complete(task_id, result)`` or
``registry.fail(task_id, error)`` directly — minutes, hours, or
days later. The registry is the long-lived control surface; the
framework's role ends after enqueue.

**Rollback.** If the enqueue fn raises, the just-allocated
task_id is discarded from the registry via
:meth:`TaskRegistry.discard` so the buyer never sees a Submitted
envelope referencing an orphan id their external workflow never
registered. The exception is re-raised; the dispatch wrapper
catches it and projects to ``AdcpError`` per the handler
contract.

:param method_name: Wire-spec verb name — used as ``task_type``
on the registry row so ``tasks/get`` round-trips correctly.
"""
fn = handoff._fn

task_id = await registry.issue(
account_id=ctx.account.id,
task_type=method_name,
)
handoff_ctx = TaskHandoffContext(id=task_id, _registry=registry)

try:
if asyncio.iscoroutinefunction(fn):
await fn(handoff_ctx)
else:
ctx_snapshot = contextvars.copy_context()
loop = asyncio.get_running_loop()
await loop.run_in_executor(
executor,
functools.partial(ctx_snapshot.run, fn, handoff_ctx),
)
except BaseException:
# Rollback: the buyer can't be left with a Submitted envelope
# referencing a task_id the adopter's external workflow never
# registered. Discard the just-allocated registry row, then
# re-raise so the outer dispatch wrapper projects the
# exception to AdcpError. ``BaseException`` (not Exception)
# so KeyboardInterrupt / SystemExit also clean up the
# registry side; framework state should never strand on
# interpreter teardown.
await registry.discard(task_id)
raise

# Wire ``Submitted`` envelope — same shape as the TaskHandoff
# path. Buyers can't tell which path the seller took; that's
# intentional. ``task_type`` lives on the registry row (for
# ``tasks/get``), not on the wire envelope, per the same posture
# as :func:`_project_handoff`.
return {
"task_id": task_id,
"status": "submitted",
}


__all__ = [
"REQUIRED_METHODS_PER_SPECIALISM",
"SPEC_SPECIALISM_ENUM",
Expand Down
29 changes: 29 additions & 0 deletions src/adcp/decisioning/task_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,28 @@ async def get(
"""
...

async def discard(self, task_id: str) -> None:
"""Remove a task_id from the registry — rollback path.

Used by the WorkflowHandoff dispatch projection
(:func:`adcp.decisioning.dispatch._project_workflow_handoff`)
when the adopter's enqueue fn raises after the task_id has
been allocated. Without rollback, the buyer would receive a
Submitted envelope referencing an orphan task_id their
external workflow never registered.

Idempotent: discarding an unknown task_id is a no-op (no
raise). The discard window is tightly scoped — between
``issue()`` and the framework's projection step, with the
adopter's enqueue fn in between. In practice this is a few
milliseconds.

Adopters MUST NOT call ``discard`` on a task that has
progressed past ``submitted`` — that's the wrong recovery
path; use ``fail()`` instead.
"""
...


# ---------------------------------------------------------------------------
# In-memory reference implementation — v6.0 ships this; v6.1 lands a
Expand Down Expand Up @@ -417,6 +439,13 @@ async def get(
return None
return record.to_dict()

async def discard(self, task_id: str) -> None:
async with self._lock:
# Idempotent: pop with default. The Protocol contract
# tolerates discarding an unknown id (no raise) so the
# WorkflowHandoff projection's rollback can be unconditional.
self._records.pop(task_id, None)


# ---------------------------------------------------------------------------
# TaskHandoffContext — what the framework passes into adopter handoff fns
Expand Down
Loading
Loading