Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select a commit. Hold Shift + click to select a range.
934a0f9
Merge pull request #700 from TimelyDataflow/master
frankmcsherry Mar 25, 2026
eba4fd9
Merge branch 'master' into master-next
frankmcsherry Mar 26, 2026
3d595ed
Slow but correct impls
frankmcsherry Mar 25, 2026
b25fd43
WIP
frankmcsherry Mar 26, 2026
a7e92f6
Wildly overcomplicated
frankmcsherry Mar 29, 2026
d50868e
Merge pull request #704 from frankmcsherry/columnar_work
frankmcsherry Mar 29, 2026
841ddb9
Merge branch 'master' into master-next
frankmcsherry Apr 2, 2026
ee08017
Remove commented code
frankmcsherry Apr 4, 2026
1535c4b
Removed one-off trait
frankmcsherry Apr 4, 2026
e6af458
Remove unread counters
frankmcsherry Apr 4, 2026
8f9e929
Convert silent errors to panics
frankmcsherry Apr 4, 2026
51aef09
Simplify logic
frankmcsherry Apr 4, 2026
3901fdb
More commented code removed
frankmcsherry Apr 4, 2026
b157359
Extract unconditional behavior
frankmcsherry Apr 4, 2026
fadcd7b
Idiomatic Rust
frankmcsherry Apr 4, 2026
2ec3293
Less time cloning
frankmcsherry Apr 4, 2026
8153387
Idiomatic capability use
frankmcsherry Apr 4, 2026
d021540
Ref keyword to borrow
frankmcsherry Apr 4, 2026
7a02834
Further tightening
frankmcsherry Apr 4, 2026
8e75167
Tidy explanatory text, and reflect frontier
frankmcsherry Apr 4, 2026
7c053ad
Tighten comments, remove mutable borrow
frankmcsherry Apr 4, 2026
5281266
Prefer insert_ref to insert
frankmcsherry Apr 4, 2026
33adafb
Merge pull request #709 from frankmcsherry/reduce_tidy
frankmcsherry Apr 4, 2026
671de13
Use containers for interesting (keys, time) (#710)
frankmcsherry Apr 4, 2026
4f86ef2
WIP caching trace wrapper
frankmcsherry Apr 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,353 changes: 937 additions & 416 deletions differential-dataflow/examples/columnar/columnar_support.rs

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,16 @@ mod reachability {
let result = combined_arr.reduce_abelian::<_,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
>("Distinct", |_node, _input, output| {
output.push(((), 1));
_,
>("Distinct", |_node, _input, output| { output.push(((), 1)); },
|col, key, upds| {
use columnar::{Clear, Push};
col.keys.clear();
col.vals.clear();
col.times.clear();
col.diffs.clear();
for (val, time, diff) in upds.drain(..) { col.push((key, &val, &time, &diff)); }
*col = std::mem::take(col).consolidate();
});

// Extract RecordedUpdates from the Arranged's batch stream.
Expand Down
28 changes: 22 additions & 6 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,11 @@ pub mod vec {
use crate::trace::implementations::{ValBuilder, ValSpine};

self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>,_>(
name,
logic,
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,v| (k.clone(), v.clone()))
}

Expand Down Expand Up @@ -782,7 +786,7 @@ pub mod vec {
/// ```
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
Expand All @@ -801,12 +805,16 @@ pub mod vec {
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
V: Clone+'static,
T2: for<'a> Trace<Key<'a>=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static,
T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=G::Timestamp>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core::<_,Bu,_>(name, logic)
.reduce_core::<_,Bu,_,_>(
name,
logic,
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
}
}

Expand Down Expand Up @@ -871,7 +879,11 @@ pub mod vec {
use crate::trace::implementations::{KeyBuilder, KeySpine};

self.arrange_by_self_named(&format!("Arrange: {}", name))
.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>,_>(
name,
move |k,s,t| t.push(((), thresh(k, &s[0].1))),
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,_| k.clone())
}

Expand Down Expand Up @@ -908,7 +920,11 @@ pub mod vec {
pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<G, (K, R), R2> {
use crate::trace::implementations::{ValBuilder, ValSpine};
self.arrange_by_self_named("Arrange: Count")
.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>,_>(
"Count",
|_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))),
|vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
)
.as_collection(|k,c| (k.clone(), c.clone()))
}
}
Expand Down
6 changes: 5 additions & 1 deletion differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ impl<Tr: TraceReader+'static> TraceAgent<Tr> {
/// // create a second dataflow
/// worker.dataflow(move |scope| {
/// trace.import(scope)
/// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Reduce", |_key, src, dst| dst.push((*src[0].0, 1)))
/// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>(
/// "Reduce",
/// |_key, src, dst| dst.push((*src[0].0, 1)),
/// |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
/// )
/// .as_collection(|k,v| (k.clone(), v.clone()));
/// });
///
Expand Down
37 changes: 20 additions & 17 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{Data, VecCollection, AsCollection};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::merge_batcher::container::InternalMerge;

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -75,7 +74,6 @@ where
use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -169,12 +167,17 @@ where
/// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
///
/// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
/// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
/// on the reference types.
pub fn as_vecs<K, V>(self) -> VecCollection<G, (K, V), Tr::Diff>
where
Tr::KeyOwn: crate::ExchangeData,
Tr::ValOwn: crate::ExchangeData,
K: crate::ExchangeData,
V: crate::ExchangeData,
Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
{
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
}

/// Extracts elements from an arrangement as a `VecCollection`.
Expand Down Expand Up @@ -271,43 +274,43 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_abelian<L, Bu, T2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn: Ord>,
T1: TraceReader,
T2: for<'a> Trace<
Key<'a>= T1::Key<'a>,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
Diff: Abelian,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Default>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static,
{
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
self.reduce_core::<_,Bu,T2,_>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
crate::consolidation::consolidate(change);
})
}, push)
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_core<L, Bu, T2, P>(self, name: &str, logic: L, push: P) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn: Ord>,
T1: TraceReader,
T2: for<'a> Trace<
Key<'a>=T1::Key<'a>,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Default>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,Bu,_,_>(self, name, logic)
reduce_trace::<_,_,Bu,_,_,_>(self, name, logic, push)
}
}

