Skip to content

feat: Web Streams API (ReadableStream + WritableStream + TransformStream) — closes #237#301

Merged
proggeramlug merged 2 commits intoPerryTS:mainfrom
TheHypnoo:fix/issue-237-web-streams
Apr 29, 2026
Merged

feat: Web Streams API (ReadableStream + WritableStream + TransformStream) — closes #237#301
proggeramlug merged 2 commits intoPerryTS:mainfrom
TheHypnoo:fix/issue-237-web-streams

Conversation

@TheHypnoo
Copy link
Copy Markdown
Contributor

Summary

Closes #237. Lands the WHATWG Web Streams API as a coherent surface so blob.stream(), response.body, user-source new ReadableStream({...}), new WritableStream({...}), and new TransformStream({...}) all work via the same handle/dispatch pattern that landed for Blob in #234. Avoids the discoverability trap the issue flagged where one consumer would work and the rest would silently fail.

Surface delivered

  • ReadableStream<Uint8Array> ctor, getReader(), cancel(), tee(), pipeTo(), pipeThrough(), locked
  • ReadableStreamDefaultReader read(), releaseLock(), cancel(), closed
  • WritableStream ctor, getWriter(), abort(), close(), locked
  • WritableStreamDefaultWriter write(), close(), abort(), releaseLock(), closed, ready, desiredSize
  • TransformStream ctor, readable / writable getters
  • controller.enqueue/close/error/desiredSize on the readable controller
  • Symbol.asyncIterator lowering: for await (const c of stream) desugars to a getReader()/read() loop in HIR
  • blob.stream() returns a single-chunk ReadableStream over the buffered bytes
  • response.body returns a single-chunk ReadableStream over the buffered body bytes (matches Perry's existing buffered fetch model; true reqwest-chunked streaming is tracked as a Web Streams API: implement ReadableStream + blob.stream() / response.body #237 follow-up)

Implementation

  • New crates/perry-stdlib/src/streams.rs (~1.3k LOC) — registries (READABLE_STREAMS, WRITABLE_STREAMS, TRANSFORM_STREAMS, READERS, WRITERS), every FFI, plus a GC root scanner that marks user closures + queued chunks + pending-read promises so the malloc-triggered sweep can't free them out from under dispatch. Mirrors BLOB_REGISTRY from fetch.rs:827 and the GC bridge from ws.rs::ensure_gc_scanner_registered.
  • HIR pre-registers the controller param of every start / pull / transform / flush callback in stream constructors as a readable_stream::ReadableStream native instance so controller.enqueue(...) inside the closure resolves through the new codegen arms instead of silently no-op'ing. transform(chunk, controller) correctly registers param 1.
  • 5 new module dispatch arms in lower_call.rs (readable_stream, readable_stream_reader, writable_stream, writable_stream_writer, transform_stream) plus response.body and blob.stream() on existing arms.
  • 3 new ctor arms in lower_call/builtin.rs destructuring {start, pull, cancel} / {write, close, abort} / {transform, flush} + highWaterMark.
  • 44 new FFI declarations in runtime_decls.rs.
  • 6 chained-typed-method binding arms in destructuring.rs so getReader() / getWriter() / body / readable / writable / pipeThrough() / blob.stream() results get tagged with the right native instance type for downstream method dispatch.
  • New for await (const c of <ReadableStream>) lowering in lower_decl.rs that desugars to the synthetic getReader()/read() loop documented in the issue's acceptance criteria.

Why a buffered model for response.body

The existing fetch path already eagerly drains reqwest::Response::bytes(). Wrapping that in a single-chunk ReadableStream gives every consumer the same WHATWG-shaped surface without needing a cross-thread tokio bridge in this PR. Real chunked streaming over reqwest::Response::chunk() is a focused follow-up that doesn't break the API.

Stubs that throw clear "not yet implemented" errors

Genuinely deferred surface — each throws an Error("... not yet implemented (issue #237 followup)") so users discover the gap loudly:

  • BYOB readers
  • Custom QueuingStrategy / ByteLengthQueuingStrategy
  • ReadableStream.from(asyncIterable)

Test plan

3 new regression tests under test-files/, all passing byte-for-byte:

Verification:

  • cargo build --release -p perry-runtime -p perry-stdlib -p perry-hir -p perry-codegen -p perry — clean
  • cargo test --release --workspace --exclude perry-ui-{ios,tvos,watchos,gtk4,android,windows,visionos} — all green
  • All 3 new regression tests pass end-to-end on the macOS arm64 host
  • Existing test_issue_234_blob_methods.ts still passes byte-for-byte (no regression)
  • Real-world example with a streaming consumer — needs a follow-up after the PR lands once a real consumer surfaces

Out of scope (file as follow-ups)

  • True chunked response.body from reqwest::Response::chunk() with a cross-thread tokio bridge
  • branches[0] / branches[1] index access on tee() results auto-retagging as ReadableStream native instances (today users would destructure const [a, b] = stream.tee() once that path is wired, or drive branches via direct FFI)
  • BYOB / ByteLengthQueuingStrategy / ReadableStream.from(asyncIterable)

TheHypnoo and others added 2 commits April 29, 2026 21:09
…sformStream) — closes PerryTS#237

