Add JSON Patch to SDK#3
Conversation
9c8d18e to
602c697
Compare
602c697 to
ff64ec5
Compare
| seed: int | None = None, | ||
| timeout: float = 60.0, | ||
| extra: dict[str, Any] | None = None, | ||
| ) -> Any: |
There was a problem hiding this comment.
could type this as AsyncIterator[tuple[str, Any]]
| return None | ||
| if path == "": | ||
| return None | ||
| field = path[1:] if path.startswith("/") else path |
There was a problem hiding this comment.
Should we JSON Pointer-unescape the rest of the path after stripping the leading /? (RFC 6901: ~1 -> /, ~0 -> ~)?
Or is that handled server side?
|
|
||
| try: | ||
| async for field, value in client.stream_fields( | ||
| model="openai/gpt-oss-20b", |
There was a problem hiding this comment.
I got inconsistent behavior on this example which is likely model related
aschwa
left a comment
There was a problem hiding this comment.
Slick examples! A few clarifying nits
There was a problem hiding this comment.
There's already a LICENSE file on main, why is this showing up?
| return field, value | ||
|
|
||
|
|
||
| async def stream_fields( |
There was a problem hiding this comment.
I would call it stream, and the alternative stream_tokens instead.
| seed: int | None = None, | ||
| timeout: float = 60.0, | ||
| extra: dict[str, Any] | None = None, | ||
| ) -> AsyncIterator[tuple[str, Any]]: |
There was a problem hiding this comment.
Why not yield the patch (or probably better, the patch and reconstructed object so far)? JSON Patch is simple, very well documented and already has an ecosystem.
| async with client.stream("POST", url, json=body, headers=headers) as resp: | ||
| if resp.status_code != 200: | ||
| detail = (await resp.aread()).decode("utf-8", errors="replace") | ||
| raise PatchStreamError(status_code=resp.status_code, body=detail) | ||
| buf = "" | ||
| async for chunk in resp.aiter_text(): | ||
| buf += chunk | ||
| while "\n" in buf: | ||
| line, buf = buf.split("\n", 1) | ||
| line = line.strip() | ||
| if not line: | ||
| continue | ||
| op = json.loads(line) | ||
| pair = _field_from_op(op) | ||
| if pair is not None: | ||
| yield pair | ||
| if buf.strip(): | ||
| op = json.loads(buf) | ||
| pair = _field_from_op(op) | ||
| if pair is not None: | ||
| yield pair |
There was a problem hiding this comment.
Do we need to stream the response here? All we care about is the final JSON Patch object, the user does not see anything else.
| if isinstance(input, str): | ||
| input = [{"role": "user", "content": input}] | ||
| body: dict[str, Any] = { | ||
| **(extra or {}), | ||
| "model": model, | ||
| "messages": input, | ||
| "response_format": build_response_format(response_format), | ||
| "stream": "patch", | ||
| } | ||
| if temperature is not None: | ||
| body["temperature"] = temperature | ||
| if max_tokens is not None: | ||
| body["max_tokens"] = max_tokens | ||
| if seed is not None: | ||
| body["seed"] = seed |
There was a problem hiding this comment.
Can some of this code be extracted in a common utility with the non-streaming generation?
| async for field, value in client.stream_fields( | ||
| model="openai/gpt-oss-20b", | ||
| response_format=Plan, | ||
| input=( | ||
| "Plan three to five research steps to answer the question: " | ||
| "'What are the trade-offs between RAG and fine-tuning for " | ||
| "domain-specific assistants?' Each step should be a short " | ||
| "imperative sentence." | ||
| ), | ||
| max_tokens=400, | ||
| ): |
There was a problem hiding this comment.
Very much a nit, but I'd separate stream = client.stream(...) and async for ... in stream
| time step 1 lands. | ||
|
|
||
| Usage: | ||
| DOTTXT_API_KEY=sk-... python examples/stream_fanout.py |
There was a problem hiding this comment.
Explain what the user should pay attention to (total time < sum of individual times?) and why
| elapsed_ms = int((time.monotonic() - started) * 1000) | ||
| if verbose: | ||
| print(f"[t+{elapsed_ms:>5}ms] {field}: {value!r}") |
There was a problem hiding this comment.
I'd strip this off so there is less noise. My understanding is that we mostly want the user to see the succession of " -> did something" statements, instead of a monolithic one at the end?
| case "intent": | ||
| handler = HANDLERS.get(value) | ||
| if handler is not None: | ||
| # Fire-and-forget: routing kicks off while /reply is | ||
| # still streaming. | ||
| asyncio.create_task(handler(ticket_id)) |
There was a problem hiding this comment.
I'd use the explicit version without using a handler.
case "intent" if value == "billing":
case "intent" if value == "technical":| The routing decision fires the moment `intent` arrives — typically tens of | ||
| milliseconds in — while `reply` continues to stream. |
There was a problem hiding this comment.
| The routing decision fires the moment `intent` arrives — typically tens of | |
| milliseconds in — while `reply` continues to stream. | |
| The routing decision fires the moment `intent` arrives while `reply` continues to stream. |
| `AsyncDotTxt.stream_fields(...)` yields `(field, value)` tuples as the model | ||
| fills in a schema-constrained response, one leaf at a time. It is built on the | ||
| gateway's `stream: "patch"` mode, which emits RFC 6902 JSON Patch operations | ||
| in schema order — so downstream work can start the moment a field arrives, |
There was a problem hiding this comment.
| in schema order — so downstream work can start the moment a field arrives, | |
| in schema order so downstream work can start the moment a field arrives, |
| @@ -0,0 +1,93 @@ | |||
| """Mid-stream human approval — no checkpointer, no resume. | |||
No description provided.