diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5e8a8d1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2026 Dottxt Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index d42285f..8a5aebc 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,56 @@ For direct `chat.completions.create(...)`, pass the wrapped OpenAI-style Use `DotTxt.models.list()` and `AsyncDotTxt.models.list()` for model listing. +## Streaming Fields + +`AsyncDotTxt.stream_fields(...)` yields `(field, value)` tuples as the model +fills in a schema-constrained response, one leaf at a time. The wire format is +the gateway's `stream: "patch"` mode (RFC 6902 JSON Patch over NDJSON). + +Field names are JSON Pointers with the leading `/` stripped — top-level keys +read as `"intent"`, array items as `"steps/0"`, nested object fields as +`"address/city"`. Empty container ops are skipped; consumers only see leaves. + +```python +import asyncio +from typing import Literal + +from pydantic import BaseModel + +from dottxt import AsyncDotTxt + + +class SupportTicket(BaseModel): + # Field order = arrival order. Put what unblocks downstream work first. + intent: Literal["billing", "technical", "account"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str + + +async def main() -> None: + client = AsyncDotTxt() + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input="I was charged twice this month — please refund the duplicate.", + ): + match field: + case "intent": + print(f"dispatching to {value} queue") + case "urgency" if value == "critical": + print("paging oncall") + case "reply": + print(f"reply: {value}") + + +asyncio.run(main()) +``` + +The routing decision can fire tens of milliseconds into generation while +`reply` continues to stream. See +[docs/client.md](docs/client.md#streaming-fields-patch-stream) for the full +reference. + ## OpenAI-Compatible Usage Use `DotTxt` when you want an OpenAI-style client surface with @@ -223,3 +273,7 @@ The compatibility surface expects the wrapped OpenAI-style - [Use a Genson schema builder to generate](examples/generate_genson.py) - [List available models](examples/list_models.py) - [OpenAI-Compatible chat completions](examples/openai_chat_completions.py) +- [Stream fields as they arrive](examples/stream_field_printer.py) +- [Route on /intent before /reply finishes](examples/stream_early_routing.py) +- [Mid-stream human approval](examples/stream_hitl_approval.py) +- [Fan out research on each /steps/N](examples/stream_fanout.py) diff --git a/docs/client.md b/docs/client.md index d5e2438..f2cfada 100644 --- a/docs/client.md +++ b/docs/client.md @@ -209,6 +209,66 @@ result = client.generate( print(result) # {'severity': 'high', 'team': 'checkout'} ``` +## Streaming Fields (Patch Stream) + +`AsyncDotTxt.stream_fields(...)` yields `(field, value)` tuples as the model +fills in a schema-constrained response, one leaf at a time. It is built on the +gateway's `stream: "patch"` mode, which emits RFC 6902 JSON Patch operations +in schema order — so downstream work can start the moment a field arrives, +without waiting for the closing brace. + +Parameters mirror `generate(...)`: + +- `model` (`str`) +- `input` (`str | list[dict]`) +- `response_format` (`Any`) — any schema input accepted by `generate(...)` +- `temperature`, `max_tokens`, `seed`, `timeout` — optional +- `extra` (`dict | None`) — extra chat-completions body fields + +Each yielded `field` is a JSON Pointer with the leading `/` stripped. Top-level +keys read naturally in `match` statements (`case "intent":`), array items keep +their index (`"steps/0"`), and nested object fields keep their slashes +(`"address/city"`). Empty container ops (object/array initialisers) are +skipped; consumers only see leaves. + +```python +import asyncio +from typing import Literal +from pydantic import BaseModel +from dottxt import AsyncDotTxt + +class SupportTicket(BaseModel): + # Field order = arrival order. Put what unblocks downstream work first. + intent: Literal["billing", "technical", "account"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str + +async def main(): + client = AsyncDotTxt() + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input="I was charged twice this month, please refund the duplicate.", + ): + match field: + case "intent": + asyncio.create_task(dispatch_to_queue(value)) + case "urgency" if value == "critical": + asyncio.create_task(page_oncall()) + case "reply": + await send(value) + +asyncio.run(main()) +``` + +The routing decision fires the moment `intent` arrives — typically tens of +milliseconds in — while `reply` continues to stream. + +Errors: + +- `dottxt.PatchStreamError`: raised when the gateway returns a non-200 + status. Exposes `status_code` and `body`. + ## OpenAI-Compatible Text Generation If you prefer the standard OpenAI SDK surface, you can call @@ -268,3 +328,11 @@ Runnable examples live in the [`examples/`](../examples) directory: - [`list_models.py`](../examples/list_models.py): list available models - [`openai_chat_completions.py`](../examples/openai_chat_completions.py): use the OpenAI-compatible `chat.completions.create` surface +- [`stream_field_printer.py`](../examples/stream_field_printer.py): minimal + `stream_fields` demo — print each `(field, value)` as it lands +- [`stream_early_routing.py`](../examples/stream_early_routing.py): route on + `/intent` while `/reply` is still streaming +- [`stream_hitl_approval.py`](../examples/stream_hitl_approval.py): approve a + proposed action mid-stream and discard the reply if the operator declines +- [`stream_fanout.py`](../examples/stream_fanout.py): fan research tasks out + on each `/steps/N` as the planner emits them diff --git a/examples/stream_early_routing.py b/examples/stream_early_routing.py new file mode 100644 index 0000000..ee32ab1 --- /dev/null +++ b/examples/stream_early_routing.py @@ -0,0 +1,100 @@ +"""Route on /intent before /reply finishes. + +The schema is ordered ``intent`` → ``urgency`` → ``reply``. Because dottxt +streams fields in schema order, the routing decision fires the moment +``intent`` arrives — typically tens of milliseconds in — while the model +continues generating the (much longer) ``reply``. + +Run with VERBOSE=1 to see wall-clock timestamps for each field. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_early_routing.py +""" + +import asyncio +import os +import time +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class SupportTicket(BaseModel): + """A triaged support reply. + + Field order is significant — earlier fields arrive first and unblock + downstream work that does not depend on later fields. + """ + + intent: Literal["billing", "technical", "account", "feedback"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str = Field(max_length=400) + + +async def route_to_billing(ticket_id: str) -> None: + """Dispatch the ticket to the billing queue (stub).""" + print(f" -> dispatched {ticket_id} to billing queue") + + +async def route_to_technical(ticket_id: str) -> None: + """Dispatch the ticket to the technical queue (stub).""" + print(f" -> dispatched {ticket_id} to technical queue") + + +async def page_oncall(ticket_id: str) -> None: + """Page the on-call engineer (stub).""" + print(f" -> paged oncall for {ticket_id}") + + +HANDLERS = { + "billing": route_to_billing, + "technical": route_to_technical, +} + + +async def main() -> None: + """Run the example.""" + verbose = os.environ.get("VERBOSE") == "1" + ticket_id = "TKT-8821" + user_message = ( + "I was charged twice for my subscription this month and the second " + "charge doesn't appear in my invoice list. Please refund the duplicate." + ) + + client = AsyncDotTxt() + started = time.monotonic() + try: + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input=[ + { + "role": "system", + "content": "Triage support tickets and draft a reply.", + }, + {"role": "user", "content": user_message}, + ], + max_tokens=400, + ): + elapsed_ms = int((time.monotonic() - started) * 1000) + if verbose: + print(f"[t+{elapsed_ms:>5}ms] {field}: {value!r}") + match field: + case "intent": + handler = HANDLERS.get(value) + if handler is not None: + # Fire-and-forget: routing kicks off while /reply is + # still streaming. + asyncio.create_task(handler(ticket_id)) + case "urgency" if value == "critical": + asyncio.create_task(page_oncall(ticket_id)) + case "reply": + print(f"reply ({elapsed_ms}ms): {value}") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_fanout.py b/examples/stream_fanout.py new file mode 100644 index 0000000..f5d8010 --- /dev/null +++ b/examples/stream_fanout.py @@ -0,0 +1,75 @@ +"""Fan out research tasks as a planner emits each step. + +The planner schema has a top-level ``steps`` array. Each item streams in as +a separate field (``steps/0``, ``steps/1``, ...). We launch a research +coroutine on each one — so step 0's research is already underway by the +time step 1 lands. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_fanout.py +""" + +import asyncio +import time +from typing import Any + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class Plan(BaseModel): + """An ordered research plan.""" + + topic: str = Field(max_length=80) + steps: list[str] = Field(min_length=3, max_length=5) + + +async def research(step_index: int, step: str) -> dict[str, Any]: + """Pretend to research a single step (sleep + return).""" + started = time.monotonic() + print(f" [step {step_index}] started: {step!r}") + # Simulate a real lookup. Different durations make the overlap visible. + await asyncio.sleep(1.0 + 0.2 * step_index) + elapsed_ms = int((time.monotonic() - started) * 1000) + print(f" [step {step_index}] done in {elapsed_ms}ms") + return {"step": step, "elapsed_ms": elapsed_ms} + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + tasks: list[asyncio.Task[dict[str, Any]]] = [] + started = time.monotonic() + + try: + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=Plan, + input=( + "Plan three to five research steps to answer the question: " + "'What are the trade-offs between RAG and fine-tuning for " + "domain-specific assistants?' Each step should be a short " + "imperative sentence." + ), + max_tokens=400, + ): + elapsed_ms = int((time.monotonic() - started) * 1000) + if field.startswith("steps/"): + index = int(field.split("/", 1)[1]) + print(f"[t+{elapsed_ms:>5}ms] received step {index}") + tasks.append(asyncio.create_task(research(index, value))) + elif field == "topic": + print(f"[t+{elapsed_ms:>5}ms] topic: {value}") + + results = await asyncio.gather(*tasks) + finally: + await client.close() + + total_ms = int((time.monotonic() - started) * 1000) + print() + print(f"all {len(results)} steps researched in {total_ms}ms total") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_field_printer.py b/examples/stream_field_printer.py new file mode 100644 index 0000000..32fd306 --- /dev/null +++ b/examples/stream_field_printer.py @@ -0,0 +1,42 @@ +"""Print each ``(field, value)`` as it arrives from the model. + +Smallest possible demo of the patch-stream interface: define a schema, +iterate, print. No buffering, no closing brace. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_field_printer.py +""" + +import asyncio +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class Engineer(BaseModel): + """A software engineer profile.""" + + name: str = Field(max_length=32) + role: Literal["backend", "frontend", "ml", "infra"] + years_experience: int = Field(ge=0, le=50) + favorite_languages: list[str] = Field(min_length=1, max_length=4) + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + try: + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=Engineer, + input="Generate a profile for a senior backend engineer.", + ): + print(f"{field:>24} = {value!r}") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_hitl_approval.py b/examples/stream_hitl_approval.py new file mode 100644 index 0000000..74db32e --- /dev/null +++ b/examples/stream_hitl_approval.py @@ -0,0 +1,93 @@ +"""Mid-stream human approval — no checkpointer, no resume. + +The proposed action arrives as a fact (``action``) before the reply is +generated. We prompt the operator between receiving the fact and acting on +it. If the operator declines, the rest of the stream is consumed but the +``reply`` is never sent. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_hitl_approval.py +""" + +import asyncio +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class AgentDecision(BaseModel): + """An agent's proposed action and customer-facing reply. + + ``action`` precedes ``reply`` so the operator can approve or reject + while the reply text is still streaming. + """ + + action: Literal[ + "answer_only", + "open_ticket", + "issue_refund", + "delete_account", + ] + reply: str = Field(max_length=300) + + +HIGH_RISK_ACTIONS = {"issue_refund", "delete_account"} + + +async def ask_human(question: str) -> bool: + """Prompt the operator on stdin; return True if they approve.""" + # input() is blocking; run it off the event loop so other tasks + # (e.g. background dispatching) keep running while we wait. + answer = await asyncio.to_thread(input, f"{question} [y/N]: ") + return answer.strip().lower() in {"y", "yes"} + + +async def send_reply(reply: str) -> None: + """Send the customer-facing reply (stub).""" + print(f"sent reply: {reply}") + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + user_message = "Please close my account permanently. I am leaving." + + approved = True + proposed_action: str | None = None + + try: + async for field, value in client.stream_fields( + model="openai/gpt-oss-20b", + response_format=AgentDecision, + input=[ + { + "role": "system", + "content": ( + "You are a customer support agent. Decide on an " + "action and draft a reply." + ), + }, + {"role": "user", "content": user_message}, + ], + max_tokens=300, + ): + match field: + case "action": + proposed_action = value + print(f"proposed action: {value}") + if value in HIGH_RISK_ACTIONS: + approved = await ask_human(f"Approve action '{value}'?") + if not approved: + print("operator declined — reply will not be sent") + case "reply" if approved: + await send_reply(value) + case "reply": + print(f"discarded reply (action '{proposed_action}' declined)") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index e98ce62..464c005 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,15 @@ [build-system] build-backend = "setuptools.build_meta" -requires = [ "setuptools>=64", "setuptools-scm>=8", "wheel" ] +requires = [ "setuptools>=77", "setuptools-scm>=8", "wheel" ] [project] name = "dottxt" description = "The official library for the .txt platform" readme.content-type = "text/markdown" readme.file = "README.md" -authors = [ { name = ".txt ", email = "support@dottxt.ai" } ] +license = "Apache-2.0" +license-files = [ "LICENSE" ] +authors = [ { name = "dottxt", email = "contact@dottxt.ai" } ] requires-python = ">=3.10" classifiers = [ "Programming Language :: Python :: 3 :: Only", diff --git a/src/dottxt/__init__.py b/src/dottxt/__init__.py index 6ff6981..081d540 100644 --- a/src/dottxt/__init__.py +++ b/src/dottxt/__init__.py @@ -3,10 +3,11 @@ from importlib.metadata import PackageNotFoundError, version from dottxt.client import AsyncDotTxt, DotTxt, InvalidOutputError +from dottxt.streaming import PatchStreamError try: # pragma: no cover __version__ = version("dottxt") except PackageNotFoundError: # pragma: no cover __version__ = "0.0.0" -__all__ = ["DotTxt", "AsyncDotTxt", "InvalidOutputError"] +__all__ = ["DotTxt", "AsyncDotTxt", "InvalidOutputError", "PatchStreamError"] diff --git a/src/dottxt/client.py b/src/dottxt/client.py index c4759cc..caaa86b 100644 --- a/src/dottxt/client.py +++ b/src/dottxt/client.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, ValidationError from dottxt.schemas import SchemaInput, build_response_format +from dottxt.streaming import stream_fields as _stream_fields DEFAULT_BASE_URL = "https://api.dottxt.ai/v1" @@ -239,6 +240,61 @@ async def generate( original_error=exc, ) from exc + async def stream_fields( + self, + *, + model: str, + response_format: SchemaInput, + input: str | list[dict[str, Any]], + temperature: float | None = None, + max_tokens: int | None = None, + seed: int | None = None, + timeout: float = 60.0, + extra: dict[str, Any] | None = None, + ) -> Any: + """Stream ``(field, value)`` tuples as the model fills in the response. + + The dottxt gateway emits one JSON Patch ``add`` op per leaf as a + schema-constrained response is generated. Each op is converted to a + ``(field, value)`` tuple, where ``field`` is the JSON Pointer with + the leading ``"/"`` stripped — top-level keys read as ``"intent"``, + array items as ``"steps/0"``, nested objects as ``"address/city"``. + + Empty container ops are skipped; consumers only see leaves. + + Args: + model: Model identifier. + response_format: Schema input accepted by ``build_response_format``. + input: A prompt string or a list of chat messages. + temperature: Optional temperature value. + max_tokens: Optional max output tokens. + seed: Optional deterministic seed. + timeout: HTTP timeout in seconds. + extra: Additional chat-completions parameters merged into the body. + + Yields: + ``(field, value)`` tuples, one per leaf, in the order the model + produced them. + + Raises: + PatchStreamError: If the upstream returns a non-200 status. + """ + base_url = str(self._client.base_url) + api_key = self._client.api_key + async for field, value in _stream_fields( + base_url=base_url, + api_key=api_key, + model=model, + response_format=response_format, + input=input, + temperature=temperature, + max_tokens=max_tokens, + seed=seed, + timeout=timeout, + extra=extra, + ): + yield field, value + async def close(self) -> None: """Close the underlying SDK client.""" await self._client.close() diff --git a/src/dottxt/streaming.py b/src/dottxt/streaming.py new file mode 100644 index 0000000..183ce78 --- /dev/null +++ b/src/dottxt/streaming.py @@ -0,0 +1,113 @@ +"""Patch-stream consumer for the dottxt ``stream: "patch"`` endpoint. + +The gateway emits one RFC 6902 ``add`` op per JSON token as the model +generates a structured response. This module turns that wire stream into +``(field, value)`` tuples for application code. + +See ``the-closing-brace`` essay for the motivation. +""" + +from __future__ import annotations + +import json +from collections.abc import AsyncIterator +from typing import Any + +import httpx + +from dottxt.schemas import SchemaInput, build_response_format + + +class PatchStreamError(RuntimeError): + """Raised when the upstream patch stream returns a non-200 status.""" + + def __init__(self, *, status_code: int, body: str) -> None: + super().__init__(f"patch stream failed: {status_code} {body[:500]}") + self.status_code = status_code + self.body = body + + +def _field_from_op(op: dict[str, Any]) -> tuple[str, Any] | None: + """Convert a JSON Patch op into a ``(field, value)`` tuple, or ``None`` to skip. + + Skipped: + - The root seed op (``path: ""``) — it carries the empty document, not a fact. + - Empty-container ops (``value`` is ``{}`` or ``[]``) — leaves fill in next. + + The returned ``field`` is a JSON Pointer with the leading ``"/"`` stripped, + so top-level keys read naturally in ``match`` statements (e.g. ``case "intent"``). + Nested paths keep their segments joined by ``"/"`` (e.g. ``"steps/0"``). + """ + if op.get("op") != "add": + return None + path = op.get("path", "") + value = op.get("value") + if value == {} or value == []: + return None + if path == "": + return None + field = path[1:] if path.startswith("/") else path + return field, value + + +async def stream_fields( + *, + base_url: str, + api_key: str, + model: str, + response_format: SchemaInput, + input: str | list[dict[str, Any]], + temperature: float | None = None, + max_tokens: int | None = None, + seed: int | None = None, + timeout: float = 60.0, + extra: dict[str, Any] | None = None, +) -> AsyncIterator[tuple[str, Any]]: + """Yield ``(field, value)`` tuples from a patch-streamed chat completion. + + Sends ``stream: "patch"`` to ``{base_url}/chat/completions`` and reads + the NDJSON response, yielding one tuple per leaf value as it arrives. + """ + if isinstance(input, str): + input = [{"role": "user", "content": input}] + body: dict[str, Any] = { + **(extra or {}), + "model": model, + "messages": input, + "response_format": build_response_format(response_format), + "stream": "patch", + } + if temperature is not None: + body["temperature"] = temperature + if max_tokens is not None: + body["max_tokens"] = max_tokens + if seed is not None: + body["seed"] = seed + + url = f"{base_url.rstrip('/')}/chat/completions" + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("POST", url, json=body, headers=headers) as resp: + if resp.status_code != 200: + detail = (await resp.aread()).decode("utf-8", errors="replace") + raise PatchStreamError(status_code=resp.status_code, body=detail) + buf = "" + async for chunk in resp.aiter_text(): + buf += chunk + while "\n" in buf: + line, buf = buf.split("\n", 1) + line = line.strip() + if not line: + continue + op = json.loads(line) + pair = _field_from_op(op) + if pair is not None: + yield pair + if buf.strip(): + op = json.loads(buf) + pair = _field_from_op(op) + if pair is not None: + yield pair diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..2261033 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,369 @@ +"""Tests for the patch-stream consumer.""" + +from __future__ import annotations + +import json +from typing import Any + +import httpx +import pytest + +from dottxt.streaming import PatchStreamError, _field_from_op, stream_fields + + +def test_field_from_op_skips_root_seed() -> None: + """The empty-path root op carries no fact.""" + assert _field_from_op({"op": "add", "path": "", "value": {}}) is None + + +def test_field_from_op_skips_empty_object_container() -> None: + """Empty object containers are structural; leaves fill in next.""" + assert _field_from_op({"op": "add", "path": "/address", "value": {}}) is None + + +def test_field_from_op_skips_empty_array_container() -> None: + """Empty array containers are structural; items fill in next.""" + assert _field_from_op({"op": "add", "path": "/skills", "value": []}) is None + + +def test_field_from_op_yields_top_level_leaf() -> None: + """Top-level field is the key without a leading slash.""" + assert _field_from_op({"op": "add", "path": "/intent", "value": "billing"}) == ( + "intent", + "billing", + ) + + +def test_field_from_op_yields_array_index() -> None: + """Array items keep their index in the pointer path.""" + assert _field_from_op({"op": "add", "path": "/steps/0", "value": "first"}) == ( + "steps/0", + "first", + ) + + +def test_field_from_op_yields_nested_object_field() -> None: + """Nested object keys keep their segments joined by `/`.""" + assert _field_from_op({"op": "add", "path": "/address/city", "value": "Paris"}) == ( + "address/city", + "Paris", + ) + + +def test_field_from_op_passes_numeric_value() -> None: + """Non-string leaves pass through unchanged.""" + assert _field_from_op({"op": "add", "path": "/age", "value": 30}) == ("age", 30) + + +def test_field_from_op_ignores_non_add_ops() -> None: + """Replace/remove ops aren't part of the current wire protocol.""" + assert _field_from_op({"op": "replace", "path": "/x", "value": 1}) is None + assert _field_from_op({"op": "remove", "path": "/x"}) is None + + +def test_field_from_op_skips_root_with_non_empty_value() -> None: + """A root op with a non-empty value is still structural — skip it.""" + assert _field_from_op({"op": "add", "path": "", "value": {"x": 1}}) is None + + +def _ndjson_response(ops: list[dict[str, Any]]) -> httpx.Response: + """Build a fake NDJSON streaming response.""" + body = "".join(json.dumps(op) + "\n" for op in ops).encode() + return httpx.Response( + 200, + content=body, + headers={"content-type": "application/x-ndjson"}, + ) + + +@pytest.mark.asyncio +async def test_stream_fields_yields_expected_sequence(monkeypatch: Any) -> None: + """End-to-end: a canned NDJSON stream produces the expected tuples.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["body"] = json.loads(request.content) + captured["auth"] = request.headers.get("authorization") + return _ndjson_response( + [ + {"op": "add", "path": "", "value": {}}, + {"op": "add", "path": "/intent", "value": "billing"}, + {"op": "add", "path": "/urgency", "value": "high"}, + {"op": "add", "path": "/steps", "value": []}, + {"op": "add", "path": "/steps/0", "value": "verify"}, + {"op": "add", "path": "/steps/1", "value": "refund"}, + {"op": "add", "path": "/reply", "value": "Done."}, + ] + ) + + # Patch httpx.AsyncClient to use a MockTransport so the real network is + # never touched. + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + schema = { + "type": "object", + "properties": { + "intent": {"type": "string"}, + "urgency": {"type": "string"}, + "steps": {"type": "array", "items": {"type": "string"}}, + "reply": {"type": "string"}, + }, + } + collected = [ + pair + async for pair in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="openai/gpt-oss-20b", + response_format=schema, + input="go", + ) + ] + + assert collected == [ + ("intent", "billing"), + ("urgency", "high"), + ("steps/0", "verify"), + ("steps/1", "refund"), + ("reply", "Done."), + ] + assert captured["url"] == "https://api.example.com/v1/chat/completions" + assert captured["body"]["stream"] == "patch" + assert captured["body"]["model"] == "openai/gpt-oss-20b" + assert captured["auth"] == "Bearer sk-test" + + +@pytest.mark.asyncio +async def test_stream_fields_raises_on_non_200(monkeypatch: Any) -> None: + """A non-200 response surfaces as PatchStreamError with the body.""" + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 403, + content=b'{"error":"Forbidden","message":"no access"}', + headers={"content-type": "application/json"}, + ) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + with pytest.raises(PatchStreamError) as info: + async for _ in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ): + pass + assert info.value.status_code == 403 + assert "Forbidden" in info.value.body + + +@pytest.mark.asyncio +async def test_stream_fields_passes_list_input_unchanged(monkeypatch: Any) -> None: + """When `input` is already a list of messages, it is forwarded as-is.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.content) + return _ndjson_response([{"op": "add", "path": "/x", "value": 1}]) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + messages = [ + {"role": "system", "content": "be brief"}, + {"role": "user", "content": "go"}, + ] + pairs = [ + p + async for p in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input=messages, + ) + ] + assert pairs == [("x", 1)] + assert captured["body"]["messages"] == messages + + +@pytest.mark.asyncio +async def test_stream_fields_passes_generation_params(monkeypatch: Any) -> None: + """temperature / max_tokens / seed land in the request body.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.content) + return _ndjson_response( + [{"op": "add", "path": "/x", "value": 1}], + ) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + pairs = [ + p + async for p in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + temperature=0.2, + max_tokens=128, + seed=7, + extra={"top_p": 0.9}, + ) + ] + assert pairs == [("x", 1)] + body = captured["body"] + assert body["temperature"] == 0.2 + assert body["max_tokens"] == 128 + assert body["seed"] == 7 + assert body["top_p"] == 0.9 + + +@pytest.mark.asyncio +async def test_stream_fields_tolerates_blank_lines_and_trailing_op( + monkeypatch: Any, +) -> None: + """Blank lines are skipped; a trailing op without a newline is flushed.""" + + def handler(request: httpx.Request) -> httpx.Response: + # Two blank lines between ops, plus a final op without trailing newline. + body = ( + b'{"op":"add","path":"/a","value":1}\n' + b"\n" + b"\n" + b'{"op":"add","path":"/b","value":2}\n' + b'{"op":"add","path":"/c","value":3}' + ) + return httpx.Response( + 200, + content=body, + headers={"content-type": "application/x-ndjson"}, + ) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + pairs = [ + p + async for p in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + assert pairs == [("a", 1), ("b", 2), ("c", 3)] + + +@pytest.mark.asyncio +async def test_stream_fields_trailing_op_is_skippable(monkeypatch: Any) -> None: + """A trailing structural op (no newline) still parses without yielding.""" + + def handler(request: httpx.Request) -> httpx.Response: + body = ( + b'{"op":"add","path":"/a","value":1}\n' + b'{"op":"add","path":"/skills","value":[]}' + ) + return httpx.Response( + 200, + content=body, + headers={"content-type": "application/x-ndjson"}, + ) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + pairs = [ + p + async for p in stream_fields( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + assert pairs == [("a", 1)] + + +@pytest.mark.asyncio +async def test_async_dottxt_stream_fields_uses_client_credentials( + monkeypatch: Any, +) -> None: + """AsyncDotTxt.stream_fields forwards base_url + api_key from the SDK client.""" + from dottxt import AsyncDotTxt + + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["auth"] = request.headers.get("authorization") + return _ndjson_response([{"op": "add", "path": "/intent", "value": "billing"}]) + + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + client = AsyncDotTxt( + api_key="sk-async-test", + base_url="https://api.example.com/v1", + ) + try: + pairs = [ + p + async for p in client.stream_fields( + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + finally: + await client.close() + + assert pairs == [("intent", "billing")] + assert captured["url"] == "https://api.example.com/v1/chat/completions" + assert captured["auth"] == "Bearer sk-async-test"