Adds the WHATWG Web Streams API to Perry as a single coherent surface so
`blob.stream()`, `response.body`, user-source `new ReadableStream({...})`,
`new WritableStream({...})` and `new TransformStream({...})` all work via
the same handle/dispatch pattern that landed for Blob in PerryTS#234. Avoids the
discoverability trap the issue flagged where one consumer would work and
the rest would silently fail.

Surface delivered:
- `ReadableStream<Uint8Array>` ctor, `getReader()`, `cancel()`, `tee()`,
  `pipeTo()`, `pipeThrough()`, `locked`
- `ReadableStreamDefaultReader` `read()`, `releaseLock()`, `cancel()`,
  `closed`
- `WritableStream` ctor, `getWriter()`, `abort()`, `close()`, `locked`
- `WritableStreamDefaultWriter` `write()`, `close()`, `abort()`,
  `releaseLock()`, `closed`, `ready`, `desiredSize`
- `TransformStream` ctor, `readable` / `writable` getters
- `controller.enqueue/close/error/desiredSize` on the readable side
- `Symbol.asyncIterator` lowering: `for await (const c of stream)`
  desugars to a `getReader()/read()` loop in the HIR
- `blob.stream()` returns a single-chunk ReadableStream over the
  buffered bytes
- `response.body` returns a single-chunk ReadableStream over the
  buffered body bytes (matches Perry's existing buffered fetch model;
  true reqwest-chunked streaming is tracked as a PerryTS#237 follow-up)

Why a buffered model for `response.body`: the existing fetch path
already eagerly drains `reqwest::Response::bytes()`, so wrapping the
result in a ReadableStream with one chunk gives every consumer the same
WHATWG-shaped surface without needing the cross-thread tokio bridge
this PR would otherwise have to introduce. Real chunked streaming is a
focused follow-up that doesn't break the API.

Implementation:
- New `crates/perry-stdlib/src/streams.rs` (~1.3k LOC) — registries
  (`READABLE_STREAMS`, `WRITABLE_STREAMS`, `TRANSFORM_STREAMS`,
  `READERS`, `WRITERS`), every FFI, and a GC root scanner that marks
  user closures + queued chunks + pending-read promises so the
  malloc-triggered sweep can't free them out from under the dispatch.
  Mirrors `BLOB_REGISTRY` shape from `fetch.rs:827` and
  `ws.rs::ensure_gc_scanner_registered` for the GC bridge.
- HIR pre-registers the controller param of every `start` / `pull` /
  `transform` / `flush` callback on stream constructors as a
  `readable_stream::ReadableStream` native instance, so
  `controller.enqueue(...)` inside the closure resolves through the new
  codegen arms instead of silently no-op'ing. `transform(chunk,
  controller)` correctly registers param 1 as the controller.
- Codegen adds 5 module dispatch arms in `lower_call.rs`
  (`readable_stream`, `readable_stream_reader`, `writable_stream`,
  `writable_stream_writer`, `transform_stream`) plus the
  `response.body` and `blob.stream()` properties on existing arms; new
  ctor arms in `lower_call/builtin.rs` for the three stream classes;
  44 new FFI declarations in `runtime_decls.rs`.
- HIR `destructuring.rs` adds 6 chained-typed-method binding arms so
  bindings produced by `getReader()` / `getWriter()` / `body` /
  `readable` / `writable` / `pipeThrough()` / `blob.stream()` /
  `new {Readable,Writable,Transform}Stream(...)` get tagged with the
  right native instance type for downstream method dispatch.
- HIR `lower_decl.rs` `for await of` lowering recognizes
  `ReadableStream` iterables and lowers to the synthetic
  `getReader()/read()` loop documented in the issue's acceptance
  criteria.

Stubs that throw clear "not yet implemented" errors with a PerryTS#237
follow-up pointer:
- BYOB readers
- Custom `QueuingStrategy` / `ByteLengthQueuingStrategy`
- `ReadableStream.from(asyncIterable)`

Tests (under `test-files/`):
- `test_issue_237_streams_blob.ts` — acceptance criteria PerryTS#1 + PerryTS#2
  (`blob.stream().getReader().read()` round-trip + `response.body` +
  `for await`)
- `test_issue_237_streams_user_source.ts` — `new ReadableStream({
  start(controller) { ... } })` + `controller.enqueue` /
  `controller.close` flow
- `test_issue_237_streams_pipe.ts` — `pipeTo` / `pipeThrough` with
  TransformStream + `tee()`

Out of scope (filed as follow-ups when this lands):
- True chunked `response.body` from `reqwest::Response::chunk()` with a
  cross-thread tokio bridge
- `branches[0]` / `branches[1]` index access on `tee()` results auto-
  retagging as ReadableStream native instances (today users either
  destructure `const [a, b] = stream.tee()` once that path is wired
  too, or drive branches via direct FFI)
- BYOB / ByteLengthQueuingStrategy / `ReadableStream.from(asyncIterable)`

Verified: 3 new regression tests pass byte-for-byte; existing
`test_issue_234_blob_methods.ts` still passes.
@proggeramlug proggeramlug merged commit 7a302c5 into PerryTS:main Apr 29, 2026
1 of 2 checks passed
@TheHypnoo TheHypnoo deleted the fix/issue-237-web-streams branch April 29, 2026 19:46
proggeramlug added a commit that referenced this pull request Apr 29, 2026
… merges

The lint job on c61296b caught rustfmt drift from the recent batch
of squash-merges:

- crates/perry-doc-tests/src/image_diff.rs (PR #298)
- crates/perry-hir/src/destructuring.rs (PR #301)
- crates/perry-hir/src/lower/expr_new.rs (PR #301)
- crates/perry-hir/src/lower_decl.rs (PR #301)
- crates/perry-stdlib/src/fetch.rs (PR #301)
- crates/perry-stdlib/src/streams.rs (PR #301)
- crates/perry-stdlib/src/ethers.rs (PR #299, my conflict resolution
  inserted a long line that needed wrapping)
- crates/perry-codegen/src/lower_call.rs +
  crates/perry-codegen/src/lower_call/builtin.rs +
  crates/perry-codegen/src/runtime_decls.rs (PR #301)

Plus Cargo.lock sync from 0.5.384 → 0.5.385 (my v0.5.385 commit
landed Cargo.toml's version bump but I forgot to stage Cargo.lock).

Pure `cargo fmt --all` output, no hand edits. Verified
`cargo build --release -p perry -p perry-runtime -p perry-stdlib`
clean post-fmt in 1m 31s.

No version bump — same precedent as ea95e85 (rustfmt baseline as
chore companion to #294).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Web Streams API: implement ReadableStream + blob.stream() / response.body

2 participants