feat: Web Streams API (ReadableStream + WritableStream + TransformStream) — closes #237#301
Merged
proggeramlug merged 2 commits intoPerryTS:mainfrom Apr 29, 2026
Merged
Conversation
…sformStream) — closes PerryTS#237 Adds the WHATWG Web Streams API to Perry as a single coherent surface so `blob.stream()`, `response.body`, user-source `new ReadableStream({...})`, `new WritableStream({...})` and `new TransformStream({...})` all work via the same handle/dispatch pattern that landed for Blob in PerryTS#234. Avoids the discoverability trap the issue flagged where one consumer would work and the rest would silently fail. Surface delivered: - `ReadableStream<Uint8Array>` ctor, `getReader()`, `cancel()`, `tee()`, `pipeTo()`, `pipeThrough()`, `locked` - `ReadableStreamDefaultReader` `read()`, `releaseLock()`, `cancel()`, `closed` - `WritableStream` ctor, `getWriter()`, `abort()`, `close()`, `locked` - `WritableStreamDefaultWriter` `write()`, `close()`, `abort()`, `releaseLock()`, `closed`, `ready`, `desiredSize` - `TransformStream` ctor, `readable` / `writable` getters - `controller.enqueue/close/error/desiredSize` on the readable side - `Symbol.asyncIterator` lowering: `for await (const c of stream)` desugars to a `getReader()/read()` loop in the HIR - `blob.stream()` returns a single-chunk ReadableStream over the buffered bytes - `response.body` returns a single-chunk ReadableStream over the buffered body bytes (matches Perry's existing buffered fetch model; true reqwest-chunked streaming is tracked as a PerryTS#237 follow-up) Why a buffered model for `response.body`: the existing fetch path already eagerly drains `reqwest::Response::bytes()`, so wrapping the result in a ReadableStream with one chunk gives every consumer the same WHATWG-shaped surface without needing the cross-thread tokio bridge this PR would otherwise have to introduce. Real chunked streaming is a focused follow-up that doesn't break the API. Implementation: - New `crates/perry-stdlib/src/streams.rs` (~1.3k LOC) — registries (`READABLE_STREAMS`, `WRITABLE_STREAMS`, `TRANSFORM_STREAMS`, `READERS`, `WRITERS`), every FFI, and a GC root scanner that marks user closures + queued chunks + pending-read promises so the malloc-triggered sweep can't free them out from under the dispatch. Mirrors `BLOB_REGISTRY` shape from `fetch.rs:827` and `ws.rs::ensure_gc_scanner_registered` for the GC bridge. - HIR pre-registers the controller param of every `start` / `pull` / `transform` / `flush` callback on stream constructors as a `readable_stream::ReadableStream` native instance, so `controller.enqueue(...)` inside the closure resolves through the new codegen arms instead of silently no-op'ing. `transform(chunk, controller)` correctly registers param 1 as the controller. - Codegen adds 5 module dispatch arms in `lower_call.rs` (`readable_stream`, `readable_stream_reader`, `writable_stream`, `writable_stream_writer`, `transform_stream`) plus the `response.body` and `blob.stream()` properties on existing arms; new ctor arms in `lower_call/builtin.rs` for the three stream classes; 44 new FFI declarations in `runtime_decls.rs`. - HIR `destructuring.rs` adds 6 chained-typed-method binding arms so bindings produced by `getReader()` / `getWriter()` / `body` / `readable` / `writable` / `pipeThrough()` / `blob.stream()` / `new {Readable,Writable,Transform}Stream(...)` get tagged with the right native instance type for downstream method dispatch. - HIR `lower_decl.rs` `for await of` lowering recognizes `ReadableStream` iterables and lowers to the synthetic `getReader()/read()` loop documented in the issue's acceptance criteria. Stubs that throw clear "not yet implemented" errors with a PerryTS#237 follow-up pointer: - BYOB readers - Custom `QueuingStrategy` / `ByteLengthQueuingStrategy` - `ReadableStream.from(asyncIterable)` Tests (under `test-files/`): - `test_issue_237_streams_blob.ts` — acceptance criteria PerryTS#1 + PerryTS#2 (`blob.stream().getReader().read()` round-trip + `response.body` + `for await`) - `test_issue_237_streams_user_source.ts` — `new ReadableStream({ start(controller) { ... } })` + `controller.enqueue` / `controller.close` flow - `test_issue_237_streams_pipe.ts` — `pipeTo` / `pipeThrough` with TransformStream + `tee()` Out of scope (filed as follow-ups when this lands): - True chunked `response.body` from `reqwest::Response::chunk()` with a cross-thread tokio bridge - `branches[0]` / `branches[1]` index access on `tee()` results auto- retagging as ReadableStream native instances (today users either destructure `const [a, b] = stream.tee()` once that path is wired too, or drive branches via direct FFI) - BYOB / ByteLengthQueuingStrategy / `ReadableStream.from(asyncIterable)` Verified: 3 new regression tests pass byte-for-byte; existing `test_issue_234_blob_methods.ts` still passes.
proggeramlug
added a commit
that referenced
this pull request
Apr 29, 2026
… merges The lint job on c61296b caught rustfmt drift from the recent batch of squash-merges: - crates/perry-doc-tests/src/image_diff.rs (PR #298) - crates/perry-hir/src/destructuring.rs (PR #301) - crates/perry-hir/src/lower/expr_new.rs (PR #301) - crates/perry-hir/src/lower_decl.rs (PR #301) - crates/perry-stdlib/src/fetch.rs (PR #301) - crates/perry-stdlib/src/streams.rs (PR #301) - crates/perry-stdlib/src/ethers.rs (PR #299, my conflict resolution inserted a long line that needed wrapping) - crates/perry-codegen/src/lower_call.rs + crates/perry-codegen/src/lower_call/builtin.rs + crates/perry-codegen/src/runtime_decls.rs (PR #301) Plus Cargo.lock sync from 0.5.384 → 0.5.385 (my v0.5.385 commit landed Cargo.toml's version bump but I forgot to stage Cargo.lock). Pure `cargo fmt --all` output, no hand edits. Verified `cargo build --release -p perry -p perry-runtime -p perry-stdlib` clean post-fmt in 1m 31s. No version bump — same precedent as ea95e85 (rustfmt baseline as chore companion to #294).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes #237. Lands the WHATWG Web Streams API as a coherent surface so
blob.stream(),response.body, user-sourcenew ReadableStream({...}),new WritableStream({...}), andnew TransformStream({...})all work via the same handle/dispatch pattern that landed for Blob in #234. Avoids the discoverability trap the issue flagged where one consumer would work and the rest would silently fail.Surface delivered
ReadableStream<Uint8Array>ctor,getReader(),cancel(),tee(),pipeTo(),pipeThrough(),lockedReadableStreamDefaultReaderread(),releaseLock(),cancel(),closedWritableStreamctor,getWriter(),abort(),close(),lockedWritableStreamDefaultWriterwrite(),close(),abort(),releaseLock(),closed,ready,desiredSizeTransformStreamctor,readable/writablegetterscontroller.enqueue/close/error/desiredSizeon the readable controllerSymbol.asyncIteratorlowering:for await (const c of stream)desugars to agetReader()/read()loop in HIRblob.stream()returns a single-chunkReadableStreamover the buffered bytesresponse.bodyreturns a single-chunkReadableStreamover the buffered body bytes (matches Perry's existing buffered fetch model; true reqwest-chunked streaming is tracked as a Web Streams API: implement ReadableStream + blob.stream() / response.body #237 follow-up)Implementation
crates/perry-stdlib/src/streams.rs(~1.3k LOC) — registries (READABLE_STREAMS,WRITABLE_STREAMS,TRANSFORM_STREAMS,READERS,WRITERS), every FFI, plus a GC root scanner that marks user closures + queued chunks + pending-read promises so the malloc-triggered sweep can't free them out from under dispatch. MirrorsBLOB_REGISTRYfromfetch.rs:827and the GC bridge fromws.rs::ensure_gc_scanner_registered.start/pull/transform/flushcallback in stream constructors as areadable_stream::ReadableStreamnative instance socontroller.enqueue(...)inside the closure resolves through the new codegen arms instead of silently no-op'ing.transform(chunk, controller)correctly registers param 1.lower_call.rs(readable_stream,readable_stream_reader,writable_stream,writable_stream_writer,transform_stream) plusresponse.bodyandblob.stream()on existing arms.lower_call/builtin.rsdestructuring{start, pull, cancel}/{write, close, abort}/{transform, flush}+highWaterMark.runtime_decls.rs.destructuring.rssogetReader()/getWriter()/body/readable/writable/pipeThrough()/blob.stream()results get tagged with the right native instance type for downstream method dispatch.for await (const c of <ReadableStream>)lowering inlower_decl.rsthat desugars to the syntheticgetReader()/read()loop documented in the issue's acceptance criteria.Why a buffered model for
response.bodyThe existing fetch path already eagerly drains
reqwest::Response::bytes(). Wrapping that in a single-chunkReadableStreamgives every consumer the same WHATWG-shaped surface without needing a cross-thread tokio bridge in this PR. Real chunked streaming overreqwest::Response::chunk()is a focused follow-up that doesn't break the API.Stubs that throw clear "not yet implemented" errors
Genuinely deferred surface — each throws an
Error("... not yet implemented (issue #237 followup)")so users discover the gap loudly:QueuingStrategy/ByteLengthQueuingStrategyReadableStream.from(asyncIterable)Test plan
3 new regression tests under
test-files/, all passing byte-for-byte:test_issue_237_streams_blob.ts— acceptance criteria Support custom menu bar items #1 + linux compilation and README #2 (blob.stream().getReader().read()round-trip +response.body+for await of stream)test_issue_237_streams_user_source.ts—new ReadableStream({ start(controller) { ... } })+controller.enqueue/controller.closeflowtest_issue_237_streams_pipe.ts—pipeTointoWritableStream,pipeThroughof identityTransformStream,tee()shapeVerification:
cargo build --release -p perry-runtime -p perry-stdlib -p perry-hir -p perry-codegen -p perry— cleancargo test --release --workspace --exclude perry-ui-{ios,tvos,watchos,gtk4,android,windows,visionos}— all greentest_issue_234_blob_methods.tsstill passes byte-for-byte (no regression)Out of scope (file as follow-ups)
response.bodyfromreqwest::Response::chunk()with a cross-thread tokio bridgebranches[0]/branches[1]index access ontee()results auto-retagging as ReadableStream native instances (today users would destructureconst [a, b] = stream.tee()once that path is wired, or drive branches via direct FFI)ByteLengthQueuingStrategy/ReadableStream.from(asyncIterable)