Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions blockrun_llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
ChatCompletionChunk,
ChatChunkChoice,
ChatChunkDelta,
ChatChunkToolCall,
ChatChunkFunctionCall,
Model,
APIError,
PaymentError,
Expand Down Expand Up @@ -203,6 +205,8 @@
"ChatCompletionChunk",
"ChatChunkChoice",
"ChatChunkDelta",
"ChatChunkToolCall",
"ChatChunkFunctionCall",
"Model",
"APIError",
"PaymentError",
Expand Down
54 changes: 34 additions & 20 deletions blockrun_llm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
SmartChatResponse,
RoutingProfile,
SearchResult,
stream_choice_content,
stream_choice_finish_reason,
chunk_meta,
chunk_usage_dict,
)
from .router import route as route_request
from .tx_log import TransactionLogger, decode_settlement_header, _resolve_log_dir
Expand Down Expand Up @@ -881,16 +885,21 @@ def _iter_and_archive(
for chunk in self._iter_sse_chunks(response):
if chunk.choices:
choice = chunk.choices[0]
if choice.delta.content:
content_parts.append(choice.delta.content)
if choice.finish_reason:
finish_reason = choice.finish_reason
if assembled_id is None and chunk.id:
assembled_id = chunk.id
assembled_model = chunk.model
assembled_created = chunk.created
if chunk.usage is not None:
usage_dict = chunk.usage.model_dump(exclude_none=True)
content = stream_choice_content(choice)
if content:
content_parts.append(content)
fr = stream_choice_finish_reason(choice)
if fr:
finish_reason = fr
if assembled_id is None:
_id, _model, _created = chunk_meta(chunk)
if _id:
assembled_id = _id
assembled_model = _model
assembled_created = _created
_usage = chunk_usage_dict(chunk)
if _usage is not None:
usage_dict = _usage
yield chunk

# Stream complete (saw [DONE]). Free models have cost_usd == 0; only
Expand Down Expand Up @@ -2544,16 +2553,21 @@ async def _aiter_and_archive(
async for chunk in self._aiter_sse_chunks(response):
if chunk.choices:
choice = chunk.choices[0]
if choice.delta.content:
content_parts.append(choice.delta.content)
if choice.finish_reason:
finish_reason = choice.finish_reason
if assembled_id is None and chunk.id:
assembled_id = chunk.id
assembled_model = chunk.model
assembled_created = chunk.created
if chunk.usage is not None:
usage_dict = chunk.usage.model_dump(exclude_none=True)
content = stream_choice_content(choice)
if content:
content_parts.append(content)
fr = stream_choice_finish_reason(choice)
if fr:
finish_reason = fr
if assembled_id is None:
_id, _model, _created = chunk_meta(chunk)
if _id:
assembled_id = _id
assembled_model = _model
assembled_created = _created
_usage = chunk_usage_dict(chunk)
if _usage is not None:
usage_dict = _usage
yield chunk

if cost_usd > 0:
Expand Down
54 changes: 34 additions & 20 deletions blockrun_llm/solana_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
APIError,
PaymentError,
SearchResult,
stream_choice_content,
stream_choice_finish_reason,
chunk_meta,
chunk_usage_dict,
)
from .solana_wallet import get_solana_public_key
from .tx_log import TransactionLogger, decode_settlement_header, _resolve_log_dir
Expand Down Expand Up @@ -813,16 +817,21 @@ def _iter_and_archive(
for chunk in self._iter_sse_chunks(response):
if chunk.choices:
choice = chunk.choices[0]
if choice.delta.content:
content_parts.append(choice.delta.content)
if choice.finish_reason:
finish_reason = choice.finish_reason
if assembled_id is None and chunk.id:
assembled_id = chunk.id
assembled_model = chunk.model
assembled_created = chunk.created
if chunk.usage is not None:
usage_dict = chunk.usage.model_dump(exclude_none=True)
content = stream_choice_content(choice)
if content:
content_parts.append(content)
fr = stream_choice_finish_reason(choice)
if fr:
finish_reason = fr
if assembled_id is None:
_id, _model, _created = chunk_meta(chunk)
if _id:
assembled_id = _id
assembled_model = _model
assembled_created = _created
_usage = chunk_usage_dict(chunk)
if _usage is not None:
usage_dict = _usage
yield chunk

if cost_usd > 0:
Expand Down Expand Up @@ -2255,16 +2264,21 @@ async def _aiter_and_archive(
async for chunk in self._aiter_sse_chunks(response):
if chunk.choices:
choice = chunk.choices[0]
if choice.delta.content:
content_parts.append(choice.delta.content)
if choice.finish_reason:
finish_reason = choice.finish_reason
if assembled_id is None and chunk.id:
assembled_id = chunk.id
assembled_model = chunk.model
assembled_created = chunk.created
if chunk.usage is not None:
usage_dict = chunk.usage.model_dump(exclude_none=True)
content = stream_choice_content(choice)
if content:
content_parts.append(content)
fr = stream_choice_finish_reason(choice)
if fr:
finish_reason = fr
if assembled_id is None:
_id, _model, _created = chunk_meta(chunk)
if _id:
assembled_id = _id
assembled_model = _model
assembled_created = _created
_usage = chunk_usage_dict(chunk)
if _usage is not None:
usage_dict = _usage
yield chunk

if cost_usd > 0:
Expand Down
91 changes: 90 additions & 1 deletion blockrun_llm/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,44 @@ class Config:
# ---------------------------------------------------------------------------


class ChatChunkFunctionCall(BaseModel):
    """Function-call portion of a streamed tool-call delta.

    The model sends ``name`` on the first frame and ``arguments`` in fragments
    afterwards, so both fields are optional here — unlike the non-stream
    :class:`FunctionCall`, where both are required.
    """

    # Function name; per the note above, sent on the first frame only.
    name: Optional[str] = None
    # Fragment of the argument string carried by this frame.
    arguments: Optional[str] = None

    class Config:
        # Permit fields that are not declared on this model.
        extra = "allow"


class ChatChunkToolCall(BaseModel):
    """One streaming tool-call delta.

    OpenAI streams tool calls incrementally: the first frame carries
    ``index`` + ``id`` + ``function.name`` (+ empty args), and later frames
    carry only ``index`` + ``function.arguments`` fragments. Every field is
    therefore optional.

    Why this type exists: the strict non-stream :class:`ToolCall` (``id`` /
    ``function.name`` / ``arguments`` all required) rejected the
    argument-fragment frames. That made ``ChatCompletionChunk(**chunk)`` raise
    and fall back to ``model_construct``, leaving ``choices`` as raw dicts and
    crashing the archive loop with ``'dict' object has no attribute 'delta'``.
    Using this lenient type keeps streamed tool calls parsing into real
    objects.
    """

    # Position of the tool call; the only field the docstring says is sent on
    # every frame.
    index: Optional[int] = None
    # Tool-call identifier; sent on the first frame only.
    id: Optional[str] = None
    # Kept as a free-form ``str`` (not ``Literal["function"]``) so an upstream
    # that streams a non-"function" tool type can't fail validation and
    # re-trigger the very ``model_construct`` fallback this lenient type exists
    # to avoid.
    type: Optional[str] = None
    # Partial function-call payload (name and/or argument fragment).
    function: Optional[ChatChunkFunctionCall] = None

    class Config:
        # Permit fields that are not declared on this model.
        extra = "allow"


class ChatChunkDelta(BaseModel):
"""Incremental ``message`` delta sent over SSE.

Expand All @@ -132,7 +170,7 @@ class ChatChunkDelta(BaseModel):

role: Optional[Literal["system", "user", "assistant", "tool"]] = None
content: Optional[str] = None
tool_calls: Optional[List[ToolCall]] = None
tool_calls: Optional[List[ChatChunkToolCall]] = None
reasoning_content: Optional[str] = None
thinking: Optional[str] = None

Expand Down Expand Up @@ -168,6 +206,57 @@ class Config:
extra = "allow"


def stream_choice_content(choice: Any) -> Optional[str]:
    """Return the text delta of a streaming choice, or ``None`` if absent.

    A chunk that fails strict validation falls back to ``model_construct``,
    which leaves nested ``choices`` as plain dicts. This accessor keeps the
    stream-archiving loop from crashing on those (``'dict' object has no
    attribute 'delta'``); a tool-call frame simply has no content and yields
    ``None``.

    The choice and its ``delta`` are resolved independently, so every
    pairing is handled: object choice with object delta, dict choice with dict
    delta, and the mixed cases (e.g. a constructed choice object whose
    ``delta`` is still a raw dict).

    Args:
        choice: A parsed choice object or a raw ``dict`` choice.

    Returns:
        The ``content`` of the choice's delta, or ``None`` when the choice has
        no delta or the delta has no content.
    """
    # Step 1: fetch the delta from whichever shape the choice has.
    if isinstance(choice, dict):
        delta = choice.get("delta")
    else:
        delta = getattr(choice, "delta", None)

    # Step 2: fetch the content from whichever shape the delta has.
    if isinstance(delta, dict):
        return delta.get("content")
    # ``getattr`` with a default also covers ``delta is None``.
    return getattr(delta, "content", None)


def stream_choice_finish_reason(choice: Any) -> Optional[str]:
    """Return the ``finish_reason`` of a streaming choice, if it has one.

    Accepts either a choice object or a raw ``dict`` choice; a missing
    ``finish_reason`` yields ``None`` in both cases.
    """
    if not isinstance(choice, dict):
        # Object-shaped choice: attribute access, defaulting to None.
        return getattr(choice, "finish_reason", None)
    # Dict-shaped choice: key lookup, defaulting to None.
    return choice.get("finish_reason")


def chunk_meta(chunk: Any) -> "tuple[Optional[str], Optional[str], Optional[int]]":
    """Read ``(id, model, created)`` from a chunk without raising.

    ``model_construct`` does not populate missing required fields, so a
    drifted frame that lost its top-level ``id`` produces a chunk object with
    no ``id`` attribute at all. A direct ``chunk.id`` read would then raise
    ``AttributeError`` and crash the stream-archiving loop. Each field is
    therefore read with ``getattr`` and falls back to ``None`` when absent.
    """
    chunk_id = getattr(chunk, "id", None)
    model_name = getattr(chunk, "model", None)
    created_at = getattr(chunk, "created", None)
    return chunk_id, model_name, created_at


def chunk_usage_dict(chunk: Any) -> Optional[Dict[str, Any]]:
    """Return a chunk's ``usage`` as a plain dict with ``None`` values dropped.

    Tolerates every shape ``usage`` can take on a chunk:

    * missing or ``None``: returns ``None``;
    * a raw ``dict`` (left behind by ``model_construct``, so it has no
      ``.model_dump``): filtered copy without ``None`` values;
    * an object exposing ``model_dump``: ``model_dump(exclude_none=True)``;
    * anything else (e.g. a drifted frame carrying a string or list):
      returns ``None`` rather than raising ``AttributeError`` and crashing the
      stream-archiving loop.

    Args:
        chunk: A parsed or ``model_construct``'d chunk object.

    Returns:
        The usage mapping, or ``None`` when no usable usage is present.
    """
    usage = getattr(chunk, "usage", None)
    if usage is None:
        return None
    if isinstance(usage, dict):
        return {key: value for key, value in usage.items() if value is not None}

    dump = getattr(usage, "model_dump", None)
    if not callable(dump):
        # Unrecognised usage shape: archiving is best-effort, so skip it.
        return None
    return dump(exclude_none=True)


class Model(BaseModel):
"""Available model information."""

Expand Down
5 changes: 4 additions & 1 deletion blockrun_llm/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
"""

import re
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, TYPE_CHECKING
from urllib.parse import urlparse

if TYPE_CHECKING:
from .types import PaymentError


# Localhost domains that are allowed to use HTTP
LOCALHOST_DOMAINS = {"localhost", "127.0.0.1"}
Expand Down
36 changes: 25 additions & 11 deletions examples/benchmark_claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def _count_tokens(text: str, model_hint: str = "") -> int:
@dataclass
class ReqResult:
ok: bool
ttft: Optional[float] = None # seconds to first content token
latency: Optional[float] = None # seconds request → last token
ttft: Optional[float] = None # seconds to first content token
latency: Optional[float] = None # seconds request → last token
out_tokens: int = 0
error: str = ""

Expand Down Expand Up @@ -180,7 +180,9 @@ def cache_probe(self) -> float:
usage = getattr(resp, "usage", None)
if usage is None:
return 0.0
u: Dict[str, Any] = usage.model_dump(exclude_none=True) if hasattr(usage, "model_dump") else dict(usage)
u: Dict[str, Any] = (
usage.model_dump(exclude_none=True) if hasattr(usage, "model_dump") else dict(usage)
)
prompt_tokens = u.get("prompt_tokens") or 0
cache_read = u.get("cache_read_input_tokens") or 0
cache_creation = u.get("cache_creation_input_tokens") or 0
Expand Down Expand Up @@ -212,8 +214,10 @@ def fmt(x: float) -> str:
print("\n" + "=" * 56)
print(f" Claude E2E benchmark — {self.model} ({self.chain})")
print(f" {self.api_url}")
print(f" requests={self.requests} concurrency={self.concurrency} "
f"max_tokens={self.max_tokens}")
print(
f" requests={self.requests} concurrency={self.concurrency} "
f"max_tokens={self.max_tokens}"
)
print("=" * 56)
rows = [
("单个请求吞吐 (token/s)", fmt(statistics.mean(per_req_tps)) if per_req_tps else "nan"),
Expand All @@ -232,7 +236,9 @@ def fmt(x: float) -> str:
for name, val in rows:
print(f" {name:<34} {val}")
print("-" * 56)
print(f" 样本: 成功 {len(ok)}/{self.requests} 总输出≈{total_out} tokens wall={wall:.2f}s")
print(
f" 样本: 成功 {len(ok)}/{self.requests} 总输出≈{total_out} tokens wall={wall:.2f}s"
)
fails = [r for r in self.results if not r.ok]
if fails:
print(f" 失败 {len(fails)} 例,示例: {fails[0].error}")
Expand All @@ -249,15 +255,23 @@ def main() -> None:
p.add_argument("--max-tokens", type=int, default=256)
p.add_argument("--prompt", default=DEFAULT_PROMPT)
p.add_argument("--private-key", default=None, help="wallet key (else env / ~/.blockrun)")
p.add_argument("--cache-probe", action="store_true",
help="add 2 non-streaming calls to measure cache hit rate (extra spend)")
p.add_argument(
"--cache-probe",
action="store_true",
help="add 2 non-streaming calls to measure cache hit rate (extra spend)",
)
args = p.parse_args()

api_url = args.api_url or (SOLANA_API_URL if args.chain == "solana" else BASE_API_URL)
bench = Bench(
chain=args.chain, model=args.model, api_url=api_url,
requests=args.requests, concurrency=args.concurrency,
prompt=args.prompt, max_tokens=args.max_tokens, private_key=args.private_key,
chain=args.chain,
model=args.model,
api_url=api_url,
requests=args.requests,
concurrency=args.concurrency,
prompt=args.prompt,
max_tokens=args.max_tokens,
private_key=args.private_key,
)
print(f"[benchmark] {args.requests} paid streaming requests → {api_url} ({args.model}) …")
wall = bench.run_throughput_phase()
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/test_image_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from __future__ import annotations

import json
from typing import List

import httpx
Expand Down Expand Up @@ -240,7 +239,6 @@ def test_image_poll_surfaces_upstream_failure(monkeypatch: pytest.MonkeyPatch) -
monkeypatch.setattr(ImageClient, "IMAGE_POLL_INTERVAL_SECONDS", 0.0)

def handler(request: httpx.Request) -> httpx.Response:
path = request.url.path
if request.method == "POST":
if "PAYMENT-SIGNATURE" not in request.headers:
return _payment_required_402(request)
Expand Down
Loading
Loading