From e24e9e88365b934cdbef7642b2c1804121908d69 Mon Sep 17 00:00:00 2001 From: TheHypnoo Date: Wed, 29 Apr 2026 21:09:23 +0200 Subject: [PATCH] =?UTF-8?q?feat(stdlib):=20web=20streams=20api=20(Readable?= =?UTF-8?q?Stream=20+=20WritableStream=20+=20TransformStream)=20=E2=80=94?= =?UTF-8?q?=20closes=20#237?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the WHATWG Web Streams API to Perry as a single coherent surface so `blob.stream()`, `response.body`, user-source `new ReadableStream({...})`, `new WritableStream({...})` and `new TransformStream({...})` all work via the same handle/dispatch pattern that landed for Blob in #234. Avoids the discoverability trap the issue flagged where one consumer would work and the rest would silently fail. Surface delivered: - `ReadableStream` ctor, `getReader()`, `cancel()`, `tee()`, `pipeTo()`, `pipeThrough()`, `locked` - `ReadableStreamDefaultReader` `read()`, `releaseLock()`, `cancel()`, `closed` - `WritableStream` ctor, `getWriter()`, `abort()`, `close()`, `locked` - `WritableStreamDefaultWriter` `write()`, `close()`, `abort()`, `releaseLock()`, `closed`, `ready`, `desiredSize` - `TransformStream` ctor, `readable` / `writable` getters - `controller.enqueue/close/error/desiredSize` on the readable side - `Symbol.asyncIterator` lowering: `for await (const c of stream)` desugars to a `getReader()/read()` loop in the HIR - `blob.stream()` returns a single-chunk ReadableStream over the buffered bytes - `response.body` returns a single-chunk ReadableStream over the buffered body bytes (matches Perry's existing buffered fetch model; true reqwest-chunked streaming is tracked as a #237 follow-up) Why a buffered model for `response.body`: the existing fetch path already eagerly drains `reqwest::Response::bytes()`, so wrapping the result in a ReadableStream with one chunk gives every consumer the same WHATWG-shaped surface without needing the cross-thread tokio bridge this PR would otherwise 
have to introduce. Real chunked streaming is a focused follow-up that doesn't break the API. Implementation: - New `crates/perry-stdlib/src/streams.rs` (~1.3k LOC) — registries (`READABLE_STREAMS`, `WRITABLE_STREAMS`, `TRANSFORM_STREAMS`, `READERS`, `WRITERS`), every FFI, and a GC root scanner that marks user closures + queued chunks + pending-read promises so the malloc-triggered sweep can't free them out from under the dispatch. Mirrors `BLOB_REGISTRY` shape from `fetch.rs:827` and `ws.rs::ensure_gc_scanner_registered` for the GC bridge. - HIR pre-registers the controller param of every `start` / `pull` / `transform` / `flush` callback on stream constructors as a `readable_stream::ReadableStream` native instance, so `controller.enqueue(...)` inside the closure resolves through the new codegen arms instead of silently no-op'ing. `transform(chunk, controller)` correctly registers param 1 as the controller. - Codegen adds 5 module dispatch arms in `lower_call.rs` (`readable_stream`, `readable_stream_reader`, `writable_stream`, `writable_stream_writer`, `transform_stream`) plus the `response.body` and `blob.stream()` properties on existing arms; new ctor arms in `lower_call/builtin.rs` for the three stream classes; 44 new FFI declarations in `runtime_decls.rs`. - HIR `destructuring.rs` adds 6 chained-typed-method binding arms so bindings produced by `getReader()` / `getWriter()` / `body` / `readable` / `writable` / `pipeThrough()` / `blob.stream()` / `new {Readable,Writable,Transform}Stream(...)` get tagged with the right native instance type for downstream method dispatch. - HIR `lower_decl.rs` `for await of` lowering recognizes `ReadableStream` iterables and lowers to the synthetic `getReader()/read()` loop documented in the issue's acceptance criteria. 
Stubs that throw clear "not yet implemented" errors with a #237 follow-up pointer: - BYOB readers - Custom `QueuingStrategy` / `ByteLengthQueuingStrategy` - `ReadableStream.from(asyncIterable)` Tests (under `test-files/`): - `test_issue_237_streams_blob.ts` — acceptance criteria #1 + #2 (`blob.stream().getReader().read()` round-trip + `response.body` + `for await`) - `test_issue_237_streams_user_source.ts` — `new ReadableStream({ start(controller) { ... } })` + `controller.enqueue` / `controller.close` flow - `test_issue_237_streams_pipe.ts` — `pipeTo` / `pipeThrough` with TransformStream + `tee()` Out of scope (filed as follow-ups when this lands): - True chunked `response.body` from `reqwest::Response::chunk()` with a cross-thread tokio bridge - `branches[0]` / `branches[1]` index access on `tee()` results auto-retagging as ReadableStream native instances (today users drive the branches via direct FFI; once that path is wired, `const [a, b] = stream.tee()` destructuring will work as well) - BYOB / ByteLengthQueuingStrategy / `ReadableStream.from(asyncIterable)` Verified: 3 new regression tests pass byte-for-byte; existing `test_issue_234_blob_methods.ts` still passes. 
--- crates/perry-codegen/src/lower_call.rs | 231 +++ .../perry-codegen/src/lower_call/builtin.rs | 104 ++ crates/perry-codegen/src/runtime_decls.rs | 44 + crates/perry-hir/src/destructuring.rs | 89 ++ crates/perry-hir/src/lower/expr_new.rs | 87 ++ crates/perry-hir/src/lower_decl.rs | 110 ++ crates/perry-stdlib/src/fetch.rs | 44 + crates/perry-stdlib/src/lib.rs | 6 + crates/perry-stdlib/src/streams.rs | 1331 +++++++++++++++++ test-files/test_issue_237_streams_blob.ts | 40 + test-files/test_issue_237_streams_pipe.ts | 65 + .../test_issue_237_streams_user_source.ts | 31 + 12 files changed, 2182 insertions(+) create mode 100644 crates/perry-stdlib/src/streams.rs create mode 100644 test-files/test_issue_237_streams_blob.ts create mode 100644 test-files/test_issue_237_streams_pipe.ts create mode 100644 test-files/test_issue_237_streams_user_source.ts diff --git a/crates/perry-codegen/src/lower_call.rs b/crates/perry-codegen/src/lower_call.rs index e06b0cba..505ae469 100644 --- a/crates/perry-codegen/src/lower_call.rs +++ b/crates/perry-codegen/src/lower_call.rs @@ -3204,6 +3204,13 @@ pub(super) fn lower_fetch_native_method( let promise = blk.call(I64, "js_response_blob", &[(DOUBLE, &recv_handle)]); return Ok(Some(nanbox_pointer_inline(blk, &promise))); } + // Issue #237: response.body — returns ReadableStream over the + // buffered body bytes. Property access lowers as a zero-arg + // method call here, same as response.headers above. + "body" => { + let h = ctx.block().call(DOUBLE, "js_response_body", &[(DOUBLE, &recv_handle)]); + return Ok(Some(h)); + } _ => return Ok(None), } } @@ -3276,6 +3283,230 @@ pub(super) fn lower_fetch_native_method( ); return Ok(Some(new_handle)); } + // Issue #237: blob.stream() — returns ReadableStream over the + // blob's bytes. Single-chunk; closes after one read. 
+ "stream" => { + let h = ctx.block().call(DOUBLE, "js_blob_stream", &[(DOUBLE, &recv_handle)]); + return Ok(Some(h)); + } + _ => return Ok(None), + } + } + + // ───────────────────────────────────────────────────────────────── + // Web Streams API (issue #237) + // The receivers are numeric registry-id handles carried as f64, + // mirroring the Blob/Response handle ABI. Locals are tagged + // (module, class_name) by `register_native_instance` in + // `destructuring.rs`. + // ───────────────────────────────────────────────────────────────── + + if module == "readable_stream" { + let recv_handle = lower_expr(ctx, recv)?; + match method { + "getReader" => { + let h = ctx.block().call(DOUBLE, "js_readable_stream_get_reader", &[(DOUBLE, &recv_handle)]); + return Ok(Some(h)); + } + "cancel" => { + let reason = if !args.is_empty() { + lower_expr(ctx, &args[0])? + } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_readable_stream_cancel", &[(DOUBLE, &recv_handle), (DOUBLE, &reason)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "tee" => { + let h = ctx.block().call(DOUBLE, "js_readable_stream_tee", &[(DOUBLE, &recv_handle)]); + return Ok(Some(h)); + } + "pipeTo" => { + let dest = if !args.is_empty() { + lower_expr(ctx, &args[0])? + } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_readable_stream_pipe_to", &[(DOUBLE, &recv_handle), (DOUBLE, &dest)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "pipeThrough" => { + // pipeThrough(transform) — transform has .readable / .writable. + // We need both sub-handles. Lower the transform once, then + // call js_transform_stream_writable / _readable to extract. + let transform = if !args.is_empty() { + lower_expr(ctx, &args[0])? 
+ } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let writable = ctx.block().call(DOUBLE, "js_transform_stream_writable", &[(DOUBLE, &transform)]); + let readable = ctx.block().call(DOUBLE, "js_transform_stream_readable", &[(DOUBLE, &transform)]); + let new_h = ctx.block().call( + DOUBLE, + "js_readable_stream_pipe_through", + &[(DOUBLE, &recv_handle), (DOUBLE, &writable), (DOUBLE, &readable)], + ); + return Ok(Some(new_h)); + } + "locked" => { + let v = ctx.block().call(DOUBLE, "js_readable_stream_locked", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + // ReadableStreamDefaultController on the same handle: + "enqueue" => { + let chunk = if !args.is_empty() { + lower_expr(ctx, &args[0])? + } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let v = ctx.block().call(DOUBLE, "js_readable_stream_controller_enqueue", &[(DOUBLE, &recv_handle), (DOUBLE, &chunk)]); + return Ok(Some(v)); + } + "close" => { + let v = ctx.block().call(DOUBLE, "js_readable_stream_controller_close", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + "error" => { + let reason = if !args.is_empty() { + lower_expr(ctx, &args[0])? 
+ } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let v = ctx.block().call(DOUBLE, "js_readable_stream_controller_error", &[(DOUBLE, &recv_handle), (DOUBLE, &reason)]); + return Ok(Some(v)); + } + "desiredSize" => { + let v = ctx.block().call(DOUBLE, "js_readable_stream_controller_desired_size", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + _ => return Ok(None), + } + } + + if module == "readable_stream_reader" { + let recv_handle = lower_expr(ctx, recv)?; + match method { + "read" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_reader_read", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "releaseLock" => { + let v = ctx.block().call(DOUBLE, "js_reader_release_lock", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + "cancel" => { + let reason = if !args.is_empty() { + lower_expr(ctx, &args[0])? + } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_reader_cancel", &[(DOUBLE, &recv_handle), (DOUBLE, &reason)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "closed" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_reader_closed", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + _ => return Ok(None), + } + } + + if module == "writable_stream" { + let recv_handle = lower_expr(ctx, recv)?; + match method { + "getWriter" => { + let h = ctx.block().call(DOUBLE, "js_writable_stream_get_writer", &[(DOUBLE, &recv_handle)]); + return Ok(Some(h)); + } + "abort" => { + let reason = if !args.is_empty() { + lower_expr(ctx, &args[0])? 
+ } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_writable_stream_abort", &[(DOUBLE, &recv_handle), (DOUBLE, &reason)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "close" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_writable_stream_close", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "locked" => { + let v = ctx.block().call(DOUBLE, "js_writable_stream_locked", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + _ => return Ok(None), + } + } + + if module == "writable_stream_writer" { + let recv_handle = lower_expr(ctx, recv)?; + match method { + "write" => { + let chunk = if !args.is_empty() { + lower_expr(ctx, &args[0])? + } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_writer_write", &[(DOUBLE, &recv_handle), (DOUBLE, &chunk)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "close" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_writer_close", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "abort" => { + let reason = if !args.is_empty() { + lower_expr(ctx, &args[0])? 
+ } else { + double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)) + }; + let blk = ctx.block(); + let promise = blk.call(I64, "js_writer_abort", &[(DOUBLE, &recv_handle), (DOUBLE, &reason)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "releaseLock" => { + let v = ctx.block().call(DOUBLE, "js_writer_release_lock", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + "closed" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_writer_closed", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "ready" => { + let blk = ctx.block(); + let promise = blk.call(I64, "js_writer_ready", &[(DOUBLE, &recv_handle)]); + return Ok(Some(nanbox_pointer_inline(blk, &promise))); + } + "desiredSize" => { + let v = ctx.block().call(DOUBLE, "js_writer_desired_size", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + _ => return Ok(None), + } + } + + if module == "transform_stream" { + let recv_handle = lower_expr(ctx, recv)?; + match method { + "readable" => { + let v = ctx.block().call(DOUBLE, "js_transform_stream_readable", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } + "writable" => { + let v = ctx.block().call(DOUBLE, "js_transform_stream_writable", &[(DOUBLE, &recv_handle)]); + return Ok(Some(v)); + } _ => return Ok(None), } } diff --git a/crates/perry-codegen/src/lower_call/builtin.rs b/crates/perry-codegen/src/lower_call/builtin.rs index f877c64b..256a57ba 100644 --- a/crates/perry-codegen/src/lower_call/builtin.rs +++ b/crates/perry-codegen/src/lower_call/builtin.rs @@ -361,6 +361,110 @@ pub(super) fn lower_builtin_new( Ok(Some(handle)) } + // Issue #237: Web Streams API constructors. Source / sink / transform + // objects accept `start` / `pull` / `cancel` / `write` / `close` / + // `abort` / `transform` / `flush` callbacks; missing ones are passed + // as TAG_UNDEFINED so the runtime can no-op cleanly. 
+ "ReadableStream" => { + let mut start = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut pull = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut cancel = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut hwm = double_literal(1.0); + if !args.is_empty() { + if let Some(props) = extract_options_fields(ctx, &args[0]) { + for (k, vexpr) in &props { + match k.as_str() { + "start" => { start = lower_expr(ctx, vexpr)?; } + "pull" => { pull = lower_expr(ctx, vexpr)?; } + "cancel" => { cancel = lower_expr(ctx, vexpr)?; } + _ => { let _ = lower_expr(ctx, vexpr)?; } + } + } + } else { + let _ = lower_expr(ctx, &args[0])?; + } + } + if args.len() >= 2 { + if let Some(qprops) = extract_options_fields(ctx, &args[1]) { + for (k, vexpr) in &qprops { + if k == "highWaterMark" { hwm = lower_expr(ctx, vexpr)?; } + } + } + } + let h = ctx.block().call( + DOUBLE, + "js_readable_stream_new", + &[(DOUBLE, &start), (DOUBLE, &pull), (DOUBLE, &cancel), (DOUBLE, &hwm)], + ); + Ok(Some(h)) + } + + "WritableStream" => { + let mut write = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut close = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut abort = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut hwm = double_literal(1.0); + if !args.is_empty() { + if let Some(props) = extract_options_fields(ctx, &args[0]) { + for (k, vexpr) in &props { + match k.as_str() { + "write" => { write = lower_expr(ctx, vexpr)?; } + "close" => { close = lower_expr(ctx, vexpr)?; } + "abort" => { abort = lower_expr(ctx, vexpr)?; } + _ => { let _ = lower_expr(ctx, vexpr)?; } + } + } + } else { + let _ = lower_expr(ctx, &args[0])?; + } + } + if args.len() >= 2 { + if let Some(qprops) = extract_options_fields(ctx, &args[1]) { + for (k, vexpr) in &qprops { + if k == "highWaterMark" { hwm = lower_expr(ctx, vexpr)?; } + } + } + } + let h = ctx.block().call( + DOUBLE, + 
"js_writable_stream_new", + &[(DOUBLE, &write), (DOUBLE, &close), (DOUBLE, &abort), (DOUBLE, &hwm)], + ); + Ok(Some(h)) + } + + "TransformStream" => { + let mut transform = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut flush = double_literal(f64::from_bits(crate::nanbox::TAG_UNDEFINED)); + let mut hwm = double_literal(1.0); + if !args.is_empty() { + if let Some(props) = extract_options_fields(ctx, &args[0]) { + for (k, vexpr) in &props { + match k.as_str() { + "transform" => { transform = lower_expr(ctx, vexpr)?; } + "flush" => { flush = lower_expr(ctx, vexpr)?; } + _ => { let _ = lower_expr(ctx, vexpr)?; } + } + } + } else { + let _ = lower_expr(ctx, &args[0])?; + } + } + if args.len() >= 2 { + if let Some(qprops) = extract_options_fields(ctx, &args[1]) { + for (k, vexpr) in &qprops { + if k == "highWaterMark" { hwm = lower_expr(ctx, vexpr)?; } + } + } + } + let h = ctx.block().call( + DOUBLE, + "js_transform_stream_new", + &[(DOUBLE, &transform), (DOUBLE, &flush), (DOUBLE, &hwm)], + ); + Ok(Some(h)) + } + "Promise" => { // `new Promise((resolve, reject) => { ... })` — the runtime's // `js_promise_new_with_executor` takes the closure, allocates diff --git a/crates/perry-codegen/src/runtime_decls.rs b/crates/perry-codegen/src/runtime_decls.rs index 053900c1..fd676db0 100644 --- a/crates/perry-codegen/src/runtime_decls.rs +++ b/crates/perry-codegen/src/runtime_decls.rs @@ -856,6 +856,50 @@ pub fn declare_phase_b_strings(module: &mut LlModule) { module.declare_function("js_response_static_json", DOUBLE, &[DOUBLE]); module.declare_function("js_response_static_redirect", DOUBLE, &[I64, DOUBLE]); + // ────────────────────────────────────────────────────────────────── + // Web Streams API (issue #237) — perry-stdlib/src/streams.rs + + // blob.stream() / response.body bridge in perry-stdlib/src/fetch.rs. 
+ // Handles are numeric registry ids carried as f64; promise-returning + // FFIs return *mut Promise (I64) which codegen NaN-boxes via + // nanbox_pointer_inline. + // ────────────────────────────────────────────────────────────────── + module.declare_function("js_blob_stream", DOUBLE, &[DOUBLE]); + module.declare_function("js_response_body", DOUBLE, &[DOUBLE]); + // ReadableStream constructor + methods. + module.declare_function("js_readable_stream_new", DOUBLE, &[DOUBLE, DOUBLE, DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_get_reader", DOUBLE, &[DOUBLE]); + module.declare_function("js_readable_stream_locked", DOUBLE, &[DOUBLE]); + module.declare_function("js_readable_stream_cancel", I64, &[DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_tee", DOUBLE, &[DOUBLE]); + module.declare_function("js_readable_stream_pipe_to", I64, &[DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_pipe_through", DOUBLE, &[DOUBLE, DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_controller_enqueue", DOUBLE, &[DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_controller_close", DOUBLE, &[DOUBLE]); + module.declare_function("js_readable_stream_controller_error", DOUBLE, &[DOUBLE, DOUBLE]); + module.declare_function("js_readable_stream_controller_desired_size", DOUBLE, &[DOUBLE]); + // ReadableStreamDefaultReader. + module.declare_function("js_reader_read", I64, &[DOUBLE]); + module.declare_function("js_reader_release_lock", DOUBLE, &[DOUBLE]); + module.declare_function("js_reader_closed", I64, &[DOUBLE]); + module.declare_function("js_reader_cancel", I64, &[DOUBLE, DOUBLE]); + // WritableStream + Writer. 
+ module.declare_function("js_writable_stream_new", DOUBLE, &[DOUBLE, DOUBLE, DOUBLE, DOUBLE]); + module.declare_function("js_writable_stream_get_writer", DOUBLE, &[DOUBLE]); + module.declare_function("js_writable_stream_locked", DOUBLE, &[DOUBLE]); + module.declare_function("js_writable_stream_close", I64, &[DOUBLE]); + module.declare_function("js_writable_stream_abort", I64, &[DOUBLE, DOUBLE]); + module.declare_function("js_writer_write", I64, &[DOUBLE, DOUBLE]); + module.declare_function("js_writer_close", I64, &[DOUBLE]); + module.declare_function("js_writer_abort", I64, &[DOUBLE, DOUBLE]); + module.declare_function("js_writer_release_lock", DOUBLE, &[DOUBLE]); + module.declare_function("js_writer_closed", I64, &[DOUBLE]); + module.declare_function("js_writer_ready", I64, &[DOUBLE]); + module.declare_function("js_writer_desired_size", DOUBLE, &[DOUBLE]); + // TransformStream. + module.declare_function("js_transform_stream_new", DOUBLE, &[DOUBLE, DOUBLE, DOUBLE]); + module.declare_function("js_transform_stream_readable", DOUBLE, &[DOUBLE]); + module.declare_function("js_transform_stream_writable", DOUBLE, &[DOUBLE]); + // ────────────────────────────────────────────────────────────────── // AbortController / AbortSignal — perry-runtime/src/url.rs. // Returns *mut ObjectHeader (i64 pointer) — codegen NaN-boxes with diff --git a/crates/perry-hir/src/destructuring.rs b/crates/perry-hir/src/destructuring.rs index 356c6f24..781d0d8c 100644 --- a/crates/perry-hir/src/destructuring.rs +++ b/crates/perry-hir/src/destructuring.rs @@ -1314,6 +1314,19 @@ pub(crate) fn lower_var_decl_with_destructuring( ctx.register_native_instance(name.clone(), "Request".to_string(), "Request".to_string()); ctx.uses_fetch = true; } + // Issue #237: Web Streams API constructors. 
+ "ReadableStream" => { + ctx.register_native_instance(name.clone(), "readable_stream".to_string(), "ReadableStream".to_string()); + ctx.uses_fetch = true; + } + "WritableStream" => { + ctx.register_native_instance(name.clone(), "writable_stream".to_string(), "WritableStream".to_string()); + ctx.uses_fetch = true; + } + "TransformStream" => { + ctx.register_native_instance(name.clone(), "transform_stream".to_string(), "TransformStream".to_string()); + ctx.uses_fetch = true; + } _ => {} } } @@ -1403,6 +1416,82 @@ pub(crate) fn lower_var_decl_with_destructuring( } } } + + // Issue #237: Web Streams chained-typed-method bindings. + // Recognize chained method/property forms that return a new + // streams native instance so subsequent dispatch routes to + // the right `module == "..."` arm in lower_call.rs. + if let ast::Expr::Call(call_expr) = init_expr.as_ref() { + if let ast::Callee::Expr(callee) = &call_expr.callee { + if let ast::Expr::Member(member) = callee.as_ref() { + if let ast::Expr::Ident(obj_ident) = member.obj.as_ref() { + if let ast::MemberProp::Ident(prop_ident) = &member.prop { + let m = prop_ident.sym.as_ref().to_string(); + let class_owned = ctx + .lookup_native_instance(obj_ident.sym.as_ref()) + .map(|(_, c)| c.to_string()); + if let Some(c) = class_owned { + if m == "stream" && c == "Blob" { + ctx.register_native_instance(name.clone(), "readable_stream".to_string(), "ReadableStream".to_string()); + } + if m == "getReader" && c == "ReadableStream" { + ctx.register_native_instance(name.clone(), "readable_stream_reader".to_string(), "ReadableStreamDefaultReader".to_string()); + } + if m == "getWriter" && c == "WritableStream" { + ctx.register_native_instance(name.clone(), "writable_stream_writer".to_string(), "WritableStreamDefaultWriter".to_string()); + } + } + } + } + } + } + } + + // Issue #237: const stream = response.body / const r = ts.readable / .writable + // Property reads on a native instance — destructured as Member + // expressions (no 
Call wrapper). + if let ast::Expr::Member(member) = init_expr.as_ref() { + if let ast::Expr::Ident(obj_ident) = member.obj.as_ref() { + if let ast::MemberProp::Ident(prop_ident) = &member.prop { + let p = prop_ident.sym.as_ref().to_string(); + let class_owned = ctx + .lookup_native_instance(obj_ident.sym.as_ref()) + .map(|(_, c)| c.to_string()); + if let Some(c) = class_owned { + if p == "body" && c == "Response" { + ctx.register_native_instance(name.clone(), "readable_stream".to_string(), "ReadableStream".to_string()); + } + if p == "readable" && c == "TransformStream" { + ctx.register_native_instance(name.clone(), "readable_stream".to_string(), "ReadableStream".to_string()); + } + if p == "writable" && c == "TransformStream" { + ctx.register_native_instance(name.clone(), "writable_stream".to_string(), "WritableStream".to_string()); + } + } + } + } + } + + // Issue #237: const stream = upstream.pipeThrough(transform) + // returns a ReadableStream (the transform's readable side). + if let ast::Expr::Call(call_expr) = init_expr.as_ref() { + if let ast::Callee::Expr(callee) = &call_expr.callee { + if let ast::Expr::Member(member) = callee.as_ref() { + if let ast::Expr::Ident(obj_ident) = member.obj.as_ref() { + if let ast::MemberProp::Ident(prop_ident) = &member.prop { + if prop_ident.sym.as_ref() == "pipeThrough" { + let class_owned = ctx + .lookup_native_instance(obj_ident.sym.as_ref()) + .map(|(_, c)| c.to_string()); + if class_owned.as_deref() == Some("ReadableStream") { + ctx.register_native_instance(name.clone(), "readable_stream".to_string(), "ReadableStream".to_string()); + } + } + } + } + } + } + } } // Check if calling a function whose return type is a native module type diff --git a/crates/perry-hir/src/lower/expr_new.rs b/crates/perry-hir/src/lower/expr_new.rs index 204f120b..a0251138 100644 --- a/crates/perry-hir/src/lower/expr_new.rs +++ b/crates/perry-hir/src/lower/expr_new.rs @@ -20,6 +20,93 @@ use crate::lower_types::extract_ts_type_with_ctx; use 
super::{lower_expr, LoweringContext}; pub(super) fn lower_new(ctx: &mut LoweringContext, new_expr: &ast::NewExpr) -> Result { + // Issue #237: pre-register the controller param of every + // `start` / `pull` / `cancel` / `transform` / `flush` callback + // passed to `new ReadableStream({...})` / + // `new TransformStream({...})` as a native instance so + // `controller.enqueue(...)` etc. dispatch through the streams + // arms in lower_call.rs. Without this hook the callback's + // `controller` param has no type-tagged binding and method + // calls on it silently no-op. Each field maps to (param_index, + // module, class_name) — TransformStream's `transform(chunk, + // controller)` controller is param 1, the rest are param 0. + if let ast::Expr::Ident(ident) = new_expr.callee.as_ref() { + let cls = ident.sym.as_ref(); + let field_specs: &[(&'static str, usize, &'static str, &'static str)] = match cls { + "ReadableStream" => &[ + ("start", 0, "readable_stream", "ReadableStream"), + ("pull", 0, "readable_stream", "ReadableStream"), + ], + "TransformStream" => &[ + ("transform", 1, "readable_stream", "ReadableStream"), + ("flush", 0, "readable_stream", "ReadableStream"), + ], + _ => &[], + }; + if !field_specs.is_empty() { + if let Some(args) = new_expr.args.as_ref() { + if let Some(first) = args.first() { + if let ast::Expr::Object(obj_lit) = first.expr.as_ref() { + for prop in &obj_lit.props { + if let ast::PropOrSpread::Prop(boxed_prop) = prop { + let mut handled = false; + match boxed_prop.as_ref() { + ast::Prop::KeyValue(kv) => { + let n = match &kv.key { + ast::PropName::Ident(i) => Some(i.sym.as_ref()), + ast::PropName::Str(s) => s.value.as_str(), + _ => None, + }; + if let Some(name) = n { + if let Some((_, idx, mod_name, class_name)) = field_specs.iter().find(|(f, _, _, _)| *f == name) { + let pat: Option<&ast::Pat> = match kv.value.as_ref() { + ast::Expr::Arrow(arrow) => arrow.params.get(*idx), + ast::Expr::Fn(fn_expr) => 
fn_expr.function.params.get(*idx).map(|p| &p.pat), + _ => None, + }; + if let Some(ast::Pat::Ident(pid)) = pat { + ctx.register_native_instance( + pid.id.sym.to_string(), + mod_name.to_string(), + class_name.to_string(), + ); + handled = true; + } + } + } + } + ast::Prop::Method(m) => { + let n = match &m.key { + ast::PropName::Ident(i) => Some(i.sym.as_ref()), + ast::PropName::Str(s) => s.value.as_str(), + _ => None, + }; + if let Some(name) = n { + if let Some((_, idx, mod_name, class_name)) = field_specs.iter().find(|(f, _, _, _)| *f == name) { + if let Some(param) = m.function.params.get(*idx) { + if let ast::Pat::Ident(pid) = ¶m.pat { + ctx.register_native_instance( + pid.id.sym.to_string(), + mod_name.to_string(), + class_name.to_string(), + ); + handled = true; + } + } + } + } + } + _ => {} + } + let _ = handled; + } + } + } + } + } + } + } + // Try to extract class name from callee match new_expr.callee.as_ref() { ast::Expr::Ident(ident) => { diff --git a/crates/perry-hir/src/lower_decl.rs b/crates/perry-hir/src/lower_decl.rs index 3a7f80bd..a45c6f7a 100644 --- a/crates/perry-hir/src/lower_decl.rs +++ b/crates/perry-hir/src/lower_decl.rs @@ -3058,6 +3058,116 @@ pub(crate) fn lower_body_stmt(ctx: &mut LoweringContext, stmt: &ast::Stmt) -> Re result.push(Stmt::Switch { discriminant, cases }); } ast::Stmt::ForOf(for_of_stmt) => { + // --- Issue #237: `for await (const c of )` --- + // Lower to a getReader/read loop so the body sees Uint8Array + // chunks. Detect by checking the iterable's registered native + // instance type. Falls through to the generic async-iterator + // path if not a ReadableStream. 
+ if for_of_stmt.is_await { + let is_readable_stream = if let ast::Expr::Ident(ident) = &*for_of_stmt.right { + matches!( + ctx.lookup_native_instance(ident.sym.as_ref()), + Some((_, "ReadableStream")) + ) + } else { false }; + + if is_readable_stream { + let scope_mark = ctx.push_block_scope(); + let stream_expr = lower_expr(ctx, &for_of_stmt.right)?; + + // const __reader = stream.getReader(); + let reader_id = ctx.fresh_local(); + ctx.locals.push((format!("__reader_{}", reader_id), reader_id, Type::Any)); + ctx.register_native_instance( + format!("__reader_{}", reader_id), + "readable_stream_reader".to_string(), + "ReadableStreamDefaultReader".to_string(), + ); + result.push(Stmt::Let { + id: reader_id, + name: format!("__reader_{}", reader_id), + ty: Type::Any, + mutable: false, + init: Some(Expr::NativeMethodCall { + module: "readable_stream".to_string(), + class_name: Some("ReadableStream".to_string()), + object: Some(Box::new(stream_expr)), + method: "getReader".to_string(), + args: vec![], + }), + }); + + // let __res = await __reader.read(); + let res_id = ctx.fresh_local(); + ctx.locals.push((format!("__res_{}", res_id), res_id, Type::Any)); + let read_call = || Expr::Await(Box::new(Expr::NativeMethodCall { + module: "readable_stream_reader".to_string(), + class_name: Some("ReadableStreamDefaultReader".to_string()), + object: Some(Box::new(Expr::LocalGet(reader_id))), + method: "read".to_string(), + args: vec![], + })); + result.push(Stmt::Let { + id: res_id, + name: format!("__res_{}", res_id), + ty: Type::Any, + mutable: true, + init: Some(read_call()), + }); + + let item_name = if let ast::ForHead::VarDecl(var_decl) = &for_of_stmt.left { + if let Some(decl) = var_decl.decls.first() { + if let ast::Pat::Ident(ident) = &decl.name { + ident.id.sym.to_string() + } else { "__chunk".to_string() } + } else { "__chunk".to_string() } + } else { "__chunk".to_string() }; + let item_id = ctx.define_local(item_name.clone(), Type::Any); + + let mut body_stmts: Vec = 
Vec::new(); + body_stmts.push(Stmt::Let { + id: item_id, + name: item_name, + ty: Type::Any, + mutable: false, + init: Some(Expr::PropertyGet { + object: Box::new(Expr::LocalGet(res_id)), + property: "value".to_string(), + }), + }); + let user_body = lower_body_stmt(ctx, &for_of_stmt.body)?; + body_stmts.extend(user_body); + body_stmts.push(Stmt::Expr(Expr::LocalSet( + res_id, + Box::new(read_call()), + ))); + + result.push(Stmt::While { + condition: Expr::Unary { + op: UnaryOp::Not, + operand: Box::new(Expr::PropertyGet { + object: Box::new(Expr::LocalGet(res_id)), + property: "done".to_string(), + }), + }, + body: body_stmts, + }); + + // reader.releaseLock(); — best-effort cleanup so the + // stream stays usable after the loop body falls out. + result.push(Stmt::Expr(Expr::NativeMethodCall { + module: "readable_stream_reader".to_string(), + class_name: Some("ReadableStreamDefaultReader".to_string()), + object: Some(Box::new(Expr::LocalGet(reader_id))), + method: "releaseLock".to_string(), + args: vec![], + })); + + ctx.pop_block_scope(scope_mark); + return Ok(result); + } + } + // --- Iterator-protocol path for generator function calls --- // Detect: `for [await] (const x of genFunc(...))` where genFunc is // function* / async function*. Without this path the for-of falls diff --git a/crates/perry-stdlib/src/fetch.rs b/crates/perry-stdlib/src/fetch.rs index 27d7482d..f7b73877 100644 --- a/crates/perry-stdlib/src/fetch.rs +++ b/crates/perry-stdlib/src/fetch.rs @@ -1225,6 +1225,50 @@ pub unsafe extern "C" fn js_blob_slice( alloc_blob(BlobData { body: slice, content_type: new_type }) as f64 } +// ----------------- Web Streams bridge helpers (issue #237) ----------------- +// +// `streams.rs` reaches in here for the bytes backing `blob.stream()` and +// `response.body`. Going through these `pub(crate)` shims (rather than +// re-implementing the `BLOB_REGISTRY` / `FETCH_RESPONSES` lookups in +// `streams.rs`) keeps the registry types private to fetch.rs. 
+ +/// Clone the bytes backing the given Blob handle. Returns `None` for an +/// unknown handle. +#[doc(hidden)] +pub fn blob_bytes_clone(blob_id: usize) -> Option> { + BLOB_REGISTRY.lock().unwrap().get(&blob_id).map(|b| b.body.clone()) +} + +/// Clone the body bytes of the given fetch Response handle. Returns +/// `None` for an unknown handle. +#[doc(hidden)] +pub fn response_bytes_clone(resp_id: usize) -> Option> { + FETCH_RESPONSES.lock().unwrap().get(&resp_id).map(|r| r.body.clone()) +} + +/// `blob.stream()` — returns a single-chunk ReadableStream handle (f64, +/// numeric registry id) over the blob's byte payload. Closes the stream +/// after the one chunk is delivered. +#[no_mangle] +pub unsafe extern "C" fn js_blob_stream(handle: f64) -> f64 { + let id = handle as usize; + let bytes = blob_bytes_clone(id).unwrap_or_default(); + crate::streams::alloc_readable_from_bytes(bytes) as f64 +} + +/// `response.body` — returns a single-chunk ReadableStream handle over +/// the buffered response body. Returns `null`-tagged f64 for an unknown +/// response handle (matching the spec's `Response.body: ReadableStream +/// | null`). +#[no_mangle] +pub unsafe extern "C" fn js_response_body(handle: f64) -> f64 { + let id = handle as usize; + match response_bytes_clone(id) { + Some(bytes) => crate::streams::alloc_readable_from_bytes(bytes) as f64, + None => f64::from_bits(TAG_NULL), + } +} + /// Response.json(value) — static method. Allocates a Response with JSON-stringified body /// and Content-Type: application/json. The value is passed as NaN-boxed JSValue bits (f64). 
#[no_mangle] diff --git a/crates/perry-stdlib/src/lib.rs b/crates/perry-stdlib/src/lib.rs index cac6b265..e5114ea0 100644 --- a/crates/perry-stdlib/src/lib.rs +++ b/crates/perry-stdlib/src/lib.rs @@ -75,6 +75,12 @@ pub mod axios; #[cfg(feature = "http-client")] pub use axios::*; +// === Web Streams API (issue #237) === +#[cfg(feature = "http-client")] +pub mod streams; +#[cfg(feature = "http-client")] +pub use streams::*; + // === WebSocket === #[cfg(feature = "websocket")] pub mod ws; diff --git a/crates/perry-stdlib/src/streams.rs b/crates/perry-stdlib/src/streams.rs new file mode 100644 index 00000000..f7ad1eb1 --- /dev/null +++ b/crates/perry-stdlib/src/streams.rs @@ -0,0 +1,1331 @@ +//! Web Streams API (issue #237). +//! +//! Implements `ReadableStream` / `WritableStream` / `TransformStream` plus +//! the matching default reader / writer pair, and the per-controller +//! enqueue / close / error / write / abort surface. Wires `blob.stream()` +//! and `response.body` so the consumers in the issue's acceptance +//! criteria all work end-to-end. +//! +//! Handles use the same numeric f64 ABI as `BLOB_REGISTRY` / +//! `FETCH_RESPONSES` (registry id cast to f64). Codegen's `module == +//! "readable_stream"` / `"reader"` / `"writable_stream"` / `"writer"` / +//! `"transform_stream"` arms in `lower_call.rs` route methods through +//! these FFIs. +//! +//! Buffered model: `blob.stream()` and `response.body` produce a +//! single-chunk readable stream over the body bytes that are already +//! resident in memory. True chunk-by-chunk streaming from +//! `reqwest::Response::chunk()` is a separate followup — the existing +//! fetch path eagerly buffers the whole response anyway, so the user- +//! visible contract is identical for the consumers we expose here. +//! +//! Stubs: BYOB readers, custom `QueuingStrategy` size functions, and +//! `ReadableStream.from(asyncIterable)` throw via +//! `js_streams_throw_not_implemented` — see the inline comment on each +//! site. 
+ +use perry_runtime::{ + js_array_alloc, js_array_push, js_object_alloc, js_object_set_field, js_object_set_keys, + js_string_from_bytes, js_promise_new, js_promise_resolve, js_promise_reject, + js_closure_call0, js_closure_call1, js_closure_call2, + JSValue, ClosureHeader, Promise, +}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Mutex; + +const TAG_UNDEFINED: u64 = 0x7FFC_0000_0000_0001; +const TAG_NULL: u64 = 0x7FFC_0000_0000_0002; +const TAG_FALSE: u64 = 0x7FFC_0000_0000_0003; +const TAG_TRUE: u64 = 0x7FFC_0000_0000_0004; +const POINTER_TAG: u64 = 0x7FFD_0000_0000_0000; +const POINTER_MASK: u64 = 0x0000_FFFF_FFFF_FFFF; + +#[derive(Clone, Copy, PartialEq, Eq)] +enum ReadableState { + Readable, + Closed, + Errored, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +enum WritableState { + Writable, + Closing, + Closed, + Errored, +} + +struct ReadableStreamData { + state: ReadableState, + /// Queued chunks as NaN-boxed pointers (typically Uint8Array via POINTER_TAG). + chunks: VecDeque, + /// FIFO of read() promises waiting for a chunk. + pending_reads: VecDeque<*mut Promise>, + start_cb: i64, + pull_cb: i64, + cancel_cb: i64, + high_water_mark: f64, + pulling: bool, + started: bool, + reader_handle: Option, + error_value: u64, + /// Per-controller cancel reason captured when `cancel()` is called. + canceled: bool, +} + +#[allow(dead_code)] +struct WritableStreamData { + state: WritableState, + write_cb: i64, + close_cb: i64, + abort_cb: i64, + /// Backlog of writes when `in_flight` is true. Reserved for the + /// async-write path tracked as a #237 followup; today every write + /// runs synchronously through the user `write` callback. + write_queue: VecDeque<(u64, *mut Promise)>, + in_flight: bool, + high_water_mark: f64, + writer_handle: Option, + error_value: u64, + /// Resolved when the stream becomes ready for more writes (i.e. queue drains). 
+ ready_promise: *mut Promise, + /// Resolved when the stream finishes / rejects on error. + closed_promise: *mut Promise, +} + +struct TransformStreamData { + readable_handle: usize, + writable_handle: usize, + transform_cb: i64, + flush_cb: i64, +} + +struct ReaderData { + stream_handle: usize, + locked: bool, + closed_promise: *mut Promise, +} + +struct WriterData { + stream_handle: usize, + locked: bool, + closed_promise: *mut Promise, + ready_promise: *mut Promise, +} + +unsafe impl Send for ReadableStreamData {} +unsafe impl Send for WritableStreamData {} +unsafe impl Send for ReaderData {} +unsafe impl Send for WriterData {} + +lazy_static::lazy_static! { + static ref READABLE_STREAMS: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_RS_ID: Mutex = Mutex::new(1); + static ref WRITABLE_STREAMS: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_WS_ID: Mutex = Mutex::new(1); + static ref TRANSFORM_STREAMS: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_TS_ID: Mutex = Mutex::new(1); + static ref READERS: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_READER_ID: Mutex = Mutex::new(1); + static ref WRITERS: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_WRITER_ID: Mutex = Mutex::new(1); +} + +static GC_REGISTERED: std::sync::Once = std::sync::Once::new(); + +/// Register the streams GC root scanner once. Closures held by user- +/// supplied `start` / `pull` / `cancel` / `write` / `close` / `abort` / +/// `transform` / `flush` callbacks live in the registry maps below; the +/// runtime GC mark phase wouldn't see them otherwise and a sweep +/// between registration and dispatch would free the closure body. Same +/// shape as `ws.rs::ensure_gc_scanner_registered`. 
+fn ensure_gc_registered() { + GC_REGISTERED.call_once(|| { + perry_runtime::gc::gc_register_root_scanner(scan_stream_roots); + }); +} + +fn scan_stream_roots(mark: &mut dyn FnMut(f64)) { + let mark_cb = |cb: i64, mark: &mut dyn FnMut(f64)| { + if cb != 0 { + let boxed = f64::from_bits(POINTER_TAG | (cb as u64 & POINTER_MASK)); + mark(boxed); + } + }; + let mark_promise = |p: *mut Promise, mark: &mut dyn FnMut(f64)| { + let raw = p as i64; + if raw != 0 { + let boxed = f64::from_bits(POINTER_TAG | (raw as u64 & POINTER_MASK)); + mark(boxed); + } + }; + let mark_chunk = |bits: u64, mark: &mut dyn FnMut(f64)| { + let top = bits >> 48; + if top == 0x7FFD || top == 0x7FFF { + mark(f64::from_bits(bits)); + } + }; + + if let Ok(map) = READABLE_STREAMS.lock() { + for s in map.values() { + mark_cb(s.start_cb, mark); + mark_cb(s.pull_cb, mark); + mark_cb(s.cancel_cb, mark); + for &c in s.chunks.iter() { + mark_chunk(c, mark); + } + for &p in s.pending_reads.iter() { + mark_promise(p, mark); + } + if s.state == ReadableState::Errored { + mark_chunk(s.error_value, mark); + } + } + } + if let Ok(map) = WRITABLE_STREAMS.lock() { + for s in map.values() { + mark_cb(s.write_cb, mark); + mark_cb(s.close_cb, mark); + mark_cb(s.abort_cb, mark); + for (chunk, p) in s.write_queue.iter() { + mark_chunk(*chunk, mark); + mark_promise(*p, mark); + } + mark_promise(s.ready_promise, mark); + mark_promise(s.closed_promise, mark); + if s.state == WritableState::Errored { + mark_chunk(s.error_value, mark); + } + } + } + if let Ok(map) = TRANSFORM_STREAMS.lock() { + for t in map.values() { + mark_cb(t.transform_cb, mark); + mark_cb(t.flush_cb, mark); + } + } + if let Ok(map) = READERS.lock() { + for r in map.values() { + mark_promise(r.closed_promise, mark); + } + } + if let Ok(map) = WRITERS.lock() { + for w in map.values() { + mark_promise(w.closed_promise, mark); + mark_promise(w.ready_promise, mark); + } + } +} + +// ───────────────────────────────────────────────────────────────────── +// 
Helpers +// ───────────────────────────────────────────────────────────────────── + +fn next_id(slot: &Mutex) -> usize { + let mut guard = slot.lock().unwrap(); + let id = *guard; + *guard += 1; + id +} + +unsafe fn closure_from_bits(bits: u64) -> i64 { + if bits == TAG_UNDEFINED || bits == TAG_NULL || bits == 0 { + return 0; + } + let top = bits >> 48; + if top >= 0x7FF8 { + (bits & POINTER_MASK) as i64 + } else { + 0 + } +} + +unsafe fn build_iter_result(value_bits: u64, done: bool) -> u64 { + let obj = js_object_alloc(0, 2); + let keys = js_array_alloc(2); + let k_value = js_string_from_bytes(b"value".as_ptr(), 5); + let k_done = js_string_from_bytes(b"done".as_ptr(), 4); + js_array_push(keys, JSValue::string_ptr(k_value)); + js_array_push(keys, JSValue::string_ptr(k_done)); + js_object_set_field(obj, 0, JSValue::from_bits(value_bits)); + let done_bits = if done { TAG_TRUE } else { TAG_FALSE }; + js_object_set_field(obj, 1, JSValue::from_bits(done_bits)); + js_object_set_keys(obj, keys); + JSValue::object_ptr(obj as *mut u8).bits() +} + +unsafe fn alloc_uint8array_from_bytes(bytes: &[u8]) -> u64 { + let buf = perry_runtime::buffer::buffer_alloc(bytes.len() as u32); + (*buf).length = bytes.len() as u32; + if !bytes.is_empty() { + std::ptr::copy_nonoverlapping( + bytes.as_ptr(), + perry_runtime::buffer::buffer_data_mut(buf), + bytes.len(), + ); + } + JSValue::object_ptr(buf as *mut u8).bits() +} + +unsafe fn read_bytes_from_chunk(chunk_bits: u64) -> Option> { + let top = chunk_bits >> 48; + if top != 0x7FFD { + return None; + } + let ptr = (chunk_bits & POINTER_MASK) as *mut perry_runtime::buffer::BufferHeader; + if ptr.is_null() { + return None; + } + let len = (*ptr).length as usize; + let data = perry_runtime::buffer::buffer_data_mut(ptr) as *const u8; + Some(std::slice::from_raw_parts(data, len).to_vec()) +} + +unsafe fn make_error_with_message(msg: &str) -> u64 { + let s = js_string_from_bytes(msg.as_ptr(), msg.len() as u32); + let err = 
perry_runtime::error::js_error_new_with_message(s); + JSValue::pointer(err as *const u8).bits() +} + +fn alloc_readable(start_cb: i64, pull_cb: i64, cancel_cb: i64, hwm: f64) -> usize { + let id = next_id(&NEXT_RS_ID); + READABLE_STREAMS.lock().unwrap().insert( + id, + ReadableStreamData { + state: ReadableState::Readable, + chunks: VecDeque::new(), + pending_reads: VecDeque::new(), + start_cb, + pull_cb, + cancel_cb, + high_water_mark: if hwm.is_nan() || hwm <= 0.0 { 1.0 } else { hwm }, + pulling: false, + started: false, + reader_handle: None, + error_value: 0, + canceled: false, + }, + ); + id +} + +fn alloc_writable(write_cb: i64, close_cb: i64, abort_cb: i64, hwm: f64) -> usize { + let id = next_id(&NEXT_WS_ID); + let ready = unsafe { js_promise_new() }; + let closed = unsafe { js_promise_new() }; + unsafe { + js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED)); + } + WRITABLE_STREAMS.lock().unwrap().insert( + id, + WritableStreamData { + state: WritableState::Writable, + write_cb, + close_cb, + abort_cb, + write_queue: VecDeque::new(), + in_flight: false, + high_water_mark: if hwm.is_nan() || hwm <= 0.0 { 1.0 } else { hwm }, + writer_handle: None, + error_value: 0, + ready_promise: ready, + closed_promise: closed, + }, + ); + id +} + +unsafe fn invoke_start(stream_id: usize) { + let (cb, controller) = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) if !s.started => { + s.started = true; + (s.start_cb, stream_id as f64) + } + _ => return, + } + }; + if cb != 0 { + js_closure_call1(cb as *const ClosureHeader, controller); + } +} + +unsafe fn maybe_pull(stream_id: usize) { + let (cb, controller, should_pull) = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) if s.state == ReadableState::Readable && !s.pulling && s.started => { + let need = s.chunks.is_empty() || (s.chunks.len() as f64) < s.high_water_mark; + if need && s.pull_cb != 0 { + s.pulling = true; + (s.pull_cb, 
stream_id as f64, true) + } else { + (0, 0.0, false) + } + } + _ => (0, 0.0, false), + } + }; + if !should_pull { + return; + } + js_closure_call1(cb as *const ClosureHeader, controller); + if let Some(s) = READABLE_STREAMS.lock().unwrap().get_mut(&stream_id) { + s.pulling = false; + } +} + +unsafe fn close_pending(stream_id: usize) { + let promises: Vec<*mut Promise> = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => s.pending_reads.drain(..).collect(), + None => Vec::new(), + } + }; + for p in promises { + let result = build_iter_result(TAG_UNDEFINED, true); + js_promise_resolve(p, f64::from_bits(result)); + } +} + +unsafe fn error_pending(stream_id: usize, reason_bits: u64) { + let promises: Vec<*mut Promise> = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => s.pending_reads.drain(..).collect(), + None => Vec::new(), + } + }; + for p in promises { + js_promise_reject(p, f64::from_bits(reason_bits)); + } +} + +// ───────────────────────────────────────────────────────────────────── +// ReadableStream FFI +// ───────────────────────────────────────────────────────────────────── + +/// `new ReadableStream({ start, pull, cancel })` — `start_cb` / `pull_cb` +/// / `cancel_cb` are NaN-boxed `*ClosureHeader` bits (or undefined). The +/// new stream's controller is the stream handle itself; user code calls +/// `controller.enqueue(c)` etc. to drive it. +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_new( + start_bits: f64, + pull_bits: f64, + cancel_bits: f64, + hwm: f64, +) -> f64 { + ensure_gc_registered(); + let id = alloc_readable( + closure_from_bits(start_bits.to_bits()), + closure_from_bits(pull_bits.to_bits()), + closure_from_bits(cancel_bits.to_bits()), + hwm, + ); + invoke_start(id); + maybe_pull(id); + id as f64 +} + +/// Internal helper: build a single-chunk readable stream from an owned +/// byte buffer. Used by `blob.stream()` and `response.body`. 
+pub fn alloc_readable_from_bytes(bytes: Vec) -> usize { + ensure_gc_registered(); + let id = alloc_readable(0, 0, 0, 1.0); + unsafe { + let chunk_bits = alloc_uint8array_from_bytes(&bytes); + let mut g = READABLE_STREAMS.lock().unwrap(); + if let Some(s) = g.get_mut(&id) { + s.started = true; + if !bytes.is_empty() { + s.chunks.push_back(chunk_bits); + } + s.state = ReadableState::Closed; + } + } + id +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_get_reader(stream_handle: f64) -> f64 { + ensure_gc_registered(); + let id = stream_handle as usize; + { + let mut g = READABLE_STREAMS.lock().unwrap(); + let s = match g.get_mut(&id) { + Some(s) => s, + None => return f64::from_bits(TAG_UNDEFINED), + }; + if s.reader_handle.is_some() { + return f64::from_bits(TAG_UNDEFINED); + } + let reader_id = next_id(&NEXT_READER_ID); + let closed_p = js_promise_new(); + if s.state == ReadableState::Closed { + js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED)); + } else if s.state == ReadableState::Errored { + js_promise_reject(closed_p, f64::from_bits(s.error_value)); + } + s.reader_handle = Some(reader_id); + READERS.lock().unwrap().insert( + reader_id, + ReaderData { + stream_handle: id, + locked: true, + closed_promise: closed_p, + }, + ); + reader_id as f64 + } +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_locked(stream_handle: f64) -> f64 { + let id = stream_handle as usize; + let g = READABLE_STREAMS.lock().unwrap(); + let locked = g.get(&id).map(|s| s.reader_handle.is_some()).unwrap_or(false); + f64::from_bits(if locked { TAG_TRUE } else { TAG_FALSE }) +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_cancel(stream_handle: f64, reason: f64) -> *mut Promise { + let promise = js_promise_new(); + let id = stream_handle as usize; + let cb = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&id) { + Some(s) => { + s.canceled = true; + s.state = ReadableState::Closed; + s.chunks.clear(); + s.cancel_cb + } 
+ None => 0, + } + }; + if cb != 0 { + js_closure_call1(cb as *const ClosureHeader, reason); + } + close_pending(id); + js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); + promise +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_from_blob(blob_id: f64) -> f64 { + let bytes = crate::fetch::blob_bytes_clone(blob_id as usize).unwrap_or_default(); + alloc_readable_from_bytes(bytes) as f64 +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_from_response(resp_id: f64) -> f64 { + let bytes = crate::fetch::response_bytes_clone(resp_id as usize).unwrap_or_default(); + alloc_readable_from_bytes(bytes) as f64 +} + +// `ReadableStream.from(asyncIterable)` — deferred (issue #237 followup). +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_from_iterable(_value: f64) -> f64 { + let err = make_error_with_message( + "ReadableStream.from(asyncIterable) is not yet implemented (issue #237 followup)", + ); + perry_runtime::exception::js_throw(f64::from_bits(err)); +} + +// ───────────────────────────────────────────────────────────────────── +// ReadableStreamDefaultController FFI (controller is the stream handle) +// ───────────────────────────────────────────────────────────────────── + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_controller_enqueue( + stream_handle: f64, + chunk: f64, +) -> f64 { + let id = stream_handle as usize; + let chunk_bits = chunk.to_bits(); + let popped = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&id) { + Some(s) if s.state == ReadableState::Readable => { + if let Some(p) = s.pending_reads.pop_front() { + Some(p) + } else { + s.chunks.push_back(chunk_bits); + None + } + } + _ => return f64::from_bits(TAG_UNDEFINED), + } + }; + if let Some(p) = popped { + let result = build_iter_result(chunk_bits, false); + js_promise_resolve(p, f64::from_bits(result)); + } + f64::from_bits(TAG_UNDEFINED) +} + +#[no_mangle] +pub unsafe extern "C" fn 
js_readable_stream_controller_close(stream_handle: f64) -> f64 { + let id = stream_handle as usize; + { + let mut g = READABLE_STREAMS.lock().unwrap(); + if let Some(s) = g.get_mut(&id) { + if s.state == ReadableState::Readable { + s.state = ReadableState::Closed; + } + } + } + // Reader.closed promise resolves when stream closes and queue empties. + let (queue_empty, reader_id) = { + let g = READABLE_STREAMS.lock().unwrap(); + match g.get(&id) { + Some(s) => (s.chunks.is_empty(), s.reader_handle), + None => (true, None), + } + }; + if queue_empty { + if let Some(rid) = reader_id { + let p = READERS.lock().unwrap().get(&rid).map(|r| r.closed_promise); + if let Some(p) = p { + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + } + } + close_pending(id); + } + f64::from_bits(TAG_UNDEFINED) +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_controller_error(stream_handle: f64, reason: f64) -> f64 { + let id = stream_handle as usize; + let reason_bits = reason.to_bits(); + let reader_id = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&id) { + Some(s) => { + s.state = ReadableState::Errored; + s.error_value = reason_bits; + s.chunks.clear(); + s.reader_handle + } + None => return f64::from_bits(TAG_UNDEFINED), + } + }; + error_pending(id, reason_bits); + if let Some(rid) = reader_id { + let p = READERS.lock().unwrap().get(&rid).map(|r| r.closed_promise); + if let Some(p) = p { + js_promise_reject(p, reason); + } + } + f64::from_bits(TAG_UNDEFINED) +} + +#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_controller_desired_size(stream_handle: f64) -> f64 { + let id = stream_handle as usize; + let g = READABLE_STREAMS.lock().unwrap(); + match g.get(&id) { + Some(s) if s.state == ReadableState::Readable => { + (s.high_water_mark - s.chunks.len() as f64).max(0.0) + } + Some(s) if s.state == ReadableState::Errored => f64::NAN, + _ => 0.0, + } +} + +// ───────────────────────────────────────────────────────────────────── +// 
ReadableStreamDefaultReader FFI +// ───────────────────────────────────────────────────────────────────── + +#[no_mangle] +pub unsafe extern "C" fn js_reader_read(reader_handle: f64) -> *mut Promise { + let promise = js_promise_new(); + let reader_id = reader_handle as usize; + let stream_id = match READERS.lock().unwrap().get(&reader_id) { + Some(r) if r.locked => r.stream_handle, + Some(_) => { + let err = make_error_with_message("Reader is no longer locked to a stream"); + js_promise_reject(promise, f64::from_bits(err)); + return promise; + } + None => { + let err = make_error_with_message("Invalid reader"); + js_promise_reject(promise, f64::from_bits(err)); + return promise; + } + }; + let outcome: Option<(u64, bool, bool)> = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => { + if let Some(c) = s.chunks.pop_front() { + Some((c, false, false)) + } else if s.state == ReadableState::Closed { + Some((TAG_UNDEFINED, true, false)) + } else if s.state == ReadableState::Errored { + Some((s.error_value, false, true)) + } else { + s.pending_reads.push_back(promise); + None + } + } + None => Some((TAG_UNDEFINED, true, false)), + } + }; + match outcome { + Some((value, _, true)) => { + js_promise_reject(promise, f64::from_bits(value)); + } + Some((value, done, false)) => { + let result = build_iter_result(value, done); + js_promise_resolve(promise, f64::from_bits(result)); + } + None => {} + } + maybe_pull(stream_id); + promise +} + +#[no_mangle] +pub unsafe extern "C" fn js_reader_release_lock(reader_handle: f64) -> f64 { + let reader_id = reader_handle as usize; + let stream_id = { + let mut g = READERS.lock().unwrap(); + match g.get_mut(&reader_id) { + Some(r) => { + r.locked = false; + r.stream_handle + } + None => return f64::from_bits(TAG_UNDEFINED), + } + }; + if let Some(s) = READABLE_STREAMS.lock().unwrap().get_mut(&stream_id) { + s.reader_handle = None; + } + f64::from_bits(TAG_UNDEFINED) +} + +#[no_mangle] +pub 
unsafe extern "C" fn js_reader_closed(reader_handle: f64) -> *mut Promise { + let reader_id = reader_handle as usize; + match READERS.lock().unwrap().get(&reader_id) { + Some(r) => r.closed_promise, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + p + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn js_reader_cancel(reader_handle: f64, reason: f64) -> *mut Promise { + let reader_id = reader_handle as usize; + let stream_id = match READERS.lock().unwrap().get(&reader_id) { + Some(r) => r.stream_handle, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + return p; + } + }; + js_readable_stream_cancel(stream_id as f64, reason) +} + +// ───────────────────────────────────────────────────────────────────── +// tee / pipeTo / pipeThrough +// ───────────────────────────────────────────────────────────────────── + +/// `stream.tee()` — returns an array of two new ReadableStreams. Both +/// branches drain the SOURCE eagerly into separate per-branch queues at +/// tee time. This is correct for the buffered consumers Perry exposes +/// (`blob.stream()` / `response.body` are pre-buffered) and the "user +/// source already enqueued everything synchronously in start" pattern. +/// Streams that lazily produce chunks via `pull` after tee will only see +/// chunks present at the tee call — the same trade-off Node's +/// `Readable.from([...]).tee()` makes for already-buffered iterables. 
+#[no_mangle] +pub unsafe extern "C" fn js_readable_stream_tee(stream_handle: f64) -> f64 { + let id = stream_handle as usize; + let chunks: Vec = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&id) { + Some(s) if s.reader_handle.is_none() => { + let drained: Vec = s.chunks.drain(..).collect(); + s.state = ReadableState::Closed; + drained + } + _ => Vec::new(), + } + }; + + let id_a = next_id(&NEXT_RS_ID); + let id_b = next_id(&NEXT_RS_ID); + { + let mut g = READABLE_STREAMS.lock().unwrap(); + for new_id in [id_a, id_b] { + g.insert( + new_id, + ReadableStreamData { + state: ReadableState::Closed, + chunks: chunks.iter().copied().collect(), + pending_reads: VecDeque::new(), + start_cb: 0, + pull_cb: 0, + cancel_cb: 0, + high_water_mark: 1.0, + pulling: false, + started: true, + reader_handle: None, + error_value: 0, + canceled: false, + }, + ); + } + } + + let arr = js_array_alloc(2); + js_array_push(arr, JSValue::from_bits(f64::to_bits(id_a as f64))); + js_array_push(arr, JSValue::from_bits(f64::to_bits(id_b as f64))); + f64::from_bits(JSValue::object_ptr(arr as *mut u8).bits()) +} + +/// `readable.pipeTo(writable)` — drives the readable into the writable +/// synchronously chunk-by-chunk. Returns a Promise that resolves when +/// the writable closes cleanly, or rejects on error. Synchronous because +/// our buffered model has all bytes resident already; an async loop here +/// would just queue tasks against an empty event loop. 
#[no_mangle]
pub unsafe extern "C" fn js_readable_stream_pipe_to(
    readable_handle: f64,
    writable_handle: f64,
) -> *mut Promise {
    let promise = js_promise_new();
    let r_id = readable_handle as usize;
    let w_id = writable_handle as usize;

    // FIX(review): the pump-step result type was garbled in transit
    // (`let chunk_or_done: Result = …`). Rebuilt as a local enum, which
    // also lets the error rejection run AFTER the registry lock drops
    // (the original rejected the promise while the guard was live).
    enum PumpStep {
        Chunk(u64),
        Done,
        Errored(u64),
    }

    loop {
        let step = {
            let mut g = READABLE_STREAMS.lock().unwrap();
            match g.get_mut(&r_id) {
                Some(s) => {
                    if let Some(c) = s.chunks.pop_front() {
                        PumpStep::Chunk(c)
                    } else if s.state == ReadableState::Errored {
                        PumpStep::Errored(s.error_value)
                    } else {
                        // Closed, or Readable-but-drained: under the
                        // buffered model no further chunks can arrive,
                        // so stop rather than park forever.
                        PumpStep::Done
                    }
                }
                None => PumpStep::Done,
            }
        };
        match step {
            PumpStep::Errored(e) => {
                js_promise_reject(promise, f64::from_bits(e));
                return promise;
            }
            PumpStep::Done => break,
            PumpStep::Chunk(chunk) => {
                // TransformStream's writable side has write_cb == 0 —
                // route through transform_write so the user transform
                // fn runs.
                // NOTE(review): TRANSFORM_PAIRS / transform_write /
                // transform_close are defined further down this file —
                // confirm that registry is also walked by
                // scan_stream_roots.
                if TRANSFORM_PAIRS.lock().unwrap().contains_key(&w_id) {
                    let _ = transform_write(w_id, f64::from_bits(chunk));
                } else {
                    let write_cb = WRITABLE_STREAMS
                        .lock()
                        .unwrap()
                        .get(&w_id)
                        .map(|w| w.write_cb)
                        .unwrap_or(0);
                    if write_cb != 0 {
                        js_closure_call1(write_cb as *const ClosureHeader, f64::from_bits(chunk));
                    }
                }
            }
        }
    }

    // Close downstream — TransformStream routes through transform_close
    // so flush_cb runs and the readable side is closed.
    if TRANSFORM_PAIRS.lock().unwrap().contains_key(&w_id) {
        let _ = transform_close(w_id);
    } else {
        let close_cb = WRITABLE_STREAMS
            .lock()
            .unwrap()
            .get(&w_id)
            .map(|w| w.close_cb)
            .unwrap_or(0);
        if close_cb != 0 {
            js_closure_call0(close_cb as *const ClosureHeader);
        }
        // FIX(review): settle the shared closed promise after the lock
        // drops (the original resolved it inside the if-let guard scope).
        let closed_p = {
            let mut g = WRITABLE_STREAMS.lock().unwrap();
            g.get_mut(&w_id).map(|w| {
                w.state = WritableState::Closed;
                w.closed_promise
            })
        };
        if let Some(cp) = closed_p {
            js_promise_resolve(cp, f64::from_bits(TAG_UNDEFINED));
        }
    }
    js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED));
    promise
}

/// `readable.pipeThrough({readable, writable})` — pipeTo into the
/// transform's writable side, return its readable side. Caller already
/// destructured the TransformStream into its readable / writable
/// handles. The pipe promise is discarded: the pump is synchronous.
#[no_mangle]
pub unsafe extern "C" fn js_readable_stream_pipe_through(
    readable_handle: f64,
    transform_writable_handle: f64,
    transform_readable_handle: f64,
) -> f64 {
    let _ = js_readable_stream_pipe_to(readable_handle, transform_writable_handle);
    transform_readable_handle
}

// ─────────────────────────────────────────────────────────────────────
// WritableStream FFI
// ─────────────────────────────────────────────────────────────────────

/// `new WritableStream({ write, close, abort })` — callback bits are
/// NaN-boxed `*ClosureHeader` (or undefined).
#[no_mangle]
pub unsafe extern "C" fn js_writable_stream_new(
    write_bits: f64,
    close_bits: f64,
    abort_bits: f64,
    hwm: f64,
) -> f64 {
    ensure_gc_registered();
    let id = alloc_writable(
        closure_from_bits(write_bits.to_bits()),
        closure_from_bits(close_bits.to_bits()),
        closure_from_bits(abort_bits.to_bits()),
        hwm,
    );
    id as f64
}

/// `stream.getWriter()` — locks the stream to a new default writer and
/// returns the writer handle, or undefined when the stream is unknown or
/// already locked. The writer shares the stream's ready/closed promises.
#[no_mangle]
pub unsafe extern "C" fn js_writable_stream_get_writer(stream_handle: f64) -> f64 {
    ensure_gc_registered();
    let id = stream_handle as usize;
    let (writer_id, closed_p, ready_p) = {
        let mut g = WRITABLE_STREAMS.lock().unwrap();
        let s = match g.get_mut(&id) {
            Some(s) => s,
            None => return f64::from_bits(TAG_UNDEFINED),
        };
        if s.writer_handle.is_some() {
            return f64::from_bits(TAG_UNDEFINED);
        }
        let writer_id = next_id(&NEXT_WRITER_ID);
        s.writer_handle = Some(writer_id);
        (writer_id, s.closed_promise, s.ready_promise)
    };
    // Registry lock dropped above before the (allocating) WRITERS insert.
    WRITERS.lock().unwrap().insert(
        writer_id,
        WriterData {
            stream_handle: id,
            locked: true,
            closed_promise: closed_p,
            ready_promise: ready_p,
        },
    );
    writer_id as f64
}

/// `stream.locked` — true while a writer holds the lock.
#[no_mangle]
pub unsafe extern "C" fn js_writable_stream_locked(stream_handle: f64) -> f64 {
    let id = stream_handle as usize;
    let g = WRITABLE_STREAMS.lock().unwrap();
    let locked = g.get(&id).map(|s| s.writer_handle.is_some()).unwrap_or(false);
    f64::from_bits(if locked { TAG_TRUE } else { TAG_FALSE })
}

/// `stream.close()` — runs the user `close` callback (outside the lock),
/// resolves the shared `closed` promise, and returns a resolved promise.
#[no_mangle]
pub unsafe extern "C" fn js_writable_stream_close(stream_handle: f64) -> *mut Promise {
    let promise = js_promise_new();
    let id = stream_handle as usize;
    let (cb, closed_p) = {
        let mut g = WRITABLE_STREAMS.lock().unwrap();
        match g.get_mut(&id) {
            Some(s) => {
                s.state = WritableState::Closed;
                (s.close_cb, s.closed_promise)
            }
            None => (0, std::ptr::null_mut()),
        }
    };
    if cb != 0 {
        js_closure_call0(cb as *const ClosureHeader);
    }
    if !closed_p.is_null() {
        js_promise_resolve(closed_p, f64::from_bits(TAG_UNDEFINED));
    }
    js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED));
    promise
}

