diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 16755c27e..bfa04d4cd 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -964,11 +964,11 @@ pub mod vec { Ba: crate::trace::Batcher, Time=T> + 'static, Tr: for<'a> crate::trace::Trace+'static, Bu: crate::trace::Builder, - F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, + F: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, { - use crate::operators::arrange::arrangement::Arrange; + use crate::operators::arrange::Batched; self.map(|k| (k, ())) - .arrange_named::(name) + .batched_named::(name) .as_collection(reify) } @@ -1018,7 +1018,7 @@ pub mod vec { use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder}; use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder}; - use crate::operators::arrange::Arrange; + use crate::operators::arrange::{Arrange, Batched, Batches}; impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R> where @@ -1053,6 +1053,39 @@ pub mod vec { } } + impl<'scope, T, K, V, R> Batched<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R> + where + T: Timestamp + Lattice, + K: crate::ExchangeData + Hashable, + V: crate::ExchangeData, + R: crate::ExchangeData + Semigroup, + { + fn batched_named(self, name: &str) -> Batches<'scope, Tr> + where + Ba: crate::trace::Batcher, Time=T> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, + { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::batch_core::<_, Ba, Bu, Tr>(self.inner, exchange, name) + } + } + + impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Batched<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R> + where + T: Timestamp + Lattice + Ord, + { + fn batched_named(self, name: &str) -> Batches<'scope, Tr> + where + Ba: crate::trace::Batcher, Time=T> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, + { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::batch_core::<_, Ba, Bu, Tr>(self.map(|k| (k, ())).inner, exchange, name) + } + } + impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R> where @@ -1071,6 +1104,20 @@ pub mod vec { pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } + + /// Produces a stream of sealed batches from a collection of `(Key, Val)` records, keyed by `Key`. + /// + /// Unlike `arrange_by_key`, this operator does not maintain a shared trace; it only emits the + /// sealed batches downstream. Use this when downstream consumers want to process batches + /// directly rather than pay for the shared trace. + pub fn batch_by_key(self) -> Batches<'scope, ValSpine> { + self.batch_by_key_named("BatchByKey") + } + + /// As `batch_by_key` but with the ability to name the operator. + pub fn batch_by_key_named(self, name: &str) -> Batches<'scope, ValSpine> { + self.batched_named::, ValBuilder<_,_,_,_>, ValSpine>(name) + } } impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R> @@ -1091,6 +1138,19 @@ pub mod vec { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } + + /// Produces a stream of sealed batches from a collection of `Key` records, keyed by `Key`. + /// + /// Unlike `arrange_by_self`, this operator does not maintain a shared trace; it only emits + /// the sealed batches downstream. + pub fn batch_by_self(self) -> Batches<'scope, KeySpine> { + self.batch_by_self_named("BatchBySelf") + } + + /// As `batch_by_self` but with the ability to name the operator. + pub fn batch_by_self_named(self, name: &str) -> Batches<'scope, KeySpine> { + self.batched_named::, KeyBuilder<_,_,_>, KeySpine>(name) + } } impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R> diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 42f3d7b43..5e7c54dc9 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -171,42 +171,7 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, { - Self::flat_map_batches(self.stream, logic) - } - - /// Extracts elements from a stream of batches as a `VecCollection`. - /// - /// The supplied logic may produce an iterator over output values, allowing either - /// filtering or flat mapping as part of the extraction. - /// - /// This method exists for streams of batches without the corresponding arrangement. - /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: Stream<'scope, Tr::Time, Vec>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> - where - I: IntoIterator, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, - { - stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { - input.for_each(|time, data| { - let mut session = output.session(&time); - for wrapper in data.iter() { - let batch = &wrapper; - let mut cursor = batch.cursor(); - while let Some(key) = cursor.get_key(batch) { - while let Some(val) = cursor.get_val(batch) { - for datum in logic(key, val) { - cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff))); - }); - } - cursor.step_val(batch); - } - cursor.step_key(batch); - } - } - }); - }) - .as_collection() + Batches { stream: self.stream }.flat_map_ref(logic) } } @@ -321,157 +286,264 @@ pub trait Arrange<'scope, T: Timestamp+Lattice, C> : Sized { ; } -/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// A type that can be converted into a stream of sealed trace batches. /// -/// This operator arranges a stream of values into a shared trace, whose contents it maintains. -/// It uses the supplied parallelization contract to distribute the data, which does not need to -/// be consistently by key (though this is the most common). -pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent> -where - P: ParallelizationContract, - Ba: Batcher + 'static, - Bu: Builder, - Tr: Trace+'static, -{ - // The `Arrange` operator is tasked with reacting to an advancing input - // frontier by producing the sequence of batches whose lower and upper - // bounds are those frontiers, containing updates at times greater or - // equal to lower and not greater or equal to upper. - // - // The operator uses its batch type's `Batcher`, which accepts update - // triples and responds to requests to "seal" batches (presented as new - // upper frontiers). - // - // Each sealed batch is presented to the trace, and if at all possible - // transmitted along the outgoing channel. Empty batches may not have - // a corresponding capability, as they are only retained for actual data - // held by the batcher, which may prevents the operator from sending an - // empty batch. - - let mut reader: Option> = None; +/// This is the batching analogue of [`Arrange`]: instead of maintaining a +/// shared trace, implementors emit sealed batches downstream as timely +/// output. Downstream operators consume the batches directly, skipping the +/// cost of maintaining a shared trace. +pub trait Batched<'scope, T: Timestamp+Lattice, C> : Sized { + /// Produces a stream of sealed batches from `self`. + fn batched(self) -> Batches<'scope, Tr> + where + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace + 'static, + { + self.batched_named::("Batch") + } - // fabricate a data-parallel operator using the `unary_notify` pattern. - let reader_ref = &mut reader; - let scope = stream.scope(); + /// As `batched` but with the ability to name the operator. + fn batched_named(self, name: &str) -> Batches<'scope, Tr> + where + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace + 'static, + ; +} - let stream = stream.unary_frontier(pact, name, move |_capability, info| { +/// A stream of sealed trace batches, produced by [`Batched`] or [`batch_core`]. +/// +/// This is the batching analogue of [`Arranged`]: it carries the batch stream +/// without the shared trace. Downstream operators can flatten the batches back +/// into a `VecCollection` via [`Batches::as_collection`] or +/// [`Batches::flat_map_ref`]. +#[derive(Clone)] +pub struct Batches<'scope, Tr: TraceReader> { + /// The stream of sealed batches. + pub stream: Stream<'scope, Tr::Time, Vec>, +} - // Acquire a logger for arrange events. - let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); +impl<'scope, Tr: TraceReader> Batches<'scope, Tr> { + /// Flattens the stream of batches into a `VecCollection` with one element per `(key, val)` pair. + pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff> + where + D: Data, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, + { + self.flat_map_ref(move |key, val| Some(logic(key, val))) + } - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Ba::new(logger.clone(), info.global_id); + /// Flattens the stream of batches into a `VecCollection` via an iterator-producing closure. + pub fn flat_map_ref(self, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> + where + I: IntoIterator, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static, + { + self.stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + for wrapper in data.iter() { + let batch = &wrapper; + let mut cursor = batch.cursor(); + while let Some(key) = cursor.get_key(batch) { + while let Some(val) = cursor.get_val(batch) { + for datum in logic(key, val) { + cursor.map_times(batch, |time, diff| { + session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff))); + }); + } + cursor.step_val(batch); + } + cursor.step_key(batch); + } + } + }); + }) + .as_collection() + } +} - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); +/// Stateful logic for producing batches from a stream of timestamped updates. +/// +/// This encapsulates the half of `arrange_core` that buffers updates in a +/// `Batcher`, tracks held capabilities, and carves out sealed batches as the +/// input frontier advances. It is intended to be driven from a `unary_frontier` +/// operator: call `push` in the input loop for each received message, then +/// call `step` with the input frontier to emit any newly complete batches +/// via the supplied callback. +pub struct BatchEngine { + batcher: Ba, + capabilities: Antichain>, + prev_frontier: Antichain, +} - let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address))); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); +impl BatchEngine +where + Ba: Batcher, +{ + /// Creates a new engine for an operator with the given logger and global id. + #[inline] + pub fn new(logger: Option, operator_id: usize) -> Self { + Self { + batcher: Ba::new(logger, operator_id), + capabilities: Antichain::new(), + prev_frontier: Antichain::from_elem(Ba::Time::minimum()), } + } - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); - - *reader_ref = Some(reader_local); - - // Initialize to the minimal input frontier. - let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum()); + /// Stashes a received message's capability and pushes its data into the batcher. + #[inline] + pub fn push(&mut self, capability: Capability, data: &mut Ba::Input) { + self.capabilities.insert(capability); + self.batcher.push_container(data); + } - move |(input, frontier), output| { + /// Reacts to a new input frontier, emitting any newly complete batches. + /// + /// Each emitted batch is passed to `on_batch` along with a capability at + /// which the batch may be transmitted. When the frontier advances without + /// any held capabilities in its past, an empty progress batch is sealed + /// but not emitted through `on_batch`, matching the convention that empty + /// batches do not carry capabilities. + #[inline] + pub fn step( + &mut self, + frontier: &timely::progress::frontier::MutableAntichain, + mut on_batch: impl FnMut(&Capability, Bu::Output), + ) + where + Bu: Builder, + { + // Assert that the frontier never regresses. + assert!(PartialOrder::less_equal(&self.prev_frontier.borrow(), &frontier.frontier())); - // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. - // We don't have to keep all capabilities, but we need to be able to form output messages - // when we realize that time intervals are complete. + // Only act on strict progress. + if self.prev_frontier.borrow() == frontier.frontier() { + return; + } - input.for_each(|cap, data| { - capabilities.insert(cap.retain(0)); - batcher.push_container(data); - }); + // If any held capability is no longer in advance of the input frontier, + // carve out batches for each such capability. Otherwise seal an empty + // progress batch. + if self.capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) { + let mut upper = Antichain::new(); + for (index, capability) in self.capabilities.elements().iter().enumerate() { + if !frontier.less_equal(capability.time()) { + // Upper bound for this capability: the input frontier plus + // any subsequent capabilities we have yet to retire. + upper.clear(); + for time in frontier.frontier().iter() { + upper.insert(time.clone()); + } + for other_capability in &self.capabilities.elements()[(index + 1)..] { + upper.insert(other_capability.time().clone()); + } - // The frontier may have advanced by multiple elements, which is an issue because - // timely dataflow currently only allows one capability per message. This means we - // must pretend to process the frontier advances one element at a time, batching - // and sending smaller bites than we might have otherwise done. - - // Assert that the frontier never regresses. - assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier())); - - // Test to see if strict progress has occurred, which happens whenever the new - // frontier isn't equal to the previous. It is only in this case that we have any - // data processing to do. - if prev_frontier.borrow() != frontier.frontier() { - // There are two cases to handle with some care: - // - // 1. If any held capabilities are not in advance of the new input frontier, - // we must carve out updates now in advance of the new input frontier and - // transmit them as batches, which requires appropriate *single* capabilities; - // Until timely dataflow supports multiple capabilities on messages, at least. - // - // 2. If there are no held capabilities in advance of the new input frontier, - // then there are no updates not in advance of the new input frontier and - // we can simply create an empty input batch with the new upper frontier - // and feed this to the trace agent (but not along the timely output). - - // If there is at least one capability not in advance of the input frontier ... - if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) { - - let mut upper = Antichain::new(); // re-used allocation for sealing batches. - - // For each capability not in advance of the input frontier ... - for (index, capability) in capabilities.elements().iter().enumerate() { - - if !frontier.less_equal(capability.time()) { - - // Assemble the upper bound on times we can commit with this capabilities. - // We must respect the input frontier, and *subsequent* capabilities, as - // we are pretending to retire the capability changes one by one. - upper.clear(); - for time in frontier.frontier().iter() { - upper.insert(time.clone()); - } - for other_capability in &capabilities.elements()[(index + 1) .. ] { - upper.insert(other_capability.time().clone()); - } + let batch = self.batcher.seal::(upper.clone()); + on_batch(&self.capabilities.elements()[index], batch); + } + } - // Extract updates not in advance of `upper`. - let batch = batcher.seal::(upper.clone()); + // Downgrade capabilities to match the batcher's lower update frontier. + let mut new_capabilities = Antichain::new(); + for time in self.batcher.frontier().iter() { + if let Some(capability) = self.capabilities.elements().iter().find(|c| c.time().less_equal(time)) { + new_capabilities.insert(capability.delayed(time)); + } else { + panic!("failed to find capability"); + } + } + self.capabilities = new_capabilities; + } else { + // Announce progress updates, even without data. + let _batch = self.batcher.seal::(frontier.frontier().to_owned()); + } - writer.insert(batch.clone(), Some(capability.time().clone())); + self.prev_frontier.clear(); + self.prev_frontier.extend(frontier.frontier().iter().cloned()); + } +} - // send the batch to downstream consumers, empty or not. - output.session(&capabilities.elements()[index]).give(batch); - } - } +/// Constructs a shared trace and the write endpoint used to feed it. +/// +/// This is the trace-side counterpart to [`BatchEngine`]. Callers are +/// responsible for driving the returned `TraceWriter`: inserting batches as +/// they are produced and calling `seal` as the input frontier advances. +pub fn new_trace_writer<'scope, Tr>( + scope: &Scope<'scope, Tr::Time>, + info: timely::dataflow::operators::generic::OperatorInfo, + logger: Option, +) -> (TraceAgent, super::writer::TraceWriter) +where + Tr: Trace + 'static, +{ + let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address))); + let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); + } + TraceAgent::new(empty_trace, info, logger) +} - // Having extracted and sent batches between each capability and the input frontier, - // we should downgrade all capabilities to match the batcher's lower update frontier. - // This may involve discarding capabilities, which is fine as any new updates arrive - // in messages with new capabilities. +/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// +/// This operator crucially does not arrange the data into a trace, but instead produces batches +/// downstream, where callers can do as they want with it. If having a trace is desired, the caller +/// can maintain them separately, or use `arrange_core`, which maintains the trace for you. +pub fn batch_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Batches<'scope, Tr> +where + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + let scope = stream.scope(); + let stream = stream.unary_frontier(pact, name, move |_capability, info| { + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); + let mut engine = BatchEngine::::new(logger, info.global_id); - let mut new_capabilities = Antichain::new(); - for time in batcher.frontier().iter() { - if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { - new_capabilities.insert(capability.delayed(time)); - } - else { - panic!("failed to find capability"); - } - } + move |(input, frontier), output| { + input.for_each(|cap, data| engine.push(cap.retain(0), data)); + engine.step::(frontier, |cap, batch| { + output.session(cap).give(batch); + }); + } + }); + Batches { stream } +} - capabilities = new_capabilities; - } - else { - // Announce progress updates, even without data. - let _batch = batcher.seal::(frontier.frontier().to_owned()); - writer.seal(frontier.frontier().to_owned()); - } +/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// +/// This operator arranges a stream of values into a shared trace, whose contents it maintains. +/// It uses the supplied parallelization contract to distribute the data, which does not need to +/// be consistently by key (though this is the most common). +pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent> +where + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + let scope = stream.scope(); + let mut reader: Option> = None; + let reader_ref = &mut reader; - prev_frontier.clear(); - prev_frontier.extend(frontier.frontier().iter().cloned()); - } + let stream = stream.unary_frontier(pact, name, move |_capability, info| { + let logger: Option = scope.worker().logger_for::("differential/arrange").map(Into::into); + let mut engine = BatchEngine::::new(logger.clone(), info.global_id); + let (reader_local, mut writer) = new_trace_writer::(&scope, info, logger); + *reader_ref = Some(reader_local); + move |(input, frontier), output| { + input.for_each(|cap, data| engine.push(cap.retain(0), data)); + engine.step::(frontier, |cap, batch| { + writer.insert(batch.clone(), Some(cap.time().clone())); + output.session(cap).give(batch); + }); + // Advance the trace when progress advances without data; `seal` is + // a no-op if the trace upper already matches. + writer.seal(frontier.frontier().to_owned()); writer.exert(); } }); diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index fedbfdc23..08ed1fd27 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -69,4 +69,4 @@ pub mod upsert; pub use self::writer::TraceWriter; pub use self::agent::{TraceAgent, ShutdownButton}; -pub use self::arrangement::{Arranged, Arrange}; +pub use self::arrangement::{Arranged, Arrange, Batched, Batches};