diff --git a/differential-dataflow/examples/scc.rs b/differential-dataflow/examples/scc.rs new file mode 100644 index 000000000..83ed6c399 --- /dev/null +++ b/differential-dataflow/examples/scc.rs @@ -0,0 +1,96 @@ +//! Minimal SCC benchmark, no diagnostics or logging. +//! +//! Usage: scc [timely args] [nodes [edges [batch [rounds]]]] +//! +//! Mirrors `diagnostics/examples/scc-bench.rs` minus the logging dataflow, +//! for a clean baseline to compare against `interactive/examples/ddir_col`. + +use std::hash::{Hash, Hasher}; +use std::collections::hash_map::DefaultHasher; + +use timely::dataflow::operators::probe::Handle; + +use differential_dataflow::input::Input; +use differential_dataflow::algorithms::graphs::scc::strongly_connected; + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +fn hash_to_u64(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() +} + +fn edge_for(index: usize, nodes: usize) -> (usize, usize) { + let h1 = hash_to_u64(&index); + let h2 = hash_to_u64(&h1); + ((h1 as usize) % nodes, (h2 as usize) % nodes) +} + +fn main() { + let timer = std::time::Instant::now(); + + timely::execute_from_args(std::env::args(), move |worker| { + let positional: Vec = std::env::args() + .skip(1) + .filter(|a| !a.starts_with('-')) + .collect(); + let nodes: usize = positional.get(0).and_then(|s| s.parse().ok()).unwrap_or(100_000); + let edges: usize = positional.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000); + let batch: usize = positional.get(2).and_then(|s| s.parse().ok()).unwrap_or(1_000); + let rounds: usize = positional.get(3).and_then(|s| s.parse().ok()).unwrap_or(usize::MAX); + + if worker.index() == 0 { + println!("nodes: {nodes}, edges: {edges}, batch: {batch}, rounds: {}, workers: {}", + if rounds == usize::MAX { "∞".to_string() } else { rounds.to_string() }, + worker.peers()); + } + + let mut probe = Handle::new(); + let mut input = worker.dataflow(|scope| { + let (input, graph) = scope.new_collection::<(usize, usize), isize>(); + let _scc = strongly_connected(graph).probe_with(&mut probe); + input + }); + + let index = worker.index(); + let peers = worker.peers(); + + // Load initial edges (partitioned across workers). + let timer_load = std::time::Instant::now(); + for i in (0..edges).filter(|i| i % peers == index) { + input.insert(edge_for(i, nodes)); + } + input.advance_to(1); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if index == 0 { + println!("{:?}\t{:?}\tloaded {edges} edges", timer.elapsed(), timer_load.elapsed()); + } + + // Apply changes in rounds. + for round in 0..rounds { + let timer_round = std::time::Instant::now(); + for i in (0..batch).filter(|i| i % peers == index) { + input.remove(edge_for(round * batch + i, nodes)); + input.insert(edge_for(edges + round * batch + i, nodes)); + } + input.advance_to(round + 2); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if index == 0 { + println!("{:?}\t{:?}\tround {round} ({} changes)", + timer.elapsed(), timer_round.elapsed(), batch * 2); + } + } + }).unwrap(); + + println!("{:?}\tshut down", timer.elapsed()); +} diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 83f8010a7..2a6b6acee 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -143,13 +143,9 @@ impl V } self.history.sort_by(|x,y| y.cmp(x)); - for index in 1 .. self.history.len() { - self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1); - } + self.history.iter_mut().reduce(|prev, cur| { cur.1.meet_assign(&prev.1); cur }); - HistoryReplay { - replay: self - } + HistoryReplay { replay: self } } }