Expand Down
24 changes: 13 additions & 11 deletions differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! use differential_dataflow::operators::arrange::upsert;
//!
//! let stream = scope.input_from(&mut input);
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(stream, &"test");
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>,String,String>(stream, &"test");
//!
//! arranged
//! .as_collection(|k,v| (k.clone(), v.clone()))
Expand Down Expand Up @@ -127,19 +127,21 @@ use super::TraceAgent;
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Bu, Tr>(
stream: Stream<G, Vec<(Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>>,
pub fn arrange_from_upsert<G, Bu, Tr, K, V>(
stream: Stream<G, Vec<(K, Option<V>, G::Timestamp)>>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
K: ExchangeData+Hashable+std::hash::Hash,
V: ExchangeData,
Tr: for<'a> Trace<
KeyOwn: ExchangeData+Hashable+std::hash::Hash,
ValOwn: ExchangeData,
Key<'a> = &'a K,
Val<'a> = &'a V,
Time: TotalOrder+ExchangeData,
Diff=isize,
>+'static,
Bu: Builder<Time=G::Timestamp, Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand All @@ -148,7 +150,7 @@ where

let reader = &mut reader;

let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option<Tr::ValOwn>,G::Timestamp)| (update.0).hashed().into());
let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());

let scope = stream.scope();
stream.unary_frontier(exchange, name, move |_capability, info| {
Expand All @@ -174,7 +176,7 @@ where
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>>::new();
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
let mut updates = Vec::new();

move |(input, frontier), output| {
Expand Down Expand Up @@ -237,10 +239,10 @@ where
let mut key_con = Tr::KeyContainer::with_capacity(1);
for (key, mut list) in to_process {

key_con.clear(); key_con.push_own(&key);
key_con.clear(); key_con.push_ref(&key);

// The prior value associated with the key.
let mut prev_value: Option<Tr::ValOwn> = None;
let mut prev_value: Option<V> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key(&trace_storage, key_con.index(0));
Expand All @@ -252,7 +254,7 @@ where
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
prev_value = Some(Tr::owned_val(val));
prev_value = Some(val.clone());
}
trace_cursor.step_val(&trace_storage);
}
Expand Down
Loading