Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 64 additions & 4 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,11 +964,11 @@ pub mod vec {
Ba: crate::trace::Batcher<Input=Vec<((D,()),T,R)>, Time=T> + 'static,
Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::operators::arrange::Batched;
self.map(|k| (k, ()))
.arrange_named::<Ba, Bu, Tr>(name)
.batched_named::<Ba, Bu, Tr>(name)
.as_collection(reify)
}

Expand Down Expand Up @@ -1018,7 +1018,7 @@ pub mod vec {

use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
use crate::operators::arrange::Arrange;
use crate::operators::arrange::{Arrange, Batched, Batches};

impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
where
Expand Down Expand Up @@ -1053,6 +1053,39 @@ pub mod vec {
}
}

/// `Batched` for collections of `(key, value)` pairs.
///
/// Updates are handed to `batch_core` together with an `Exchange` pact whose
/// routing value is the hash of the key component of each update.
impl<'scope, T, K, V, R> Batched<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
where
    T: Timestamp + Lattice,
    K: crate::ExchangeData + Hashable,
    V: crate::ExchangeData,
    R: crate::ExchangeData + Semigroup,
{
    /// Builds the batching operator called `name` over this collection's stream.
    ///
    /// `Ba` is the batcher, `Bu` the builder that turns batcher output into
    /// `Tr::Batch`, and `Tr` the trace type that parameterizes the returned `Batches`.
    fn batched_named<Ba, Bu, Tr>(self, name: &str) -> Batches<'scope, Tr>
    where
        Ba: crate::trace::Batcher<Input = Vec<((K, V), T, R)>, Time = T> + 'static,
        Bu: crate::trace::Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
        Tr: crate::trace::Trace<Time = T> + 'static,
    {
        use crate::operators::arrange::arrangement::batch_core;
        use timely::dataflow::channels::pact::Exchange;

        // Only the key takes part in routing; value, time and diff are ignored.
        let route_by_key = Exchange::new(move |record: &((K, V), T, R)| {
            let ((key, _), _, _) = record;
            key.hashed().into()
        });

        batch_core::<_, Ba, Bu, Tr>(self.inner, route_by_key, name)
    }
}

/// `Batched` for collections of bare keys.
///
/// Each record `k` is first mapped to `(k, ())`, so the batcher input is
/// `Vec<((K, ()), T, R)>`; the exchange pact hashes the key component.
impl<'scope, T, K, R> Batched<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R>
where
    T: Timestamp + Lattice + Ord,
    K: crate::ExchangeData + Hashable,
    R: crate::ExchangeData + Semigroup,
{
    /// Builds the batching operator called `name` over the unit-valued view of this collection.
    fn batched_named<Ba, Bu, Tr>(self, name: &str) -> Batches<'scope, Tr>
    where
        Ba: crate::trace::Batcher<Input = Vec<((K, ()), T, R)>, Time = T> + 'static,
        Bu: crate::trace::Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
        Tr: crate::trace::Trace<Time = T> + 'static,
    {
        use crate::operators::arrange::arrangement::batch_core;
        use timely::dataflow::channels::pact::Exchange;

        // Routing looks only at the key; the `()` value carries no information.
        let route_by_key = Exchange::new(move |record: &((K, ()), T, R)| {
            let ((key, ()), _, _) = record;
            key.hashed().into()
        });

        // Pair every key with a unit value to match the batcher's input layout.
        let unit_valued = self.map(|key| (key, ()));

        batch_core::<_, Ba, Bu, Tr>(unit_valued.inner, route_by_key, name)
    }
}


impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R>
where
Expand All @@ -1071,6 +1104,20 @@ pub mod vec {
pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
// Forwards to `arrange_named` with the `Val*` batcher/builder pair and the caller's `name`.
// NOTE(review): the third type argument is left as `_`; presumably it is resolved from the
// declared return type (`ValSpine<K, V, T, R>`) — confirm before changing the signature.
self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
}

/// Turns a collection of `(Key, Val)` records into a stream of sealed batches keyed by `Key`.
///
/// In contrast to `arrange_by_key`, no shared trace is maintained: the operator only sends
/// the sealed batches downstream. Reach for this when consumers want to work on the batches
/// themselves and do not want to pay for a shared trace.
///
/// The operator is given the default name `"BatchByKey"`; use `batch_by_key_named` to pick
/// a different one.
pub fn batch_by_key(self) -> Batches<'scope, ValSpine<K, V, T, R>> {
    let default_name = "BatchByKey";
    self.batch_by_key_named(default_name)
}

/// Same as `batch_by_key`, except that the caller supplies the operator's `name`.
///
/// Delegates to `batched_named` with the `Val*` batcher and builder and a
/// `ValSpine<K, V, T, R>` trace type.
pub fn batch_by_key_named(self, name: &str) -> Batches<'scope, ValSpine<K, V, T, R>> {
    self.batched_named::<
        ValBatcher<_, _, _, _>,
        ValBuilder<_, _, _, _>,
        ValSpine<K, V, T, R>,
    >(name)
}
}

impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R>
Expand All @@ -1091,6 +1138,19 @@ pub mod vec {
self.map(|k| (k, ()))
.arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
}

/// Turns a collection of `Key` records into a stream of sealed batches keyed by `Key`.
///
/// In contrast to `arrange_by_self`, no shared trace is maintained: the operator only
/// sends the sealed batches downstream.
///
/// The operator is given the default name `"BatchBySelf"`; use `batch_by_self_named` to
/// pick a different one.
pub fn batch_by_self(self) -> Batches<'scope, KeySpine<K, T, R>> {
    let default_name = "BatchBySelf";
    self.batch_by_self_named(default_name)
}

/// Same as `batch_by_self`, except that the caller supplies the operator's `name`.
///
/// Delegates to `batched_named` with the `Key*` batcher and builder and a
/// `KeySpine<K, T, R>` trace type.
pub fn batch_by_self_named(self, name: &str) -> Batches<'scope, KeySpine<K, T, R>> {
    self.batched_named::<
        KeyBatcher<_, _, _>,
        KeyBuilder<_, _, _>,
        KeySpine<K, T, R>,
    >(name)
}
}

impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
Expand Down
Loading
Loading