Add batched variant of arrange for trace-less flattening #728
DAlperin wants to merge 1 commit into TimelyDataflow:master-next
Conversation
Introduce a `Batched` trait that parallels `Arrange` but produces a stream of sealed batches without maintaining a shared trace. The stream is wrapped in a new `Batches<Tr>` type with inherent `as_collection` and `flat_map_ref` methods.

Split the batching half of `arrange_core` into a reusable `BatchEngine` so both `batch_core` and `arrange_core` share the capability/sealing logic. `arrange_core` also now goes through `new_trace_writer` for the trace-side half.

Add `batch_by_key` / `batch_by_self` convenience methods on `Collection`, mirroring `arrange_by_*`.

Rewrite `consolidate_named` on top of `batched`: it previously built a shared trace only to immediately flatten it back into a collection, paying for trace maintenance that nothing downstream consumed.

I spot-checked a few programs compiled in release mode; the new behavior is well inlined into `arrange_core`, so it should be performance neutral.
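For orientation, a hedged usage sketch of the surface described above. `batch_by_key` and `as_collection` are named in the description; their exact signatures (in particular whether `as_collection` takes a logic closure, like `Arranged::as_collection`) are assumptions.

```rust
use differential_dataflow::input::Input;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<u32, _, _>(|scope| {
            let (mut input, pairs) = scope.new_collection::<(String, u64), isize>();
            // Form sealed batches without maintaining a shared trace, then
            // flatten them straight back into a collection (signature assumed).
            pairs
                .batch_by_key()
                .as_collection(|key, val| (key.clone(), val.clone()))
                .inspect(|x| println!("{:?}", x));
            input.insert(("hello".to_owned(), 17));
        });
    }).unwrap();
}
```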
frankmcsherry left a comment
A few comments left, but here's a design concern / take:
This introduces new opportunities for inconsistencies between the batches and the trace. They can now be observed "in parallel": the arrangement-forming operator and an operator that reads the batches no longer run in sequence. The reading operator could receive a batch that is not yet present in the arrangement, except through good behavior of the scheduler. This is a bit unnerving, and imo would be great to avoid.
An alternate design, similar in spirit I think, is to have the `Batches` type not be a stream, but an interstitial moment that can be consumed to turn into either a stream of batches or an `Arranged` (which also houses a stream of batches). I think it might be not unlike a shell for the batch engine, waiting to figure out what we want to do with it next.
I can imagine wanting the abstraction of "any stream of sensible batches should be able to become an arrangement", but I'm also spooked by the need to discover the new classes of concurrency that may arise in the operator implementations.
Let's discuss some more!
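To make the alternative concrete, here is a minimal sketch of that interstitial shape. Every name below is hypothetical, the bounds are approximate, and the trace plumbing is elided; the point is only that the value must be consumed into exactly one of the two forms.

```rust
use timely::dataflow::{Scope, Stream};
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::Trace;

/// Holds the batch engine's output until the caller decides what it becomes.
/// Not itself a stream, so batches and trace cannot be observed in parallel.
pub struct BatchedOutput<G, Tr>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: Trace + 'static,
{
    stream: Stream<G, Vec<Tr::Batch>>,
}

impl<G, Tr> BatchedOutput<G, Tr>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: Trace + 'static,
{
    /// Consume into a bare stream of sealed batches; no trace is built.
    pub fn into_stream(self) -> Stream<G, Vec<Tr::Batch>> {
        self.stream
    }

    /// Consume into an arrangement that houses the same stream, so the
    /// trace advances in lockstep with the batches it serves.
    pub fn into_arranged(self) -> Arranged<G, TraceAgent<Tr>> {
        // Elided: route `self.stream` through the trace writer
        // (`new_trace_writer` in the PR) and pair it with the agent.
        unimplemented!()
    }
}
```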
    // Acquire a logger for arrange events.
    let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

    impl<'scope, Tr: TraceReader> Batches<'scope, Tr> {
        /// Flattens the stream of batches into a `VecCollection` with one element per `(key, val)` pair.
I don't think the comment is correct; there may be many (time, diff) pairs for each key-val pair.
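A possible correction along these lines (hypothetical wording):

```rust
/// Flattens the stream of batches into a `VecCollection`, with one element
/// per `(key, val, time, diff)` update; a single `(key, val)` pair may
/// contribute many such updates.
```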
    // 1. If any held capabilities are not in advance of the new input frontier,
    // we must carve out updates now in advance of the new input frontier and
    // transmit them as batches, which requires appropriate *single* capabilities;
    // Until timely dataflow supports multiple capabilities on messages, at least.
    //
    // 2. If there are no held capabilities in advance of the new input frontier,
    // then there are no updates not in advance of the new input frontier and
    // we can simply create an empty input batch with the new upper frontier
    // and feed this to the trace agent (but not along the timely output).

    // If there is at least one capability not in advance of the input frontier ...
All of these comments seem to have been lost. Without wanting to presume, Claude has deleted them several times for me as well, so I suspect it's not intentional.
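For readers following the thread: the lost comments describe a two-case decision keyed on whether any held capability is not in advance of the new input frontier. A hedged distillation of that predicate, with illustrative names that are not the PR's:

```rust
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;

/// Case 1 of the comments above: some held capability time is not in advance
/// of the new input frontier, so updates must be carved out and transmitted
/// as batches now. Otherwise (case 2) an empty batch with the new upper
/// frontier suffices for the trace side.
fn must_seal<T: PartialOrder>(capability_times: &[T], new_input_frontier: AntichainRef<T>) -> bool {
    capability_times.iter().any(|time| !new_input_frontier.less_equal(time))
}
```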
    pub struct Batches<'scope, Tr: TraceReader> {
        /// The stream of sealed batches.
        pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
    }
Two thoughts here:

- TBD on whether this carries the weight of a new type; it might, or perhaps it should just be either a `type Batches = ..` or possibly just `Stream` in its uses.
- I think it would be better if possible to have `B: BatchReader`, avoiding having to name the trace that the type is meant to avoid.
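A hedged sketch of the alias route, generic over the batch rather than the trace. It mirrors the field type in the diff above (the `master-next` `Stream` shape) and assumes `BatchReader` exposes a `Time` associated type directly:

```rust
use timely::dataflow::Stream;
use differential_dataflow::trace::BatchReader;

/// Hypothetical alias: no wrapper type, and no trace in sight.
type Batches<'scope, B: BatchReader> = Stream<'scope, B::Time, Vec<B>>;
```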
    /// shared trace, implementors emit sealed batches downstream as timely
    /// output. Downstream operators consume the batches directly, skipping the
    /// cost of maintaining a shared trace.
    pub trait Batched<'scope, T: Timestamp+Lattice, C> : Sized {
I'm not sure this trait is carrying its weight either, a bit like `Batches` may not be. The `Arranged` trait just barely makes it, because folks do want to fluently arrange things, but .. I'm not sure there are more than a handful of moments where folks want to form batches without an arrangement, and the extension traits to make the formation fluent may not need to be in the core library. Tbd!
    where
        Ba: Batcher<Input=C, Time=T> + 'static,
        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=T> + 'static,
Same as with `Batched`: if we can use `B: Batch` rather than `Tr::Batch`, then probably for the best if we don't otherwise reference the trace.
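Concretely, the bound might become (a fragment mirroring the diff above, with `B` standing in for `Tr::Batch`; exact bounds are assumptions):

```rust
where
    Ba: Batcher<Input=C, Time=T> + 'static,
    Bu: Builder<Time=T, Input=Ba::Output, Output = B>,
    B: Batch<Time=T> + 'static,
```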
Here's a concrete recommendation that I think makes sense: extend the …

Perhaps simpler, though less fascinating: just a …
Ahh, I asked Claude to make me the new types, and checked the logic but not really the comments. Embarrassing! The other points are interesting. Agree on being generic over `Batch`, not `Tr::Batch`, etc. To give you a sense of why I landed on the design I did: