diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index cb63a0def..512ac1ff2 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -107,6 +107,7 @@ def create_media_buy( MaybeAsync, SalesResult, TaskHandoff, + WorkflowHandoff, ) __all__ = [ @@ -147,6 +148,7 @@ def create_media_buy( "TaskHandoffContext", "TaskRegistry", "TaskState", + "WorkflowHandoff", "create_adcp_server_from_platform", "serve", "WorkflowObjectType", diff --git a/src/adcp/decisioning/context.py b/src/adcp/decisioning/context.py index 5b5d91ee6..71f694659 100644 --- a/src/adcp/decisioning/context.py +++ b/src/adcp/decisioning/context.py @@ -23,7 +23,7 @@ from adcp.decisioning.resolve import ResourceResolver, _make_default_resolver from adcp.decisioning.state import StateReader, _make_default_state_reader -from adcp.decisioning.types import Account, TaskHandoff +from adcp.decisioning.types import Account, TaskHandoff, WorkflowHandoff from adcp.server.base import ToolContext if TYPE_CHECKING: @@ -195,5 +195,85 @@ def handoff_to_task( Adopter code passes either a coroutine function (``async def review_async(task_ctx): ...``) or a sync callable; the dispatcher detects which and runs it appropriately. + + For external workflows that complete on their own schedule + (human queue review, batch jobs, Airflow DAGs, ML pipelines) + — use :meth:`handoff_to_workflow` instead. The split is purely + about where the work runs (in-process / framework-managed vs. + adopter-owned external system). """ return TaskHandoff(fn) + + def handoff_to_workflow( + self, + fn: Callable[[Any], Awaitable[None] | None], + ) -> WorkflowHandoff: + """Promote this call to an externally-completed task. + + For workflows that run OUTSIDE the framework's process — + human queue review (trafficker UI), nightly batch jobs, + Airflow DAGs, ML pipelines, scheduled cron. The framework + allocates a ``task_id``, calls ``fn`` ONCE synchronously + (or awaits it if a coroutine) to register the work into the + adopter's external system, persists ``submitted`` state, and + returns the wire envelope. NO background coroutine runs in + the framework. + + ``fn`` receives a :class:`TaskHandoffContext` carrying + ``id`` (framework-allocated task_id) and ``_registry`` + (adopter can stash a reference for later completion). The + adopter's external workflow later calls + ``registry.complete(task_id, result)`` or + ``registry.fail(task_id, error)`` directly when the work + finishes — minutes, hours, or days later. + + Buyer experience is identical to :meth:`handoff_to_task` — + same ``{task_id, status: 'submitted'}`` wire envelope, same + ``tasks/get`` polling, same push-notification webhook on + terminal state. + + **Rollback.** If ``fn`` raises during enqueue, the framework + discards the just-allocated task_id from the registry and + propagates the exception (wrapped to ``AdcpError`` per the + dispatch contract). Adopter enqueue fns that need + transactional persistence wrap their own DB write in their + own transaction; the framework's rollback is registry-side + only. + + Example:: + + class TraffickerSeller(DecisioningPlatform): + def __init__(self, review_queue, task_registry): + self.review_queue = review_queue + # Stash for later completion when human acts + self.task_registry = task_registry + + def create_media_buy(self, req, ctx): + if self._needs_human_approval(req): + return ctx.handoff_to_workflow( + lambda task_ctx: self._enqueue(task_ctx, req) + ) + return CreateMediaBuySuccess(media_buy_id="mb_1", ...) + + def _enqueue(self, task_ctx, req): + self.review_queue.add( + task_id=task_ctx.id, + request_snapshot=req.model_dump(), + ) + + # Elsewhere — Flask handler for the trafficker UI: + async def on_decision(self, task_id, decision): + if decision.approved: + await self.task_registry.complete( + task_id, + CreateMediaBuySuccess(...).model_dump(), + ) + else: + await self.task_registry.fail( + task_id, AdcpError(...).to_wire(), + ) + + See :class:`adcp.decisioning.WorkflowHandoff` for the full + semantics. + """ + return WorkflowHandoff(fn) diff --git a/src/adcp/decisioning/dispatch.py b/src/adcp/decisioning/dispatch.py index 8a130b249..0eb5f39aa 100644 --- a/src/adcp/decisioning/dispatch.py +++ b/src/adcp/decisioning/dispatch.py @@ -53,7 +53,13 @@ TaskHandoffContext, TaskRegistry, ) -from adcp.decisioning.types import AdcpError, TaskHandoff, is_task_handoff +from adcp.decisioning.types import ( + AdcpError, + TaskHandoff, + WorkflowHandoff, + is_task_handoff, + is_workflow_handoff, +) if TYPE_CHECKING: from pydantic import BaseModel @@ -806,6 +812,14 @@ async def _invoke_platform_method( registry=registry, executor=executor, ) + if is_workflow_handoff(result): + return await _project_workflow_handoff( + result, + ctx, + method_name=method_name, + registry=registry, + executor=executor, + ) return result @@ -939,6 +953,83 @@ async def _run() -> None: _BACKGROUND_HANDOFF_TASKS: set[asyncio.Task[None]] = set() +async def _project_workflow_handoff( + handoff: WorkflowHandoff, + ctx: RequestContext[Any], + *, + method_name: str, + registry: TaskRegistry, + executor: ThreadPoolExecutor, +) -> dict[str, Any]: + """Project a :class:`WorkflowHandoff` to the wire Submitted envelope. + + Distinct from :func:`_project_handoff`: NO background coroutine + runs. The framework allocates a ``task_id`` via + :meth:`TaskRegistry.issue` and calls the adopter's enqueue fn + ONCE — synchronously if it's a sync callable, awaited if it's a + coroutine. The enqueue fn registers the work into the adopter's + external system (trafficker UI queue, batch DB, Airflow trigger, + etc.) and returns; the framework then returns the Submitted + envelope to the buyer. + + The adopter's external workflow later calls + ``registry.complete(task_id, result)`` or + ``registry.fail(task_id, error)`` directly — minutes, hours, or + days later. The registry is the long-lived control surface; the + framework's role ends after enqueue. + + **Rollback.** If the enqueue fn raises, the just-allocated + task_id is discarded from the registry via + :meth:`TaskRegistry.discard` so the buyer never sees a Submitted + envelope referencing an orphan id their external workflow never + registered. The exception is re-raised; the dispatch wrapper + catches it and projects to ``AdcpError`` per the handler + contract. + + :param method_name: Wire-spec verb name — used as ``task_type`` + on the registry row so ``tasks/get`` round-trips correctly. + """ + fn = handoff._fn + + task_id = await registry.issue( + account_id=ctx.account.id, + task_type=method_name, + ) + handoff_ctx = TaskHandoffContext(id=task_id, _registry=registry) + + try: + if asyncio.iscoroutinefunction(fn): + await fn(handoff_ctx) + else: + ctx_snapshot = contextvars.copy_context() + loop = asyncio.get_running_loop() + await loop.run_in_executor( + executor, + functools.partial(ctx_snapshot.run, fn, handoff_ctx), + ) + except BaseException: + # Rollback: the buyer can't be left with a Submitted envelope + # referencing a task_id the adopter's external workflow never + # registered. Discard the just-allocated registry row, then + # re-raise so the outer dispatch wrapper projects the + # exception to AdcpError. ``BaseException`` (not Exception) + # so KeyboardInterrupt / SystemExit also clean up the + # registry side; framework state should never strand on + # interpreter teardown. + await registry.discard(task_id) + raise + + # Wire ``Submitted`` envelope — same shape as the TaskHandoff + # path. Buyers can't tell which path the seller took; that's + # intentional. ``task_type`` lives on the registry row (for + # ``tasks/get``), not on the wire envelope, per the same posture + # as :func:`_project_handoff`. + return { + "task_id": task_id, + "status": "submitted", + } + + __all__ = [ "REQUIRED_METHODS_PER_SPECIALISM", "SPEC_SPECIALISM_ENUM", diff --git a/src/adcp/decisioning/task_registry.py b/src/adcp/decisioning/task_registry.py index 08a348f30..0c7ea4273 100644 --- a/src/adcp/decisioning/task_registry.py +++ b/src/adcp/decisioning/task_registry.py @@ -267,6 +267,28 @@ async def get( """ ... + async def discard(self, task_id: str) -> None: + """Remove a task_id from the registry — rollback path. + + Used by the WorkflowHandoff dispatch projection + (:func:`adcp.decisioning.dispatch._project_workflow_handoff`) + when the adopter's enqueue fn raises after the task_id has + been allocated. Without rollback, the buyer would receive a + Submitted envelope referencing an orphan task_id their + external workflow never registered. + + Idempotent: discarding an unknown task_id is a no-op (no + raise). The discard window is tightly scoped — between + ``issue()`` and the framework's projection step, with the + adopter's enqueue fn in between. In practice this is a few + milliseconds. + + Adopters MUST NOT call ``discard`` on a task that has + progressed past ``submitted`` — that's the wrong recovery + path; use ``fail()`` instead. + """ + ... + # --------------------------------------------------------------------------- # In-memory reference implementation — v6.0 ships this; v6.1 lands a @@ -417,6 +439,13 @@ async def get( return None return record.to_dict() + async def discard(self, task_id: str) -> None: + async with self._lock: + # Idempotent: pop with default. The Protocol contract + # tolerates discarding an unknown id (no raise) so the + # WorkflowHandoff projection's rollback can be unconditional. + self._records.pop(task_id, None) + # --------------------------------------------------------------------------- # TaskHandoffContext — what the framework passes into adopter handoff fns diff --git a/src/adcp/decisioning/types.py b/src/adcp/decisioning/types.py index 37fe9ea93..e37d2d83b 100644 --- a/src/adcp/decisioning/types.py +++ b/src/adcp/decisioning/types.py @@ -178,36 +178,23 @@ def create_media_buy(self, req, ctx): artifact, and emits a webhook on completion. Typical wall-clock: seconds to minutes. - **What TaskHandoff is NOT for** — human-driven HITL workflows where - a real person eventually clicks "approve" in a queue. The handoff - fn would either block the framework's background runner indefinitely - (until the human acts), or poll an external queue (which doesn't - fit the "fn returns terminal artifact" contract). For human-approval - workflows, the recommended pattern is: - - 1. Adopter persists the in-flight buy in their own DB and returns - ``input-required`` (NOT a TaskHandoff) on the synchronous arm, - carrying a stable ``task_id`` they allocated. The - ``input-required`` status is in the spec task-status enum - (``schemas/cache/enums/task-status.json``); the per-tool - ``*_async_response_input_required`` envelopes are in - :mod:`adcp.types`. - 2. Trafficker UI flips the row to approved/rejected on its own - schedule. - 3. Adopter's webhook emitter fires the terminal webhook to the - buyer when the human acts. Use the SDK's webhook primitives in - :mod:`adcp.webhooks` (payload builders) + - :mod:`adcp.webhook_sender` (HMAC-SHA256 signing + IP-pinned - transport delivery) — same wire shape the framework uses on the - TaskHandoff path. - - The adopter owns the whole lifecycle in the human-driven case; the - framework's TaskHandoff projector exists only for the "fn returns - terminal artifact within a reasonable wall-clock" shape. v6.1 may - add a richer ``ctx.handoff_to_human()`` primitive for the queued- - approval pattern; a worked end-to-end example lands with the - multi-tenant ``hello_publisher.py`` example in 4.5.0. For v6.0, - keep human approvals out of the TaskHandoff path. + **What TaskHandoff is NOT for** — external workflows that complete + on their own schedule (human queue review, nightly batch jobs, + Airflow DAGs, ML pipelines that run hours later). The handoff fn + would either block the framework's background runner indefinitely + (until the external system acts), or poll an external queue (which + doesn't fit the "fn returns terminal artifact" contract). Use + :class:`WorkflowHandoff` instead — obtained via + :meth:`RequestContext.handoff_to_workflow`. The framework allocates + a ``task_id``, persists ``submitted`` state, and returns the wire + envelope; the adopter's external system later calls + ``registry.complete(task_id, result)`` or + ``registry.fail(task_id, error)`` directly. + + Buyer experience is identical across the three paths — sync return, + TaskHandoff, WorkflowHandoff — they all surface as polled-or-webhook + completion against the same wire shape. The split is purely about + where the work runs (in-process / framework-managed / adopter-owned). """ __slots__ = ("_fn",) @@ -236,6 +223,101 @@ def is_task_handoff(obj: Any) -> bool: return type(obj) is TaskHandoff +class WorkflowHandoff: + """Marker the framework recognizes as 'register this call as a task + completed externally.' + + Adopters obtain instances via :meth:`RequestContext.handoff_to_workflow`; + the framework dispatches based on type-identity (``type(obj) is + WorkflowHandoff``) — same posture as :class:`TaskHandoff`. + + **Distinct from :class:`TaskHandoff`.** TaskHandoff is for + framework-managed in-process async work — the adopter's coroutine + runs in the background and returns a terminal artifact within + seconds-to-minutes. WorkflowHandoff is for adopter-owned external + workflows that complete on their own schedule (human queue review, + nightly batch jobs, Airflow DAGs, ML pipelines, scheduled cron). + The framework allocates a ``task_id``, calls the adopter's enqueue + fn ONCE synchronously to register the work into the adopter's + external system, persists ``submitted`` state, and returns the + wire envelope. NO background coroutine runs. + + The adopter's external workflow later calls + ``registry.complete(task_id, result)`` or + ``registry.fail(task_id, error)`` directly — the registry handle + is plumbed through the platform's own DI / app-level config. + + Example:: + + class TraffickerSeller(DecisioningPlatform): + def __init__(self, review_queue, task_registry): + self.review_queue = review_queue + # Stash the registry so the trafficker UI can call + # registry.complete(task_id, result) when the human acts. + self.task_registry = task_registry + + def create_media_buy(self, req, ctx): + if self._needs_human_approval(req): + # Framework allocates task_id, calls _enqueue with + # task_ctx, persists 'submitted', returns Submitted. + # No background work runs in the framework. + return ctx.handoff_to_workflow( + lambda task_ctx: self._enqueue(task_ctx, req) + ) + return CreateMediaBuySuccess(media_buy_id="mb_1", ...) + + def _enqueue(self, task_ctx, req): + # Persist for the trafficker UI. ``task_ctx.id`` is the + # framework-allocated task_id; the buyer polls/webhooks + # on this id. + self.review_queue.add( + task_id=task_ctx.id, + request_snapshot=req.model_dump(), + ) + # Return — no work done here. Trafficker UI completes + # via self.task_registry.complete() when they decide. + + **Wire-shape parity.** The buyer cannot tell whether the seller + used sync, TaskHandoff, or WorkflowHandoff. All three project to + the same Submitted envelope (``{task_id, status: 'submitted'}``); + completion (whenever it happens, by whatever path) flows via + ``tasks/get`` or push-notification webhook with the same payload + shape. + + **Rollback.** If the enqueue fn raises, the framework discards + the just-allocated task_id from the registry and propagates the + exception (wrapped to ``AdcpError`` per the dispatch contract). + The buyer never sees an orphan task_id they can't reach. Adopter + enqueue fns that need transactional persistence wrap their own DB + write in their own transaction; the framework's rollback is + registry-side only. + """ + + __slots__ = ("_fn",) + + def __init__(self, fn: Callable[[Any], Awaitable[None] | None]) -> None: + # ``fn`` is ``Callable[[TaskHandoffContext], Awaitable[None] | + # None]`` — the framework calls it once synchronously (or + # awaits it if a coroutine) at handoff time. Return value + # unused; the adopter's external workflow completes via + # ``registry.complete()`` later, NOT via fn return. + # TaskHandoffContext lives in task_registry.py to avoid a cycle. + self._fn = fn + + def __repr__(self) -> str: + return "WorkflowHandoff()" + + +def is_workflow_handoff(obj: Any) -> bool: + """Type-identity dispatch helper for :class:`WorkflowHandoff`. + + Same posture as :func:`is_task_handoff`: ``type(obj) is + WorkflowHandoff``, not ``isinstance``. Adopter subclasses are not + supported. + """ + return type(obj) is WorkflowHandoff + + # --------------------------------------------------------------------------- # Result type aliases # --------------------------------------------------------------------------- diff --git a/tests/test_decisioning_task_registry.py b/tests/test_decisioning_task_registry.py index cb256c94a..582c81af1 100644 --- a/tests/test_decisioning_task_registry.py +++ b/tests/test_decisioning_task_registry.py @@ -77,6 +77,9 @@ async def get( ) -> dict[str, Any] | None: return None + async def discard(self, task_id: str) -> None: + pass + assert isinstance(_Stub(), TaskRegistry) diff --git a/tests/test_decisioning_workflow_handoff.py b/tests/test_decisioning_workflow_handoff.py new file mode 100644 index 000000000..a66ae6aa2 --- /dev/null +++ b/tests/test_decisioning_workflow_handoff.py @@ -0,0 +1,427 @@ +"""Tests for ``handoff_to_workflow`` — the externally-completed task primitive. + +Distinct from ``handoff_to_task`` (framework-managed background work). +``handoff_to_workflow`` is for adopter-owned external workflows +(human queue review, batch jobs, Airflow DAGs, ML pipelines, scheduled +cron) that complete on their own schedule via direct calls to +``registry.complete()`` / ``registry.fail()``. + +Test surfaces: + +* Wire-shape parity with TaskHandoff (Submitted envelope identical). +* Sync + async enqueue fns supported. +* Registry rollback on enqueue exception (no orphan task_id reaches + the buyer). +* No background coroutine runs in the framework. +* External completion via ``registry.complete()`` transitions state + correctly; buyers polling ``tasks/get`` see the terminal artifact. +* Cross-tenant probe semantics survive the workflow lifecycle. +""" + +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor + +import pytest +from pydantic import BaseModel + +from adcp.decisioning import ( + DecisioningCapabilities, + DecisioningPlatform, + InMemoryTaskRegistry, + SingletonAccounts, + WorkflowHandoff, +) +from adcp.decisioning.dispatch import ( + _build_request_context, + _invoke_platform_method, + _project_workflow_handoff, +) +from adcp.decisioning.types import Account, AdcpError +from adcp.server.base import ToolContext + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-workflow-") + yield pool + pool.shutdown(wait=True) + + +class _ProductsRequest(BaseModel): + pass + + +# ---- Public API + marker shape ---- + + +def test_handoff_to_workflow_returns_workflow_marker() -> None: + """``ctx.handoff_to_workflow(fn)`` returns a :class:`WorkflowHandoff` + marker — distinct from :class:`TaskHandoff`.""" + from adcp.decisioning.context import RequestContext + + ctx = RequestContext() + marker = ctx.handoff_to_workflow(lambda task_ctx: None) + assert type(marker) is WorkflowHandoff + + +def test_workflow_handoff_dispatch_uses_type_identity_not_isinstance() -> None: + """``is_workflow_handoff`` matches by type identity. Adopter + subclasses don't trigger the workflow path — they fall through to + sync return (silent — adopter contract).""" + from adcp.decisioning.types import is_workflow_handoff + + assert is_workflow_handoff(WorkflowHandoff(lambda task_ctx: None)) + + class _Subclass(WorkflowHandoff): + pass + + assert not is_workflow_handoff(_Subclass(lambda task_ctx: None)) + + +# ---- Wire-shape parity ---- + + +@pytest.mark.asyncio +async def test_workflow_handoff_returns_submitted_envelope( + executor: ThreadPoolExecutor, +) -> None: + """The wire envelope is the EXACT same shape as TaskHandoff: + ``{task_id, status: 'submitted'}``. Buyers can't tell which + path the seller took.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + enqueue_called = False + + def _enqueue(task_ctx): + nonlocal enqueue_called + enqueue_called = True + + handoff = WorkflowHandoff(_enqueue) + envelope = await _project_workflow_handoff( + handoff, + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + assert set(envelope.keys()) == {"task_id", "status"} + assert envelope["status"] == "submitted" + assert envelope["task_id"].startswith("task_") + assert enqueue_called is True + + +@pytest.mark.asyncio +async def test_workflow_handoff_persists_submitted_state_in_registry( + executor: ThreadPoolExecutor, +) -> None: + """After projection, the registry holds a record in + ``submitted`` state — the adopter's external workflow drives + the eventual transition to completed/failed.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + handoff = WorkflowHandoff(lambda task_ctx: None) + envelope = await _project_workflow_handoff( + handoff, + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + rec = await registry.get(envelope["task_id"], expected_account_id="acct_a") + assert rec is not None + assert rec["state"] == "submitted" + assert rec["task_type"] == "create_media_buy" + # No background work ran — no progress or terminal artifact. + assert rec["progress"] is None + assert rec["result"] is None + assert rec["error"] is None + + +# ---- Sync + async enqueue ---- + + +@pytest.mark.asyncio +async def test_workflow_handoff_supports_async_enqueue( + executor: ThreadPoolExecutor, +) -> None: + """Async enqueue fn — framework awaits it inline, doesn't dispatch + to a background task.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + enqueued_task_ids: list[str] = [] + + async def _async_enqueue(task_ctx): + # Could await something here (DB write, queue publish). + await asyncio.sleep(0) + enqueued_task_ids.append(task_ctx.id) + + envelope = await _project_workflow_handoff( + WorkflowHandoff(_async_enqueue), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + assert enqueued_task_ids == [envelope["task_id"]] + + +@pytest.mark.asyncio +async def test_workflow_handoff_supports_sync_enqueue( + executor: ThreadPoolExecutor, +) -> None: + """Sync enqueue fn — framework runs on the executor with a + contextvars snapshot.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + enqueued: list[str] = [] + + def _sync_enqueue(task_ctx): + enqueued.append(task_ctx.id) + + envelope = await _project_workflow_handoff( + WorkflowHandoff(_sync_enqueue), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + assert enqueued == [envelope["task_id"]] + + +# ---- Rollback on enqueue exception ---- + + +@pytest.mark.asyncio +async def test_workflow_handoff_rolls_back_registry_on_sync_enqueue_failure( + executor: ThreadPoolExecutor, +) -> None: + """If the sync enqueue fn raises, the framework discards the + just-allocated task_id from the registry. The buyer never sees + a Submitted envelope referencing an orphan id.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + captured_task_id: list[str] = [] + + def _failing_enqueue(task_ctx): + captured_task_id.append(task_ctx.id) + raise RuntimeError("trafficker queue down") + + with pytest.raises(RuntimeError, match="trafficker queue down"): + await _project_workflow_handoff( + WorkflowHandoff(_failing_enqueue), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + + # The framework allocated a task_id, called enqueue (which + # raised), and discarded the id. Registry has nothing. + assert len(captured_task_id) == 1 + rec = await registry.get(captured_task_id[0]) + assert rec is None + + +@pytest.mark.asyncio +async def test_workflow_handoff_rolls_back_registry_on_async_enqueue_failure( + executor: ThreadPoolExecutor, +) -> None: + """Same rollback semantics for async enqueue.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + captured_task_id: list[str] = [] + + async def _failing_enqueue(task_ctx): + captured_task_id.append(task_ctx.id) + await asyncio.sleep(0) + raise RuntimeError("airflow trigger failed") + + with pytest.raises(RuntimeError, match="airflow trigger failed"): + await _project_workflow_handoff( + WorkflowHandoff(_failing_enqueue), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + rec = await registry.get(captured_task_id[0]) + assert rec is None + + +# ---- External completion via registry ---- + + +@pytest.mark.asyncio +async def test_external_workflow_completion_transitions_state( + executor: ThreadPoolExecutor, +) -> None: + """The adopter's external workflow calls + ``registry.complete(task_id, result)`` — buyer polling + ``tasks/get`` then sees the terminal artifact. End-to-end + integration of the workflow handoff lifecycle.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + enqueued_task_ids: list[str] = [] + + envelope = await _project_workflow_handoff( + WorkflowHandoff(lambda task_ctx: enqueued_task_ids.append(task_ctx.id)), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + task_id = envelope["task_id"] + + # Adopter's external workflow does work, then completes the task. + await registry.complete( + task_id, + {"media_buy_id": "mb_after_human_review", "status": "active"}, + ) + + rec = await registry.get(task_id, expected_account_id="acct_a") + assert rec is not None + assert rec["state"] == "completed" + assert rec["result"] == { + "media_buy_id": "mb_after_human_review", + "status": "active", + } + + +@pytest.mark.asyncio +async def test_external_workflow_failure_via_registry_fail( + executor: ThreadPoolExecutor, +) -> None: + """Adopter's external workflow can fail the task via + ``registry.fail(task_id, error)`` — same path the + TaskHandoff projector uses for adopter-raised errors.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + envelope = await _project_workflow_handoff( + WorkflowHandoff(lambda task_ctx: None), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + task_id = envelope["task_id"] + + error_payload = AdcpError( + "POLICY_VIOLATION", + message="Trafficker rejected: brand mismatch", + recovery="correctable", + ).to_wire() + await registry.fail(task_id, error_payload) + + rec = await registry.get(task_id, expected_account_id="acct_a") + assert rec is not None + assert rec["state"] == "failed" + assert rec["error"]["code"] == "POLICY_VIOLATION" + + +# ---- Integration via _invoke_platform_method ---- + + +@pytest.mark.asyncio +async def test_invoke_platform_method_routes_workflow_handoff( + executor: ThreadPoolExecutor, +) -> None: + """End-to-end: a platform method returning ``ctx.handoff_to_workflow(fn)`` + flows through ``_invoke_platform_method`` and produces the + Submitted envelope without the caller knowing it was a workflow + handoff. Same dispatch surface as TaskHandoff.""" + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + enqueued: list[str] = [] + + def _enqueue(task_ctx): + enqueued.append(task_ctx.id) + + class _WorkflowPlatform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="x") + + def create_media_buy(self, req, ctx): + return ctx.handoff_to_workflow(_enqueue) + + result = await _invoke_platform_method( + _WorkflowPlatform(), + "create_media_buy", + _ProductsRequest(), + ctx, + executor=executor, + registry=registry, + ) + assert isinstance(result, dict) + assert result["status"] == "submitted" + assert "task_type" not in result + assert len(enqueued) == 1 + + +@pytest.mark.asyncio +async def test_workflow_handoff_does_not_run_background_task( + executor: ThreadPoolExecutor, +) -> None: + """Critical distinction from TaskHandoff: NO background coroutine + runs. After projection, the registry stays in ``submitted`` state + — there's no work to do until the adopter's external workflow + calls registry.complete(). + + Sleep briefly to give any (incorrectly-scheduled) background work + a chance to run, then assert the state is still submitted.""" + from adcp.decisioning.dispatch import _BACKGROUND_HANDOFF_TASKS + + registry = InMemoryTaskRegistry() + ctx = _build_request_context(ToolContext(), Account(id="acct_a"), None) + + initial_bg_tasks = len(_BACKGROUND_HANDOFF_TASKS) + + envelope = await _project_workflow_handoff( + WorkflowHandoff(lambda task_ctx: None), + ctx, + method_name="create_media_buy", + registry=registry, + executor=executor, + ) + # Yield to give any (incorrectly-scheduled) background work a + # chance to run. + await asyncio.sleep(0.05) + + # No background handoff tasks were spawned for this workflow path. + assert len(_BACKGROUND_HANDOFF_TASKS) == initial_bg_tasks + # State stays submitted — no fn ran to completion in the framework. + rec = await registry.get(envelope["task_id"], expected_account_id="acct_a") + assert rec is not None + assert rec["state"] == "submitted" + + +# ---- Public exports ---- + + +def test_workflow_handoff_publicly_exported() -> None: + """``WorkflowHandoff`` is on ``adcp.decisioning.__all__`` so + adopters import from the canonical public surface.""" + import adcp.decisioning as dx + + assert "WorkflowHandoff" in dx.__all__ + assert dx.WorkflowHandoff is WorkflowHandoff + + +def test_request_context_exposes_handoff_to_workflow() -> None: + """``RequestContext.handoff_to_workflow`` is the adopter-facing + seam. Pinned alongside the existing ``handoff_to_task`` so a + future refactor doesn't drop the workflow primitive.""" + from adcp.decisioning import RequestContext + + assert hasattr(RequestContext, "handoff_to_task") + assert hasattr(RequestContext, "handoff_to_workflow")