/// `stream.abort(reason)` — errors the stream, runs the user `abort`
/// callback (outside the lock), rejects the shared `closed` promise with
/// `reason`, and resolves the returned promise.
/// (Definition continues past this view boundary.)
#[no_mangle]
pub unsafe extern "C" fn js_writable_stream_abort(stream_handle: f64, reason: f64) -> *mut Promise {
    let promise = js_promise_new();
    let id = stream_handle as usize;
    let reason_bits = reason.to_bits();
    let (cb, closed_p) = {
        let mut g = WRITABLE_STREAMS.lock().unwrap();
        match g.get_mut(&id) {
            Some(s) => {
                s.state = WritableState::Errored;
                s.error_value = reason_bits;
                (s.abort_cb, s.closed_promise)
            }
            None => (0, std::ptr::null_mut()),
        }
    };
    if cb != 0 {
        js_closure_call1(cb as *const ClosureHeader, reason);
    }
    if !closed_p.is_null() {
        js_promise_reject(closed_p, reason);
    }
    js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED));
promise +} + +// ───────────────────────────────────────────────────────────────────── +// WritableStreamDefaultWriter FFI +// ───────────────────────────────────────────────────────────────────── + +#[no_mangle] +pub unsafe extern "C" fn js_writer_write(writer_handle: f64, chunk: f64) -> *mut Promise { + let promise = js_promise_new(); + let writer_id = writer_handle as usize; + let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) if w.locked => w.stream_handle, + _ => { + let err = make_error_with_message("Writer is no longer locked to a stream"); + js_promise_reject(promise, f64::from_bits(err)); + return promise; + } + }; + if TRANSFORM_PAIRS.lock().unwrap().contains_key(&stream_id) { + return transform_write(stream_id, chunk); + } + let cb = match WRITABLE_STREAMS.lock().unwrap().get(&stream_id) { + Some(s) if s.state == WritableState::Writable => s.write_cb, + Some(s) if s.state == WritableState::Errored => { + let e = s.error_value; + js_promise_reject(promise, f64::from_bits(e)); + return promise; + } + _ => { + let err = make_error_with_message("Stream is closed or closing"); + js_promise_reject(promise, f64::from_bits(err)); + return promise; + } + }; + if cb != 0 { + js_closure_call1(cb as *const ClosureHeader, chunk); + } + js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); + promise +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_close(writer_handle: f64) -> *mut Promise { + let writer_id = writer_handle as usize; + let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.stream_handle, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + return p; + } + }; + if TRANSFORM_PAIRS.lock().unwrap().contains_key(&stream_id) { + return transform_close(stream_id); + } + js_writable_stream_close(stream_id as f64) +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_abort(writer_handle: f64, reason: f64) -> *mut Promise { + let writer_id = writer_handle as 
usize; + let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.stream_handle, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + return p; + } + }; + js_writable_stream_abort(stream_id as f64, reason) +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_release_lock(writer_handle: f64) -> f64 { + let writer_id = writer_handle as usize; + let stream_id = { + let mut g = WRITERS.lock().unwrap(); + match g.get_mut(&writer_id) { + Some(w) => { + w.locked = false; + w.stream_handle + } + None => return f64::from_bits(TAG_UNDEFINED), + } + }; + if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&stream_id) { + s.writer_handle = None; + } + f64::from_bits(TAG_UNDEFINED) +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_closed(writer_handle: f64) -> *mut Promise { + let writer_id = writer_handle as usize; + match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.closed_promise, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + p + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_ready(writer_handle: f64) -> *mut Promise { + let writer_id = writer_handle as usize; + match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.ready_promise, + None => { + let p = js_promise_new(); + js_promise_resolve(p, f64::from_bits(TAG_UNDEFINED)); + p + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn js_writer_desired_size(writer_handle: f64) -> f64 { + let writer_id = writer_handle as usize; + let stream_id = match WRITERS.lock().unwrap().get(&writer_id) { + Some(w) => w.stream_handle, + None => return 0.0, + }; + let g = WRITABLE_STREAMS.lock().unwrap(); + match g.get(&stream_id) { + Some(s) if s.state == WritableState::Writable => s.high_water_mark, + Some(s) if s.state == WritableState::Errored => f64::NAN, + _ => 0.0, + } +} + +// ───────────────────────────────────────────────────────────────────── +// TransformStream FFI +// 
─────────────────────────────────────────────────────────────────────
+
+/// `new TransformStream({transform, flush}, {highWaterMark})`.
+/// Builds a readable/writable pair plus a TransformStreamData record
+/// linking them; the writable side carries no callbacks of its own —
+/// the dispatcher consults TRANSFORM_PAIRS first and routes writes /
+/// closes through the user transform_cb / flush_cb.
+#[no_mangle]
+pub unsafe extern "C" fn js_transform_stream_new(
+    transform_bits: f64,
+    flush_bits: f64,
+    hwm: f64,
+) -> f64 {
+    ensure_gc_registered();
+    let transform_cb = closure_from_bits(transform_bits.to_bits());
+    let flush_cb = closure_from_bits(flush_bits.to_bits());
+
+    // Allocate the readable side empty (controller is its own handle);
+    // mark it started so no user start/pull callback is awaited.
+    let readable_id = alloc_readable(0, 0, 0, hwm);
+    {
+        let mut g = READABLE_STREAMS.lock().unwrap();
+        if let Some(s) = g.get_mut(&readable_id) {
+            s.started = true;
+        }
+    }
+
+    // Allocate writable side; its write_cb is synthesized via the
+    // dispatcher table below to invoke transform(chunk, controller).
+    let writable_id = next_id(&NEXT_WS_ID);
+    let ready = js_promise_new();
+    let closed = js_promise_new();
+    js_promise_resolve(ready, f64::from_bits(TAG_UNDEFINED));
+    WRITABLE_STREAMS.lock().unwrap().insert(
+        writable_id,
+        WritableStreamData {
+            state: WritableState::Writable,
+            // Sentinel: write_cb=0, close_cb=0 — the dispatcher checks
+            // TRANSFORM_PAIRS first and routes through the user
+            // transform_cb / flush_cb instead.
+            write_cb: 0,
+            close_cb: 0,
+            abort_cb: 0,
+            write_queue: VecDeque::new(),
+            in_flight: false,
+            // Clamp NaN / non-positive highWaterMark to the spec default of 1.
+            high_water_mark: if hwm.is_nan() || hwm <= 0.0 { 1.0 } else { hwm },
+            writer_handle: None,
+            error_value: 0,
+            ready_promise: ready,
+            closed_promise: closed,
+        },
+    );
+
+    let id = next_id(&NEXT_TS_ID);
+    TRANSFORM_STREAMS.lock().unwrap().insert(
+        id,
+        TransformStreamData {
+            readable_handle: readable_id,
+            writable_handle: writable_id,
+            transform_cb,
+            flush_cb,
+        },
+    );
+    TRANSFORM_PAIRS.lock().unwrap().insert(writable_id, id);
+    id as f64
+}
+
+/// `transformStream.readable` — handle of the readable side.
+#[no_mangle]
+pub unsafe extern "C" fn js_transform_stream_readable(handle: f64) -> f64 {
+    let id = handle as usize;
+    TRANSFORM_STREAMS
+        .lock()
+        .unwrap()
+        .get(&id)
+        .map(|t| t.readable_handle as f64)
+        .unwrap_or(f64::from_bits(TAG_UNDEFINED))
+}
+
+/// `transformStream.writable` — handle of the writable side.
+#[no_mangle]
+pub unsafe extern "C" fn js_transform_stream_writable(handle: f64) -> f64 {
+    let id = handle as usize;
+    TRANSFORM_STREAMS
+        .lock()
+        .unwrap()
+        .get(&id)
+        .map(|t| t.writable_handle as f64)
+        .unwrap_or(f64::from_bits(TAG_UNDEFINED))
+}
+
+lazy_static::lazy_static! {
+    // Writable-side stream id → owning TransformStream id, so the writer
+    // dispatch can detect transform-backed writables in O(1).
+    // (Generic parameters restored: the map is keyed and valued by the
+    // usize registry ids inserted in js_transform_stream_new.)
+    static ref TRANSFORM_PAIRS: Mutex<HashMap<usize, usize>> = Mutex::new(HashMap::new());
+}
+
+/// Replacement `writer.write` for the writable side of a TransformStream
+/// — invokes the user transform with (chunk, transformController) where
+/// the transformController is the readable-side stream handle (so
+/// `controller.enqueue(...)` reuses the readable controller path).
+unsafe fn transform_write(writable_id: usize, chunk: f64) -> *mut Promise { + let promise = js_promise_new(); + let (transform_cb, readable_id) = { + let pairs = TRANSFORM_PAIRS.lock().unwrap(); + match pairs.get(&writable_id) { + Some(t_id) => { + let g = TRANSFORM_STREAMS.lock().unwrap(); + match g.get(t_id) { + Some(t) => (t.transform_cb, t.readable_handle), + None => (0, 0), + } + } + None => (0, 0), + } + }; + if transform_cb != 0 && readable_id != 0 { + js_closure_call2(transform_cb as *const ClosureHeader, chunk, readable_id as f64); + } else { + // Identity transform — pass-through. + js_readable_stream_controller_enqueue(readable_id as f64, chunk); + } + js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); + promise +} + +unsafe fn transform_close(writable_id: usize) -> *mut Promise { + let promise = js_promise_new(); + let (flush_cb, readable_id) = { + let pairs = TRANSFORM_PAIRS.lock().unwrap(); + match pairs.get(&writable_id) { + Some(t_id) => { + let g = TRANSFORM_STREAMS.lock().unwrap(); + match g.get(t_id) { + Some(t) => (t.flush_cb, t.readable_handle), + None => (0, 0), + } + } + None => (0, 0), + } + }; + if flush_cb != 0 && readable_id != 0 { + js_closure_call1(flush_cb as *const ClosureHeader, readable_id as f64); + } + if readable_id != 0 { + js_readable_stream_controller_close(readable_id as f64); + } + if let Some(s) = WRITABLE_STREAMS.lock().unwrap().get_mut(&writable_id) { + s.state = WritableState::Closed; + let cp = s.closed_promise; + js_promise_resolve(cp, f64::from_bits(TAG_UNDEFINED)); + } + js_promise_resolve(promise, f64::from_bits(TAG_UNDEFINED)); + promise +} + +// ───────────────────────────────────────────────────────────────────── +// Stubs for deferred surface (issue #237 followups) +// ───────────────────────────────────────────────────────────────────── + +#[no_mangle] +pub unsafe extern "C" fn js_streams_throw_byob_not_implemented() -> f64 { + let err = make_error_with_message( + "BYOB readers are not yet implemented 
(issue #237 followup)", + ); + perry_runtime::exception::js_throw(f64::from_bits(err)); +} + +#[no_mangle] +pub unsafe extern "C" fn js_streams_throw_byte_length_not_implemented() -> f64 { + let err = make_error_with_message( + "ByteLengthQueuingStrategy is not yet implemented (issue #237 followup)", + ); + perry_runtime::exception::js_throw(f64::from_bits(err)); +} + +// ───────────────────────────────────────────────────────────────────── +// Public helpers used by other crates / tests +// ───────────────────────────────────────────────────────────────────── + +/// Read every queued chunk into a Vec, draining the stream. Used by +/// `new Response(stream)` / `new Request(url, { body: stream })` — we +/// drain the buffered chunks at construction time so the resulting +/// Response.body bytes match what a real serializer would produce. +#[doc(hidden)] +pub fn drain_readable_into_bytes(stream_id: usize) -> Vec { + let mut out = Vec::new(); + let chunks: Vec = { + let mut g = READABLE_STREAMS.lock().unwrap(); + match g.get_mut(&stream_id) { + Some(s) => { + let drained: Vec = s.chunks.drain(..).collect(); + s.state = ReadableState::Closed; + drained + } + None => return out, + } + }; + for chunk in chunks { + unsafe { + if let Some(bytes) = read_bytes_from_chunk(chunk) { + out.extend_from_slice(&bytes); + } + } + } + out +} diff --git a/test-files/test_issue_237_streams_blob.ts b/test-files/test_issue_237_streams_blob.ts new file mode 100644 index 00000000..77655a10 --- /dev/null +++ b/test-files/test_issue_237_streams_blob.ts @@ -0,0 +1,40 @@ +// Regression for issue #237 — Web Streams API: ReadableStream from +// blob.stream() / response.body. Acceptance criteria #1 from the issue +// body, plus the immediate followup (`for await`). + +async function main(): Promise { + // ── 1. 
blob.stream().getReader().read() round-trip ──
+  const r1 = new Response("hello world");
+  const blob1 = await r1.blob();
+  const stream1 = blob1.stream();
+  const reader1 = stream1.getReader();
+  const first = await reader1.read();
+  console.log("first done: " + first.done);
+  // value is a Uint8Array — coerce to string by length (the buffered
+  // bytes are reachable via .length)
+  console.log("first len: " + first.value.length);
+  const second = await reader1.read();
+  console.log("second done: " + second.done);
+
+  // ── 2. response.body — same shape ──
+  const r2 = new Response("abc");
+  const body = r2.body;
+  const reader2 = body.getReader();
+  const out1 = await reader2.read();
+  console.log("body chunk done: " + out1.done);
+  console.log("body chunk len: " + out1.value.length);
+  const out2 = await reader2.read();
+  console.log("body second done: " + out2.done);
+
+  // ── 3. for await of stream — desugared to getReader/read loop ──
+  const r3 = new Response("xyzzy");
+  const blob3 = await r3.blob();
+  const stream3 = blob3.stream();
+  let totalLen = 0;
+  for await (const chunk of stream3) {
+    totalLen += chunk.length;
+  }
+  console.log("for-await total: " + totalLen);
+}
+
+main();
diff --git a/test-files/test_issue_237_streams_pipe.ts b/test-files/test_issue_237_streams_pipe.ts
new file mode 100644
index 00000000..83dcc1c0
--- /dev/null
+++ b/test-files/test_issue_237_streams_pipe.ts
@@ -0,0 +1,65 @@
+// Regression for issue #237 — pipeTo / pipeThrough / WritableStream /
+// TransformStream end-to-end.
+
+async function main(): Promise<void> {
+  // ── 1. pipeTo: drain readable into writable ──
+  const seen: any[] = [];
+  const ws = new WritableStream({
+    write(chunk: any): void {
+      seen.push(chunk.length);
+    },
+    close(): void {
+      seen.push(-1);
+    },
+  });
+
+  const rs = new ReadableStream({
+    start(c: any): void {
+      c.enqueue(new Uint8Array([1, 2, 3]));
+      c.enqueue(new Uint8Array([4, 5]));
+      c.close();
+    },
+  });
+
+  await rs.pipeTo(ws);
+  console.log("pipeTo lengths: " + seen.join(","));
+
+  // ── 2. pipeThrough: identity transform ──
+  const ts = new TransformStream({
+    transform(chunk: any, controller: any): void {
+      controller.enqueue(chunk);
+    },
+  });
+
+  const upstream = new ReadableStream({
+    start(c: any): void {
+      c.enqueue(new Uint8Array([10, 20, 30]));
+      c.close();
+    },
+  });
+
+  const downstream = upstream.pipeThrough(ts);
+  const reader = downstream.getReader();
+  const out = await reader.read();
+  console.log("through done: " + out.done);
+  console.log("through len: " + out.value.length);
+
+  // ── 3. tee: returns an array of two ReadableStream handles. Each
+  // branch holds its own copy of the buffered chunks. The branches[0] /
+  // branches[1] indexing pattern doesn't currently retag the elements
+  // as ReadableStream native instances; downstream consumers can drive
+  // the branches via direct FFI for now (real propagation through
+  // index access is tracked as a #237 followup). Verify the array
+  // shape here so the codegen path stays exercised.
+  const teeable = new ReadableStream({
+    start(c: any): void {
+      c.enqueue(new Uint8Array([1, 2]));
+      c.enqueue(new Uint8Array([3, 4]));
+      c.close();
+    },
+  });
+  const branches = teeable.tee();
+  console.log("tee length: " + branches.length);
+}
+
+main();
diff --git a/test-files/test_issue_237_streams_user_source.ts b/test-files/test_issue_237_streams_user_source.ts
new file mode 100644
index 00000000..592d1b38
--- /dev/null
+++ b/test-files/test_issue_237_streams_user_source.ts
@@ -0,0 +1,31 @@
+// Regression for issue #237 — user-supplied ReadableStream source via
+// `new ReadableStream({ start, pull, cancel })` + `controller.enqueue` /
+// `controller.close()`. Verifies the controller surface and the
+// pending-read drain semantics.
+
+async function main(): Promise<void> {
+  console.log("enter");
+  // ── 1. start enqueues + closes synchronously ──
+  const stream = new ReadableStream({
+    start(controller: any): void {
+      controller.enqueue(new Uint8Array([72, 101, 108, 108, 111])); // "Hello"
+      controller.enqueue(new Uint8Array([32, 87, 111, 114, 108, 100])); // " World"
+      controller.close();
+    },
+  });
+
+  const reader = stream.getReader();
+  const r1 = await reader.read();
+  console.log("r1 done: " + r1.done);
+  console.log("r1 len: " + r1.value.length);
+
+  const r2 = await reader.read();
+  console.log("r2 done: " + r2.done);
+  console.log("r2 len: " + r2.value.length);
+
+  const r3 = await reader.read();
+  console.log("r3 done: " + r3.done);
+  console.log("end");
+}
+
+main();