Add batched variant of arrange for trace-less flattening#728

Open
DAlperin wants to merge 1 commit into TimelyDataflow:master-next from DAlperin:dov/batched-arrange-variant

Conversation

@DAlperin DAlperin commented Apr 24, 2026

Introduce a Batched trait that parallels Arrange but produces a stream of sealed batches without maintaining a shared trace. The stream is wrapped in a new Batches<Tr> type with inherent as_collection and flat_map_ref methods.

Split the batching half of arrange_core into a reusable BatchEngine so both batch_core and arrange_core share the capability/sealing logic. arrange_core also now goes through new_trace_writer for the trace-side half.

Add batch_by_key / batch_by_self convenience methods on Collection mirroring arrange_by_*.

Rewrite consolidate_named on top of batched: it previously built a shared trace only to immediately flatten it back into a collection, paying for a trace that nothing downstream consumed.
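The consolidation step itself — collapsing updates with equal data and time by summing diffs and dropping anything that cancels to zero — can be sketched with std only. This is a simplified model of the idea, not the crate's implementation; the function name and update shape are illustrative:

```rust
/// Collapse updates with equal (data, time) by summing their diffs and
/// discarding updates whose diff cancels to zero. A std-only model of
/// what consolidation does to a batch of (data, time, diff) updates.
fn consolidate(updates: &mut Vec<(String, u64, i64)>) {
    // Sort so that equal (data, time) updates are adjacent.
    updates.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    let mut result: Vec<(String, u64, i64)> = Vec::new();
    for (data, time, diff) in updates.drain(..) {
        match result.last_mut() {
            // Same (data, time) as the previous update: accumulate the diff.
            Some(last) if last.0 == data && last.1 == time => last.2 += diff,
            _ => result.push((data, time, diff)),
        }
    }
    // Updates that net out to zero carry no information downstream.
    result.retain(|u| u.2 != 0);
    *updates = result;
}

fn main() {
    let mut updates = vec![
        ("a".to_string(), 0, 1),
        ("b".to_string(), 0, 1),
        ("a".to_string(), 0, -1),
        ("b".to_string(), 0, 2),
    ];
    consolidate(&mut updates);
    // "a" cancels out entirely; "b" accumulates to 3.
    assert_eq!(updates, vec![("b".to_string(), 0, 3)]);
}
```

The point of the rewrite is that this pass only needs sealed batches as input; nothing here requires a maintained, shared trace.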

I spot-checked a few programs compiled in release mode and the new behavior is well inlined into arrange_core, so it should be performance neutral.

Member

@frankmcsherry frankmcsherry left a comment


A few comments left, but here's a design concern / take:

This introduces new opportunities for inconsistencies between the batches and the trace. They can now be observed "in parallel": the arrangement-forming operator and an operator that reads the batches no longer run in sequence. The operator could receive a batch that is not yet present in the arrangement, except through good behavior of the scheduler. This is a bit unnerving, and imo would be great to avoid.

An alternate design, similar in spirit I think, is to have the Batches type not be a stream, but an interstitial moment that can be consumed to turn into either a stream of batches or an Arranged (which also houses a stream of batches). I think it might be not unlike a shell for the batch engine, waiting to figure out what we want to do with it next.
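One way to read the "interstitial moment" idea is as a value that must be consumed exactly once, choosing between the two outcomes. The following is a std-only illustration of that shape; every name here is invented for the sketch, none of it is the PR's API:

```rust
/// Hypothetical "interstitial" stage: owns the sealed batches and must
/// be consumed to decide what they become. Names are invented; this is
/// a std-only model, not differential-dataflow code.
struct BatchStage<B> {
    batches: Vec<B>,
}

/// Toy stand-in for an arrangement built from those same batches.
struct ArrangedModel<B> {
    batches: Vec<B>,
}

#[allow(dead_code)]
impl<B> BatchStage<B> {
    /// Consume the stage into a bare sequence of batches.
    fn into_batches(self) -> Vec<B> {
        self.batches
    }

    /// Consume the stage into an arrangement built from the same
    /// batches, so a separate batch stream never races ahead of it.
    fn into_arranged(self) -> ArrangedModel<B> {
        ArrangedModel { batches: self.batches }
    }
}

fn main() {
    let stage = BatchStage { batches: vec!["batch-0", "batch-1"] };
    // Consuming `stage` forces a single choice; ownership rules out
    // observing the batches through two decoupled paths at once.
    let arranged = stage.into_arranged();
    assert_eq!(arranged.batches.len(), 2);
}
```

Because the stage is consumed by value, the "batch seen before it reaches the arrangement" interleaving has no representation in this model.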

I can imagine wanting the abstraction of "any stream of sensible batches should be able to become an arrangement", but I'm also spooked by the need to discover the new classes of concurrency that may arise in the operator implementations.

Let's discuss some more!

// Acquire a logger for arrange events.
let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
impl<'scope, Tr: TraceReader> Batches<'scope, Tr> {
/// Flattens the stream of batches into a `VecCollection` with one element per `(key, val)` pair.
Member


I don't think the comment is correct; there may be many (time, diff) pairs for each key-val pair.
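The distinction can be made concrete with a std-only model: flattening a batch yields one element per update, and a single (key, val) pair can carry several (time, diff) updates. The data shapes here are illustrative, not the crate's types:

```rust
fn main() {
    // One (key, val) pair carrying two (time, diff) updates.
    let batch = vec![(("k", "v"), vec![(0u64, 1i64), (2, -1)])];

    // Flattening mirrors as_collection: one output element per
    // (key, val, time, diff) update, not per (key, val) pair.
    let collection: Vec<_> = batch
        .iter()
        .flat_map(|(kv, updates)| {
            updates.iter().map(move |&(t, d)| (*kv, t, d))
        })
        .collect();

    // Two elements from a single (key, val) pair.
    assert_eq!(collection.len(), 2);
    assert_eq!(collection[0], (("k", "v"), 0, 1));
}
```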

Comment on lines -407 to -417
// 1. If any held capabilities are not in advance of the new input frontier,
// we must carve out updates now in advance of the new input frontier and
// transmit them as batches, which requires appropriate *single* capabilities;
// Until timely dataflow supports multiple capabilities on messages, at least.
//
// 2. If there are no held capabilities in advance of the new input frontier,
// then there are no updates not in advance of the new input frontier and
// we can simply create an empty input batch with the new upper frontier
// and feed this to the trace agent (but not along the timely output).

// If there is at least one capability not in advance of the input frontier ...
Member


All of these comments seem to have been lost. Without wanting to presume, Claude has deleted them several times for me as well, so I suspect it's not intentional.

Comment on lines +322 to +325
pub struct Batches<'scope, Tr: TraceReader> {
/// The stream of sealed batches.
pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
}
Member


Two thoughts here:

  1. TBD on whether this carries the weight of a new type; it might, or perhaps it should just be either a type Batches = .. or possibly just Stream in its uses.
  2. I think it would be better if possible to have B: BatchReader, avoiding having to name the trace that the type is meant to avoid.

/// shared trace, implementors emit sealed batches downstream as timely
/// output. Downstream operators consume the batches directly, skipping the
/// cost of maintaining a shared trace.
pub trait Batched<'scope, T: Timestamp+Lattice, C> : Sized {
Member

@frankmcsherry frankmcsherry Apr 25, 2026


I'm not sure this trait is carrying its weight either, a bit like Batches may not be. The Arranged trait just barely makes it, because folks do want to fluently arrange things, but .. I'm not sure there are more than a handful of moments where folks want to form batches without an arrangement, and the extension traits to make the formation fluent may not need to be in the core library. Tbd!

where
Ba: Batcher<Input=C, Time=T> + 'static,
Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=T> + 'static,
Member


Same as with Batched: if we can use B: Batch rather than Tr: Trace, then probably for the best if we don't otherwise reference the trace.

@frankmcsherry
Member

Here's a concrete recommendation that I think makes sense: extend the Arrange trait with the batched method (perhaps batched_named, or just as it is) which is generic only wrt a Ba: Batcher<Input=C> (fingers crossed), and which produces a new type Batched that is .. unclear to me at the moment sorry .. but the intent is that it can either turn into a Stream<B::Batch> or, with an appropriate Bu, Tr, into an Arranged<Tr>.

Perhaps simpler, though less fascinating: just a batched<Ba> method on Arrange that produces the stream, and then we solve for the software architecture outside of the public trait interface. I do like the idea that the parts be architected apart, but it's not clear that revealing this to users (e.g. handing a half-formed batched thing, from which they can complete the arrangement if they choose) helps them more than just saying "either batches or arrangement".
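The simpler shape — a `batched` method on the arranging trait, generic only over the batcher, producing batches directly — can be sketched with std only. All names and trait shapes below are invented stand-ins for the real Collection/Batcher machinery, not the crate's API:

```rust
/// Illustrative stand-in for a batcher: consumes loose updates,
/// produces one sealed batch. Names invented for this sketch.
trait Batcher {
    type Input;
    type Output;
    fn seal(items: Vec<Self::Input>) -> Self::Output;
}

/// A toy batcher whose "batch" is just the update count.
struct CountBatcher;
impl Batcher for CountBatcher {
    type Input = (String, i64);
    type Output = usize;
    fn seal(items: Vec<Self::Input>) -> usize {
        items.len()
    }
}

/// Stand-in for the Arrange-style trait: `batched` is generic only
/// over the batcher, and richer architecture (arrangements, engines)
/// would live outside this public interface.
trait ArrangeModel {
    type Item;
    fn batched<Ba: Batcher<Input = Self::Item>>(self) -> Vec<Ba::Output>;
}

impl ArrangeModel for Vec<(String, i64)> {
    type Item = (String, i64);
    fn batched<Ba: Batcher<Input = (String, i64)>>(self) -> Vec<Ba::Output> {
        // Seal everything into a single batch for the sketch.
        vec![Ba::seal(self)]
    }
}

fn main() {
    let coll = vec![("a".to_string(), 1), ("b".to_string(), 1)];
    // Fluent call site: the batcher is the only generic choice.
    let batches = coll.batched::<CountBatcher>();
    assert_eq!(batches, vec![2]);
}
```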

@DAlperin
Author

Ahh I asked Claude to make me the new types, and checked the logic but not really the comments. Embarrassing!

The other points are interesting. Agree on being generic over Batch, not Tr::Batch, etc.

To give you a sense of why I landed on the design I did:

  • I wanted to be able to match the fluent composition (as_collection/map_batches) of Arranged. I originally had the batched variants return a stream directly, but then it was subject to the StreamExt trait extensions, which didn't do what I wanted here.
  • I thought it was morally nice to leave the future open to the "any stream of batches could be an arrangement" idea. But to be honest I had not considered the concurrency issues at all. I will have to think harder about this. An interstitial object that can be exchanged for either outcome is interesting.
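The fluent-composition point is an instance of a general Rust pattern: inherent methods on a newtype are resolved before blanket extension-trait methods, so wrapping the stream controls which fluent API the user sees. A std-only illustration, with all names invented:

```rust
/// Blanket extension trait, analogous in spirit to StreamExt: it adds
/// a method to every Vec, whether or not it suits batch streams.
trait VecExt {
    fn describe(&self) -> &'static str;
}
impl<T> VecExt for Vec<T> {
    fn describe(&self) -> &'static str {
        "generic stream"
    }
}

/// Newtype wrapper, analogous to a Batches-style type: its inherent
/// methods define the fluent API for the wrapped stream.
struct BatchesModel<T>(Vec<T>);

impl<T: Clone> BatchesModel<T> {
    /// Inherent method: found without any extension trait in scope.
    fn as_collection(&self) -> Vec<T> {
        self.0.clone()
    }
}

fn main() {
    // On a bare Vec, the blanket extension method applies.
    let raw: Vec<i32> = vec![1, 2, 3];
    assert_eq!(raw.describe(), "generic stream");

    // On the wrapper, only the inherent API is visible; the blanket
    // VecExt impl does not cover BatchesModel.
    let wrapped = BatchesModel(vec![1, 2, 3]);
    assert_eq!(wrapped.as_collection(), vec![1, 2, 3]);
}
```

This is why returning a bare stream exposed the StreamExt methods, while a wrapper type does not.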
