diff --git a/differential-dataflow/examples/columnar/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs index ced700bec..149a93b60 100644 --- a/differential-dataflow/examples/columnar/columnar_support.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -70,14 +70,17 @@ pub use updates::Updates; pub struct RecordedUpdates { pub updates: Updates, pub records: usize, + /// Whether `updates` is known to be sorted and consolidated + /// (no duplicate (key, val, time) triples, no zero diffs). + pub consolidated: bool, } impl Default for RecordedUpdates { - fn default() -> Self { Self { updates: Default::default(), records: 0 } } + fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } } } impl Clone for RecordedUpdates { - fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records } } + fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records, consolidated: self.consolidated } } } impl timely::Accountable for RecordedUpdates { @@ -133,7 +136,9 @@ mod container_impls { let t2 = T2::to_inner(t1_owned.clone()); new_times.push(&t2); } + // TODO: Assumes Enter (to_inner) is order-preserving on times. RecordedUpdates { + consolidated: self.consolidated, updates: Updates { keys: self.updates.keys, vals: self.updates.vals, @@ -168,6 +173,7 @@ mod container_impls { RecordedUpdates { updates: flat.consolidate(), records: self.records, + consolidated: true, } } } @@ -185,7 +191,9 @@ mod container_impls { output.push((k, v, &new_time, d)); } } - RecordedUpdates { updates: output, records: self.records } + // TODO: Time advancement may not be order preserving, but .. it could be. + // TODO: Before this is consolidated the above would need to be `form`ed. 
+ RecordedUpdates { updates: output, records: self.records, consolidated: false } } } } @@ -222,7 +230,7 @@ mod column_builder { let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); + self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } } @@ -260,7 +268,7 @@ mod column_builder { let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); + self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } self.empty = self.pending.pop_front(); @@ -327,7 +335,7 @@ mod distributor { let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1)); for (pusher, output) in pushers.iter_mut().zip(outputs) { if !output.keys.values.is_empty() { - let recorded = RecordedUpdates { updates: output, records: first_records }; + let recorded = RecordedUpdates { updates: output, records: first_records, consolidated: container.consolidated }; first_records = 1; let mut recorded = recorded; Message::push_at(&mut recorded, time.clone(), pusher); @@ -433,8 +441,8 @@ pub mod arrangement { } use crate::{Updates, RecordedUpdates}; - use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; - type ValBatcher2 = MergeBatcher, TrieChunker, InternalMerger>>; + use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; + type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. /// The `records` accounting is discarded here — it has served its purpose for exchange. 
@@ -456,7 +464,9 @@ pub mod arrangement { impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { fn push_into(&mut self, container: &'a mut RecordedUpdates) { - self.ready.push_back(std::mem::take(&mut container.updates)); + let mut updates = std::mem::take(&mut container.updates); + if !container.consolidated { updates = updates.consolidate(); } + if updates.len() > 0 { self.ready.push_back(updates); } } } @@ -478,8 +488,7 @@ pub mod arrangement { pub mod batcher { - use std::ops::Range; - use columnar::{Borrow, Columnar, Container, Index, Len, Push}; + use columnar::{Borrow, Columnar, Index, Len, Push}; use differential_dataflow::difference::{Semigroup, IsZero}; use timely::progress::frontier::{Antichain, AntichainRef}; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; @@ -488,237 +497,652 @@ pub mod arrangement { use crate::Updates; impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { - use columnar::Len; - self.diffs.values.len() >= 64 * 1024 - } + fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 } fn ensure_capacity(&mut self, _stash: &mut Option) { } } + /// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`. + /// Not called at runtime — our batcher uses `TrieMerger` instead. + /// TODO: Relax the bound in DD's reduce to remove this requirement. 
impl InternalMerge for Updates { - type TimeOwned = U::Time; + fn len(&self) -> usize { unimplemented!() } + fn clear(&mut self) { + use columnar::Clear; + self.keys.clear(); + self.vals.clear(); + self.times.clear(); + self.diffs.clear(); + } + fn merge_from(&mut self, _others: &mut [Self], _positions: &mut [usize]) { unimplemented!() } + fn extract(&mut self, + _upper: AntichainRef, + _frontier: &mut Antichain, + _keep: &mut Self, + _ship: &mut Self, + ) { unimplemented!() } + } + } - fn len(&self) -> usize { self.diffs.values.len() } - fn clear(&mut self) { *self = Self::default(); } + pub mod trie_merger { - #[inline(never)] - fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { - match others.len() { - 0 => {}, - 1 => { - // Bulk copy: take remaining keys from position onward. - let other = &mut others[0]; - let pos = &mut positions[0]; - if self.keys.values.len() == 0 && *pos == 0 { - std::mem::swap(self, other); - return; + use columnar::{Columnar, Len}; + use timely::PartialOrder; + use timely::progress::frontier::{Antichain, AntichainRef}; + use differential_dataflow::trace::implementations::merge_batcher::Merger; + + use crate::ColumnarUpdate as Update; + use crate::Updates; + + pub struct TrieMerger { + _marker: std::marker::PhantomData, + } + + impl Default for TrieMerger { + fn default() -> Self { Self { _marker: std::marker::PhantomData } } + } + + /// A merging iterator over two sorted iterators. + struct Merging { + iter1: std::iter::Peekable, + iter2: std::iter::Peekable, + } + + impl Iterator for Merging + where + K: Copy + Ord, + V: Copy + Ord, + T: Copy + Ord, + I1: Iterator, + I2: Iterator, + { + type Item = (K, V, T, D); + #[inline] + fn next(&mut self) -> Option { + match (self.iter1.peek(), self.iter2.peek()) { + (Some(a), Some(b)) => { + if (a.0, a.1, a.2) <= (b.0, b.1, b.2) { + self.iter1.next() + } else { + self.iter2.next() } - let other_len = other.keys.values.len(); - self.extend_from_keys(other, *pos .. 
other_len); - *pos = other_len; - }, - 2 => { - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let (left, right) = others.split_at_mut(1); - let this = &left[0]; - let that = &right[0]; - let this_keys = this.keys.values.borrow(); - let that_keys = that.keys.values.borrow(); - let mut this_key_range = positions[0] .. this_keys.len(); - let mut that_key_range = positions[1] .. that_keys.len(); - - while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { + } + (Some(_), None) => self.iter1.next(), + (None, Some(_)) => self.iter2.next(), + (None, None) => None, + } + } + } + + /// Build sorted `Updates` chunks from a sorted iterator of refs, + /// using `Updates::form` (which consolidates internally) on batches. + fn form_chunks<'a, U: Update>( + sorted: impl Iterator>>, + output: &mut Vec>, + ) { + let mut sorted = sorted.peekable(); + while sorted.peek().is_some() { + let chunk = Updates::::form((&mut sorted).take(64 * 1024)); + if chunk.len() > 0 { + output.push(chunk); + } + } + } + + impl Merger for TrieMerger + where + U::Time: Ord + PartialOrder + Clone + 'static, + { + type Chunk = Updates; + type Time = U::Time; + + fn merge( + &mut self, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + _stash: &mut Vec>, + ) { + Self::merge_batches(list1, list2, output, _stash); + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec, + kept: &mut Vec, + _stash: &mut Vec, + ) { + // Flatten the sorted, consolidated chain into refs. + let all = merged.iter().flat_map(|chunk| chunk.iter()); + + // Partition into two sorted streams by time. 
+ let mut time_owned = U::Time::default(); + let mut keep_vec = Vec::new(); + let mut ship_vec = Vec::new(); + for (k, v, t, d) in all { + Columnar::copy_from(&mut time_owned, t); + if upper.less_equal(&time_owned) { + frontier.insert_ref(&time_owned); + keep_vec.push((k, v, t, d)); + } else { + ship_vec.push((k, v, t, d)); + } + } + + // Build chunks via form (which consolidates internally). + form_chunks::(keep_vec.into_iter(), kept); + form_chunks::(ship_vec.into_iter(), ship); + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + use timely::Accountable; + (chunk.record_count() as usize, 0, 0, 0) + } + } + + impl TrieMerger + where + U::Time: Ord + PartialOrder + Clone + 'static, + { + /// Iterator-based merge: flatten, merge, consolidate, form. + /// Correct but slow — used as fallback. + #[allow(dead_code)] + fn merge_iterator( + list1: &[Updates], + list2: &[Updates], + output: &mut Vec>, + ) { + let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); + let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); + + let merged = Merging { + iter1: iter1.peekable(), + iter2: iter2.peekable(), + }; + + form_chunks::(merged, output); + } + + /// A merge implementation that operates batch-at-a-time. + #[inline(never)] + fn merge_batches( + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, + ) { + + // The design for efficient "batch" merginging of chains of links is: + // 0. We choose a target link size, K, and will keep the average link size at least K and the max size at 2k. + // K should be large enough to amortize some set-up, but not so large that one or two extra break the bank. + // 1. We will repeatedly consider pairs of links, and fully merge one with a prefix of the other. + // The last elements of each link will tell us which of the two suffixes must be held back. + // 2. We then have a chain of as many links as we started with, with potential defects to correct: + // a. 
A link may contain some number of zeros: we can remove them if we are eager, based on size. + // b. A link may contain more than 2K updates; we can split it. + // c. Two adjacent links may contain fewer than 2K updates; we can meld (careful append) them. + // 3. After a pass of the above, we should have restored the invariant. + // We can try and me smarter and fuse some of the above work rather than explicitly stage results. + // + // The challenging moment is the merge that can start with a suffix of one link, involving a prefix of one link. + // These could be the same link, different links, and generally there is the potential for complexity here. + + let mut builder = ChainBuilder::default(); + + let mut queue1: std::collections::VecDeque<_> = list1.into(); + let mut queue2: std::collections::VecDeque<_> = list2.into(); + + // The first unconsumed update in each block, via (k_idx, v_idx, t_idx), or None if exhausted. + // These are (0,0,0) for a new block, and should become None once there are no remaining updates. + let mut cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); + let mut cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); + + // For each pair of batches + while cursor1.is_some() && cursor2.is_some() { + Self::merge_batch(&mut cursor1, &mut cursor2, &mut builder, stash); + if cursor1.is_none() { cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); } + if cursor2.is_none() { cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); } + } + + // TODO: create batch for the non-empty cursor. 
+ if let Some(((k,v,t),batch)) = cursor1 { + let mut out_batch = stash.pop().unwrap_or_default(); + let empty: Updates = Default::default(); + write_from_surveys( + &batch, + &empty, + &[Report::This(0, 1)], + &[Report::This(k, batch.keys.values.len())], + &[Report::This(v, batch.vals.values.len())], + &[Report::This(t, batch.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } + if let Some(((k,v,t),batch)) = cursor2 { + let mut out_batch = stash.pop().unwrap_or_default(); + let empty: Updates = Default::default(); + write_from_surveys( + &empty, + &batch, + &[Report::That(0, 1)], + &[Report::That(k, batch.keys.values.len())], + &[Report::That(v, batch.vals.values.len())], + &[Report::That(t, batch.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } + + builder.extend(queue1); + builder.extend(queue2); + *output = builder.done(); + // TODO: Tidy output to satisfy structural invariants. + } + + /// Merge two batches, one completely and another through the corresponding prefix. + /// + /// Each invocation determines the maximum amount of both batches we can merge, determined + /// by comparing the elements at the tails of each batch, and locating the lesser in other. + /// We will merge the whole of the batch containing the lesser, and the prefix up through + /// the lesser element in the other batch, setting the cursor to the first element strictly + /// greater than that lesser element. + /// + /// The algorithm uses a list of `Report` findings to map the interleavings of the layers. + /// Each indicates either a range exclusive to one of the inputs, or a one element common + /// to the layers from both inputs, which must be further explored. This map would normally + /// allow the full merge to happen, but we need to carefully start at each cursor, and end + /// just before the first element greater than the lesser bound. 
+ /// + /// The consumed prefix and disjoint suffix should be single report entries, and it seems + /// fine to first produce all reports and then reflect on the cursors, rather than use the + /// cursors as part of the mapping. + #[inline(never)] + fn merge_batch( + batch1: &mut Option<((usize, usize, usize), Updates)>, + batch2: &mut Option<((usize, usize, usize), Updates)>, + builder: &mut ChainBuilder, + stash: &mut Vec>, + ) { + let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); + let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); + + use columnar::Borrow; + let keys0 = updates0.keys.borrow(); + let keys1 = updates1.keys.borrow(); + let vals0 = updates0.vals.borrow(); + let vals1 = updates1.vals.borrow(); + let times0 = updates0.times.borrow(); + let times1 = updates1.times.borrow(); + + // Survey the interleaving of the two inputs. + let mut key_survey = survey::>(keys0, keys1, &[Report::Both(0,0)]); + let mut val_survey = survey::>(vals0, vals1, &key_survey); + let mut time_survey = survey::>(times0, times1, &val_survey); + + // We now know enough to start writing into an output batch. + // We should update the input surveys to reflect the subset + // of data that we want. + // + // At most one cursor should be non-zero (assert!). + // A non-zero cursor must correspond to the first entry of the surveys, + // as there is at least one consumed update that precedes the other batch. + // We need to nudge that report forward to align with the cursor, potentially + // squeezing the report to nothing (to the upper bound). + + // We start by updating the surveys to reflect the cursors. + // If either cursor is set, then its batch has an element strictly less than the other batch. + // We therefore expect to find a prefix of This/That at the start of the survey. 
+ if (k0_idx, v0_idx, t0_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } } + } + + if (k1_idx, v1_idx, t1_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } } + } + + // We want to trim the tails of the surveys to only cover ranges present in both inputs. + // We can determine which was "longer" by looking at the last entry of the bottom layer, + // which tells us which input (or both) contained the last element. + // + // From the bottom layer up, we'll identify the index of the last item, and then determine + // the index of the list it belongs to. We use that index in the next layer, to locate the + // index of the list it belongs to, on upward. + let next_cursor = match time_survey.last().unwrap() { + Report::This(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. 
+ let mut t = times0.values.len(); + while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals0.values.len(); + while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys0.values.len(); + while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; } + if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; } + Some(Ok((k,v,t))) + } + Report::Both(_,_) => { None } + Report::That(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. + let mut t = times1.values.len(); + while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals1.values.len(); + while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys1.values.len(); + while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; } + if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; } + Some(Err((k,v,t))) + } + }; + + // Having updated the surveys, we now copy over the ranges they identify. + let mut out_batch = stash.pop().unwrap_or_default(); + // TODO: We should be able to size `out_batch` pretty accurately from the survey. + write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch); + builder.push(out_batch); + + match next_cursor { + Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); } + Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); } + None => { } + } + } + + } + + /// Write merged output from four levels of survey reports. 
+ /// + /// Each layer is written independently: `write_layer` handles keys, vals, + /// and times; `write_diffs` handles diff consolidation. + #[inline(never)] + fn write_from_surveys( + updates0: &Updates, + updates1: &Updates, + root_survey: &[Report], + key_survey: &[Report], + val_survey: &[Report], + time_survey: &[Report], + output: &mut Updates, + ) { + use columnar::Borrow; + + write_layer(updates0.keys.borrow(), updates1.keys.borrow(), root_survey, key_survey, &mut output.keys); + write_layer(updates0.vals.borrow(), updates1.vals.borrow(), key_survey, val_survey, &mut output.vals); + write_layer(updates0.times.borrow(), updates1.times.borrow(), val_survey, time_survey, &mut output.times); + write_diffs::(updates0.diffs.borrow(), updates1.diffs.borrow(), time_survey, &mut output.diffs); + } + + /// From two sequences of interleaved lists, map out the interleaving of their values. + /// + /// The sequence of input reports identify constraints on the sorted order of lists in the two inputs, + /// callout out ranges of each that are exclusively order, and elements that have equal prefixes and + /// therefore "overlap" and should be further investigated through the values of the lists. + /// + /// The output should have the same form but for the next layer: subject to the ordering of `reports`, + /// a similar report for the values of the two lists, appropriate for the next layer. + #[inline(never)] + pub fn survey<'a, C: columnar::Container: Ord>>( + lists0: as columnar::Borrow>::Borrowed<'a>, + lists1: as columnar::Borrow>::Borrowed<'a>, + reports: &[Report], + ) -> Vec { + use columnar::Index; + let mut output = Vec::with_capacity(reports.len()); // may grow larger, but at least this large. 
+ for report in reports.iter() { + match report { + Report::This(lower0, upper0) => { + let (new_lower, _) = lists0.bounds.bounds(*lower0); + let (_, new_upper) = lists0.bounds.bounds(*upper0-1); + output.push(Report::This(new_lower, new_upper)); + } + Report::Both(index0, index1) => { + + // Fetch the bounds from the layers. + let (mut lower0, upper0) = lists0.bounds.bounds(*index0); + let (mut lower1, upper1) = lists1.bounds.bounds(*index1); + + // Scour the intersecting range for matches. + while lower0 < upper0 && lower1 < upper1 { + let val0 = lists0.values.get(lower0); + let val1 = lists1.values.get(lower1); + match val0.cmp(&val1) { std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - self.extend_from_keys(this, lower .. this_key_range.start); + let start = lower0; + lower0 += 1; + gallop(lists0.values, &mut lower0, upper0, |x| x < val1); + output.push(Report::This(start, lower0)); }, std::cmp::Ordering::Equal => { - let values_len = self.vals.values.len(); - let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); - while !this_val_range.is_empty() && !that_val_range.is_empty() { - let this_val = this.vals.values.borrow().get(this_val_range.start); - let that_val = that.vals.values.borrow().get(that_val_range.start); - match this_val.cmp(&that_val) { - std::cmp::Ordering::Less => { - let lower = this_val_range.start; - gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); - self.extend_from_vals(this, lower .. this_val_range.start); - }, - std::cmp::Ordering::Equal => { - let updates_len = self.times.values.len(); - let mut this_time_range = this.times_bounds(this_val_range.start .. this_val_range.start+1); - let mut that_time_range = that.times_bounds(that_val_range.start .. 
that_val_range.start+1); - while !this_time_range.is_empty() && !that_time_range.is_empty() { - let this_time = this.times.values.borrow().get(this_time_range.start); - let this_diff = this.diffs.values.borrow().get(this_time_range.start); - let that_time = that.times.values.borrow().get(that_time_range.start); - let that_diff = that.diffs.values.borrow().get(that_time_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_time_range.start; - gallop(this.times.values.borrow(), &mut this_time_range, |x| x < that_time); - self.times.values.extend_from_self(this.times.values.borrow(), lower .. this_time_range.start); - self.diffs.extend_from_self(this.diffs.borrow(), lower .. this_time_range.start); - }, - std::cmp::Ordering::Equal => { - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { - self.times.values.push(this_time); - self.diffs.values.push(&this_sum); - self.diffs.bounds.push(self.diffs.values.len() as u64); - } - this_time_range.start += 1; - that_time_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_time_range.start; - gallop(that.times.values.borrow(), &mut that_time_range, |x| x < this_time); - self.times.values.extend_from_self(that.times.values.borrow(), lower .. that_time_range.start); - self.diffs.extend_from_self(that.diffs.borrow(), lower .. 
that_time_range.start); - }, - } - } - // Remaining from this side - if !this_time_range.is_empty() { - self.times.values.extend_from_self(this.times.values.borrow(), this_time_range.clone()); - self.diffs.extend_from_self(this.diffs.borrow(), this_time_range.clone()); - } - // Remaining from that side - if !that_time_range.is_empty() { - self.times.values.extend_from_self(that.times.values.borrow(), that_time_range.clone()); - self.diffs.extend_from_self(that.diffs.borrow(), that_time_range.clone()); - } - if self.times.values.len() > updates_len { - self.times.bounds.push(self.times.values.len() as u64); - self.vals.values.push(this_val); - } - this_val_range.start += 1; - that_val_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_val_range.start; - gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); - self.extend_from_vals(that, lower .. that_val_range.start); - }, - } - } - self.extend_from_vals(this, this_val_range); - self.extend_from_vals(that, that_val_range); - if self.vals.values.len() > values_len { - self.vals.bounds.push(self.vals.values.len() as u64); - self.keys.values.push(this_key); - } - this_key_range.start += 1; - that_key_range.start += 1; + output.push(Report::Both(lower0, lower1)); + lower0 += 1; + lower1 += 1; }, std::cmp::Ordering::Greater => { - let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - self.extend_from_keys(that, lower .. that_key_range.start); + let start = lower1; + lower1 += 1; + gallop(lists1.values, &mut lower1, upper1, |x| x < val0); + output.push(Report::That(start, lower1)); }, } } - // Copy remaining from whichever side has data, up to capacity. - while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = this_key_range.start; - this_key_range.start = this_key_range.end; // take all remaining - self.extend_from_keys(this, lower .. 
this_key_range.start); - } - while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = that_key_range.start; - that_key_range.start = that_key_range.end; - self.extend_from_keys(that, lower .. that_key_range.start); - } - positions[0] = this_key_range.start; - positions[1] = that_key_range.start; - }, - n => unimplemented!("{n}-way merge not supported"), + if lower0 < upper0 { output.push(Report::This(lower0, upper0)); } + if lower1 < upper1 { output.push(Report::That(lower1, upper1)); } + + } + Report::That(lower1, upper1) => { + let (new_lower, _) = lists1.bounds.bounds(*lower1); + let (_, new_upper) = lists1.bounds.bounds(*upper1-1); + output.push(Report::That(new_lower, new_upper)); + } } } - fn extract( - &mut self, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ) { - let mut time = U::Time::default(); - for key_idx in 0 .. self.keys.values.len() { - let key = self.keys.values.borrow().get(key_idx); - let keep_vals_len = keep.vals.values.len(); - let ship_vals_len = ship.vals.values.len(); - for val_idx in self.vals_bounds(key_idx..key_idx+1) { - let val = self.vals.values.borrow().get(val_idx); - let keep_times_len = keep.times.values.len(); - let ship_times_len = ship.times.values.len(); - for time_idx in self.times_bounds(val_idx..val_idx+1) { - let t = self.times.values.borrow().get(time_idx); - let diff = self.diffs.values.borrow().get(time_idx); - time.copy_from(t); - if upper.less_equal(&time) { - frontier.insert_ref(&time); - keep.times.values.push(t); - keep.diffs.values.push(diff); - keep.diffs.bounds.push(keep.diffs.values.len() as u64); - } - else { - ship.times.values.push(t); - ship.diffs.values.push(diff); - ship.diffs.bounds.push(ship.diffs.values.len() as u64); + output + } + + /// Write one layer of merged output from a list survey and item survey. + /// + /// The list survey describes which lists to produce (from the layer above). 
+ /// The item survey describes how the items within those lists interleave. + /// Both surveys are consumed completely; a mismatch is a bug. + /// + /// Pruning (from cursor adjustments) can affect the first and last list + /// survey entries: the item survey's ranges may not match the natural + /// bounds of those lists. Middle entries are guaranteed unpruned and can + /// be bulk-copied. + #[inline(never)] + pub fn write_layer<'a, C: columnar::Container: Ord>>( + lists0: as columnar::Borrow>::Borrowed<'a>, + lists1: as columnar::Borrow>::Borrowed<'a>, + list_survey: &[Report], + item_survey: &[Report], + output: &mut crate::updates::Lists, + ) { + use columnar::{Container, Index, Len, Push}; + + let mut item_idx = 0; + + for (pos, list_report) in list_survey.iter().enumerate() { + let is_first = pos == 0; + let is_last = pos == list_survey.len() - 1; + let may_be_pruned = is_first || is_last; + + match list_report { + Report::This(lo, hi) => { + let Report::This(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected This in item survey for This list") }; + item_idx += 1; + if may_be_pruned { + // Item range may not match natural bounds; copy items in bulk + // but compute per-list bounds from natural bounds clamped to + // the item range. 
+ let base = output.values.len(); + output.values.extend_from_self(lists0.values, item_lo..item_hi); + for i in *lo..*hi { + let (_, nat_hi) = lists0.bounds.bounds(i); + output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64); } + } else { + output.extend_from_self(lists0, *lo..*hi); } - if keep.times.values.len() > keep_times_len { - keep.times.bounds.push(keep.times.values.len() as u64); - keep.vals.values.push(val); - } - if ship.times.values.len() > ship_times_len { - ship.times.bounds.push(ship.times.values.len() as u64); - ship.vals.values.push(val); + } + Report::That(lo, hi) => { + let Report::That(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected That in item survey for That list") }; + item_idx += 1; + if may_be_pruned { + let base = output.values.len(); + output.values.extend_from_self(lists1.values, item_lo..item_hi); + for i in *lo..*hi { + let (_, nat_hi) = lists1.bounds.bounds(i); + output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64); + } + } else { + output.extend_from_self(lists1, *lo..*hi); } } - if keep.vals.values.len() > keep_vals_len { - keep.vals.bounds.push(keep.vals.values.len() as u64); - keep.keys.values.push(key); + Report::Both(i0, i1) => { + // Merge: consume item survey entries until both sides are covered. 
+ let (mut c0, end0) = lists0.bounds.bounds(*i0); + let (mut c1, end1) = lists1.bounds.bounds(*i1); + while (c0 < end0 || c1 < end1) && item_idx < item_survey.len() { + match item_survey[item_idx] { + Report::This(lo, hi) => { + if lo >= end0 { break; } + output.values.extend_from_self(lists0.values, lo..hi); + c0 = hi; + } + Report::That(lo, hi) => { + if lo >= end1 { break; } + output.values.extend_from_self(lists1.values, lo..hi); + c1 = hi; + } + Report::Both(v0, v1) => { + if v0 >= end0 && v1 >= end1 { break; } + output.values.push(lists0.values.get(v0)); + c0 = v0 + 1; + c1 = v1 + 1; + } + } + item_idx += 1; + } + output.bounds.push(output.values.len() as u64); } - if ship.vals.values.len() > ship_vals_len { - ship.vals.bounds.push(ship.vals.values.len() as u64); - ship.keys.values.push(key); + } + } + } + + /// Write the diff layer from a time survey and two diff inputs. + /// + /// The time survey is the item-level survey for the time layer, which + /// doubles as the list survey for diffs (one diff list per time entry). + /// + /// - `This(lo, hi)`: bulk-copy diff lists from input 0. + /// - `That(lo, hi)`: bulk-copy diff lists from input 1. + /// - `Both(t0, t1)`: consolidate the two singleton diffs. Push `[sum]` + /// if non-zero, or an empty list `[]` if they cancel. + #[inline(never)] + pub fn write_diffs( + diffs0: > as columnar::Borrow>::Borrowed<'_>, + diffs1: > as columnar::Borrow>::Borrowed<'_>, + time_survey: &[Report], + output: &mut crate::updates::Lists>, + ) { + use columnar::{Columnar, Container, Index, Len, Push}; + use differential_dataflow::difference::{Semigroup, IsZero}; + + for report in time_survey.iter() { + match report { + Report::This(lo, hi) => { output.extend_from_self(diffs0, *lo..*hi); } + Report::That(lo, hi) => { output.extend_from_self(diffs1, *lo..*hi); } + Report::Both(t0, t1) => { + // Read singleton diffs via list bounds, consolidate. 
+ let (d0_lo, d0_hi) = diffs0.bounds.bounds(*t0); + let (d1_lo, d1_hi) = diffs1.bounds.bounds(*t1); + assert_eq!(d0_hi - d0_lo, 1, "Expected singleton diff list at t0={t0}"); + assert_eq!(d1_hi - d1_lo, 1, "Expected singleton diff list at t1={t1}"); + let mut diff: U::Diff = Columnar::into_owned(diffs0.values.get(d0_lo)); + diff.plus_equals(&Columnar::into_owned(diffs1.values.get(d1_lo))); + if !diff.is_zero() { output.values.push(&diff); } + output.bounds.push(output.values.len() as u64); } } } } + /// Increments `index` until just after the last element of `input` to satisfy `cmp`. + /// + /// The method assumes that `cmp` is monotonic, never becoming true once it is false. + /// If an `upper` is supplied, it acts as a constraint on the interval of `input` explored. #[inline(always)] - pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { + pub(crate) fn gallop(input: C, lower: &mut usize, upper: usize, mut cmp: impl FnMut(::Ref) -> bool) { // if empty input, or already >= element, return - if !Range::::is_empty(range) && cmp(input.get(range.start)) { + if *lower < upper && cmp(input.get(*lower)) { let mut step = 1; - while range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; + while *lower + step < upper && cmp(input.get(*lower + step)) { + *lower += step; step <<= 1; } step >>= 1; while step > 0 { - if range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; + if *lower + step < upper && cmp(input.get(*lower + step)) { + *lower += step; } step >>= 1; } - range.start += 1; + *lower += 1; + } + } + + /// A report we would expect to see in a sequence about two layers. + /// + /// A sequence of these reports reveal an ordered traversal of the keys + /// of two layers, with ranges exclusive to one, ranges exclusive to the + /// other, and individual elements (not ranges) common to both. 
+ #[derive(Copy, Clone, Columnar, Debug)] + pub enum Report { + /// Range of indices in this input. + This(usize, usize), + /// Range of indices in that input. + That(usize, usize), + /// Matching indices in both inputs. + Both(usize, usize), + } + + pub struct ChainBuilder { + updates: Vec>, + } + + impl Default for ChainBuilder { fn default() -> Self { Self { updates: Default::default() } } } + + impl ChainBuilder { + fn push(&mut self, mut link: Updates) { + link = link.filter_zero(); + if link.len() > 0 { + if let Some(last) = self.updates.last_mut() { + if last.len() + link.len() < 2 * 64 * 1024 { + let mut build = crate::updates::UpdatesBuilder::new_from(std::mem::take(last)); + build.meld(&link); + *last = build.done(); + } + else { self.updates.push(link); } + + } + else { self.updates.push(link); } + } } + fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} + fn done(self) -> Vec> { self.updates } } } @@ -748,7 +1172,7 @@ pub mod arrangement { } pub struct ValMirror { - current: Updates, + chunks: Vec>, } impl differential_dataflow::trace::Builder for ValMirror { type Time = U::Time; @@ -756,64 +1180,60 @@ pub mod arrangement { type Output = OrdValBatch>; fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { - Self { current: Updates::default() } + Self { chunks: Vec::new() } } fn push(&mut self, chunk: &mut Self::Input) { - use columnar::Len; - let len = chunk.keys.values.len(); - if len > 0 { - self.current.extend_from_keys(chunk, 0..len); + if chunk.len() > 0 { + self.chunks.push(std::mem::take(chunk)); } } fn done(self, description: Description) -> Self::Output { - let mut chain = if self.current.len() > 0 { - vec![self.current] - } else { - vec![] - }; + let mut chain = self.chunks; Self::seal(&mut chain, description) } fn seal(chain: &mut Vec, description: Description) -> Self::Output { - if chain.len() == 0 { + use columnar::Len; + + // Meld sorted, consolidated chain entries in order. 
+ // Pre-allocate to avoid reallocations during meld. + use columnar::{Borrow, Container}; + let mut updates = Updates::::default(); + updates.keys.reserve_for(chain.iter().map(|c| c.keys.borrow())); + updates.vals.reserve_for(chain.iter().map(|c| c.vals.borrow())); + updates.times.reserve_for(chain.iter().map(|c| c.times.borrow())); + updates.diffs.reserve_for(chain.iter().map(|c| c.diffs.borrow())); + let mut builder = crate::updates::UpdatesBuilder::new_from(updates); + for chunk in chain.iter() { + builder.meld(chunk); + } + let merged = builder.done(); + chain.clear(); + + let updates = Len::len(&merged.diffs.values); + if updates == 0 { let storage = OrdValStorage { keys: Default::default(), vals: Default::default(), upds: Default::default(), }; OrdValBatch { storage, description, updates: 0 } - } - else if chain.len() == 1 { - use columnar::Len; - let storage = chain.pop().unwrap(); - let updates = storage.diffs.values.len(); - let val_offs = strides_to_offset_list(&storage.vals.bounds, storage.keys.values.len()); - let time_offs = strides_to_offset_list(&storage.times.bounds, storage.vals.values.len()); + } else { + let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values)); + let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values)); let storage = OrdValStorage { - keys: Coltainer { container: storage.keys.values }, + keys: Coltainer { container: merged.keys.values }, vals: Vals { offs: val_offs, - vals: Coltainer { container: storage.vals.values }, + vals: Coltainer { container: merged.vals.values }, }, upds: Upds { offs: time_offs, - times: Coltainer { container: storage.times.values }, - diffs: Coltainer { container: storage.diffs.values }, + times: Coltainer { container: merged.times.values }, + diffs: Coltainer { container: merged.diffs.values }, }, }; OrdValBatch { storage, description, updates } } - else { - use columnar::Len; - let mut merged = chain.remove(0); - for other in 
chain.drain(..) { - let len = other.keys.values.len(); - if len > 0 { - merged.extend_from_keys(&other, 0..len); - } - } - chain.push(merged); - Self::seal(chain, description) - } } } @@ -888,6 +1308,57 @@ pub mod updates { lower..upper } + /// A streaming consolidation iterator for sorted `(key, val, time, diff)` data. + /// + /// Accumulates diffs for equal `(key, val, time)` triples, yielding at most + /// one output per distinct triple, with a non-zero accumulated diff. + /// Input must be sorted by `(key, val, time)`. + pub struct Consolidating { + iter: std::iter::Peekable, + diff: D, + } + + impl Consolidating + where + K: Copy + Eq, + V: Copy + Eq, + T: Copy + Eq, + D: Semigroup + IsZero + Default, + I: Iterator, + { + pub fn new(iter: I) -> Self { + Self { iter: iter.peekable(), diff: D::default() } + } + } + + impl Iterator for Consolidating + where + K: Copy + Eq, + V: Copy + Eq, + T: Copy + Eq, + D: Semigroup + IsZero + Default + Clone, + I: Iterator, + { + type Item = (K, V, T, D); + fn next(&mut self) -> Option { + loop { + let (k, v, t, d) = self.iter.next()?; + self.diff = d; + while let Some(&(k2, v2, t2, _)) = self.iter.peek() { + if k2 == k && v2 == v && t2 == t { + let (_, _, _, d2) = self.iter.next().unwrap(); + self.diff.plus_equals(&d2); + } else { + break; + } + } + if !self.diff.is_zero() { + return Some((k, v, t, self.diff.clone())); + } + } + } + } + impl Updates { pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { @@ -933,201 +1404,75 @@ pub mod updates { self.diffs.extend_from_self(other.diffs.borrow(), time_range); } - /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. - /// - /// Tracks a `prev` reference to the previous element. On each new element, - /// compares against `prev` to detect key/val/time changes. Only pushes - /// accumulated diffs when they are nonzero, and only emits times/vals/keys - /// that have at least one nonzero diff beneath them. 
- pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { - - let mut output = Self::default(); - let mut diff_stash = U::Diff::default(); - let mut diff_temp = U::Diff::default(); - - if let Some(first) = sorted.next() { - - let mut prev = first; - Columnar::copy_from(&mut diff_stash, prev.3); - - for curr in sorted { - let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); - let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); - let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); - - if time_differs { - // Flush the accumulated diff for prev's (key, val, time). - if !diff_stash.is_zero() { - // We have a real update to emit. Push time (and val/key - // if this is the first time under them). - let times_len = output.times.values.len(); - let vals_len = output.vals.values.len(); - - if val_differs { - // Seal the previous val's time list, if any times were emitted. - if times_len > 0 { - output.times.bounds.push(times_len as u64); - } - if key_differs { - // Seal the previous key's val list, if any vals were emitted. - if vals_len > 0 { - output.vals.bounds.push(vals_len as u64); - } - output.keys.values.push(prev.0); - } - output.vals.values.push(prev.1); - } - output.times.values.push(prev.2); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } - Columnar::copy_from(&mut diff_stash, curr.3); - } else { - // Same (key, val, time): accumulate diff. - Columnar::copy_from(&mut diff_temp, curr.3); - diff_stash.plus_equals(&diff_temp); - } - prev = curr; - } - - // Flush the final accumulated diff. 
- if !diff_stash.is_zero() { - let keys_len = output.keys.values.len(); - let vals_len = output.vals.values.len(); - let times_len = output.times.values.len(); - let need_key = keys_len == 0 || ContainerOf::::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1); - let need_val = need_key || vals_len == 0 || ContainerOf::::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1); - - if need_val { - if times_len > 0 { - output.times.bounds.push(times_len as u64); - } - if need_key { - if vals_len > 0 { - output.vals.bounds.push(vals_len as u64); - } - output.keys.values.push(prev.0); - } - output.vals.values.push(prev.1); - } - output.times.values.push(prev.2); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } - - // Seal the final groups at each level. - if !output.times.values.is_empty() { - output.times.bounds.push(output.times.values.len() as u64); - } - if !output.vals.values.is_empty() { - output.vals.bounds.push(output.vals.values.len() as u64); - } - if !output.keys.values.is_empty() { - output.keys.bounds.push(output.keys.values.len() as u64); - } - } - - output + /// Forms a consolidated `Updates` trie from unsorted `(key, val, time, diff)` refs. + pub fn form_unsorted<'a>(unsorted: impl Iterator>>) -> Self { + let mut data = unsorted.collect::>(); + data.sort(); + Self::form(data.into_iter()) } - /// Consolidates into canonical trie form: - /// single outer key list, all lists sorted and deduplicated, - /// diff lists are singletons (or absent if cancelled). - pub fn consolidate(self) -> Self { - - let Self { keys, vals, times, diffs } = self; - - let keys_b = keys.borrow(); - let vals_b = vals.borrow(); - let times_b = times.borrow(); - let diffs_b = diffs.borrow(); + /// Forms a consolidated `Updates` trie from sorted `(key, val, time, diff)` refs. 
+ pub fn form<'a>(sorted: impl Iterator>>) -> Self { - // Flatten to index tuples: [key_abs, val_abs, time_abs, diff_abs]. - let mut tuples: Vec<[usize; 4]> = Vec::new(); - for outer in 0..Len::len(&keys_b) { - for k in child_range(keys_b.bounds, outer) { - for v in child_range(vals_b.bounds, k) { - for t in child_range(times_b.bounds, v) { - for d in child_range(diffs_b.bounds, t) { - tuples.push([k, v, t, d]); - } - } - } - } - } - - // Sort by (key, val, time). Diff is payload. - tuples.sort_by(|a, b| { - keys_b.values.get(a[0]).cmp(&keys_b.values.get(b[0])) - .then_with(|| vals_b.values.get(a[1]).cmp(&vals_b.values.get(b[1]))) - .then_with(|| times_b.values.get(a[2]).cmp(×_b.values.get(b[2]))) - }); + // Step 1: Streaming consolidation — accumulate diffs, drop zeros. + let consolidated = Consolidating::new( + sorted.map(|(k, v, t, d)| (k, v, t, ::into_owned(d))) + ); - // Build consolidated output, bottom-up cancellation. + // Step 2: Build the trie from consolidated, sorted, non-zero data. let mut output = Self::default(); - let mut diff_stash = U::Diff::default(); - let mut diff_temp = U::Diff::default(); - - let mut idx = 0; - while idx < tuples.len() { - let key_ref = keys_b.values.get(tuples[idx][0]); - let key_start_vals = output.vals.values.len(); - - // All entries with this key. - while idx < tuples.len() && keys_b.values.get(tuples[idx][0]) == key_ref { - let val_ref = vals_b.values.get(tuples[idx][1]); - let val_start_times = output.times.values.len(); - - // All entries with this (key, val). - while idx < tuples.len() - && keys_b.values.get(tuples[idx][0]) == key_ref - && vals_b.values.get(tuples[idx][1]) == val_ref - { - let time_ref = times_b.values.get(tuples[idx][2]); - - // Sum all diffs for this (key, val, time). 
- Columnar::copy_from(&mut diff_stash, diffs_b.values.get(tuples[idx][3])); - idx += 1; - while idx < tuples.len() - && keys_b.values.get(tuples[idx][0]) == key_ref - && vals_b.values.get(tuples[idx][1]) == val_ref - && times_b.values.get(tuples[idx][2]) == time_ref - { - Columnar::copy_from(&mut diff_temp, diffs_b.values.get(tuples[idx][3])); - diff_stash.plus_equals(&diff_temp); - idx += 1; - } - - // Emit time + singleton diff if nonzero. - if !diff_stash.is_zero() { - output.times.values.push(time_ref); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } + let mut updates = consolidated; + if let Some((key, val, time, diff)) = updates.next() { + let mut prev = (key, val, time); + output.keys.values.push(key); + output.vals.values.push(val); + output.times.values.push(time); + output.diffs.values.push(&diff); + output.diffs.bounds.push(output.diffs.values.len() as u64); + + // As we proceed, seal up known complete runs. + for (key, val, time, diff) in updates { + + // If keys differ, record key and seal vals and times. + if key != prev.0 { + output.vals.bounds.push(output.vals.values.len() as u64); + output.times.bounds.push(output.times.values.len() as u64); + output.keys.values.push(key); + output.vals.values.push(val); } - - // Seal time list for this val; emit val if any times survived. - if output.times.values.len() > val_start_times { + // If vals differ, record val and seal times. + else if val != prev.1 { output.times.bounds.push(output.times.values.len() as u64); - output.vals.values.push(val_ref); + output.vals.values.push(val); + } + else { + // We better not find a duplicate time. + assert!(time != prev.2); } - } - // Seal val list for this key; emit key if any vals survived. - if output.vals.values.len() > key_start_vals { - output.vals.bounds.push(output.vals.values.len() as u64); - output.keys.values.push(key_ref); + // Always record (time, diff). 
+ output.times.values.push(time); + output.diffs.values.push(&diff); + output.diffs.bounds.push(output.diffs.values.len() as u64); + + prev = (key, val, time); } - } - // Seal the single outer key list. - if !output.keys.values.is_empty() { + // Seal up open lists. output.keys.bounds.push(output.keys.values.len() as u64); + output.vals.bounds.push(output.vals.values.len() as u64); + output.times.bounds.push(output.times.values.len() as u64); } output } + /// Consolidates into canonical trie form: + /// single outer key list, all lists sorted and deduplicated, + /// diff lists are singletons (or absent if cancelled). + pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) } + pub fn filter_zero(self) -> Self { Self::form(self.iter()) } + /// The number of leaf-level diff entries (total updates). pub fn len(&self) -> usize { self.diffs.values.len() } } @@ -1203,6 +1548,179 @@ pub mod updates { fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } + /// An incremental trie builder that accepts sorted, consolidated `Updates` chunks + /// and melds them into a single `Updates` trie. + /// + /// The internal `Updates` has open (unsealed) bounds at the keys, vals, and times + /// levels — the last group at each level has its values pushed but no corresponding + /// bounds entry. `diffs.bounds` is always 1:1 with `times.values`. + /// + /// `meld` accepts a consolidated `Updates` whose first `(key, val, time)` is + /// strictly greater than the builder's last `(key, val, time)`. The key and val + /// may equal the builder's current open key/val, as long as the time is greater. + /// + /// `done` seals all open bounds and returns the completed `Updates`. + pub struct UpdatesBuilder { + /// Non-empty, consolidated updates. + updates: Updates, + } + + impl UpdatesBuilder { + /// Construct a new builder from consolidated, sealed updates. 
+ /// + /// Unseals the last group at keys, vals, and times levels so that + /// subsequent `meld` calls can extend the open groups. + /// If the updates are not consolidated none of this works. + pub fn new_from(mut updates: Updates) -> Self { + use columnar::Len; + if Len::len(&updates.keys.values) > 0 { + updates.keys.bounds.pop(); + updates.vals.bounds.pop(); + updates.times.bounds.pop(); + } + Self { updates } + } + + /// Meld a sorted, consolidated `Updates` chunk into this builder. + /// + /// The chunk's first `(key, val, time)` must be strictly greater than + /// the builder's last `(key, val, time)`. Keys and vals may overlap + /// (continue the current group), but times must be strictly increasing + /// within the same `(key, val)`. + pub fn meld(&mut self, chunk: &Updates) { + use columnar::{Borrow, Index, Len}; + + if chunk.len() == 0 { return; } + + // Empty builder: clone the chunk and unseal it. + if Len::len(&self.updates.keys.values) == 0 { + self.updates = chunk.clone(); + self.updates.keys.bounds.pop(); + self.updates.vals.bounds.pop(); + self.updates.times.bounds.pop(); + return; + } + + // Pre-compute boundary comparisons before mutating. + let keys_match = { + let skb = self.updates.keys.values.borrow(); + let ckb = chunk.keys.values.borrow(); + skb.get(Len::len(&skb) - 1) == ckb.get(0) + }; + let vals_match = keys_match && { + let svb = self.updates.vals.values.borrow(); + let cvb = chunk.vals.values.borrow(); + svb.get(Len::len(&svb) - 1) == cvb.get(0) + }; + + let chunk_num_keys = Len::len(&chunk.keys.values); + let chunk_num_vals = Len::len(&chunk.vals.values); + let chunk_num_times = Len::len(&chunk.times.values); + + // Child ranges for the first element at each level of the chunk. + let first_key_vals = child_range(chunk.vals.borrow().bounds, 0); + let first_val_times = child_range(chunk.times.borrow().bounds, 0); + + // There is a first position where coordinates disagree. 
+ // Strictly beyond that position: seal bounds, extend lists, re-open the last bound. + // At that position: meld the first list, extend subsequent lists, re-open. + let mut differ = false; + + // --- Keys --- + if keys_match { + // Skip the duplicate first key; add remaining keys. + if chunk_num_keys > 1 { + self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 1..chunk_num_keys); + } + } else { + // All keys are new. + self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 0..chunk_num_keys); + differ = true; + } + + // --- Vals --- + if differ { + // Keys differed: seal open val group, extend all val lists, unseal last. + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + self.updates.vals.extend_from_self(chunk.vals.borrow(), 0..chunk_num_keys); + self.updates.vals.bounds.pop(); + } else { + // Keys matched: meld vals for the shared key. + if vals_match { + // Skip the duplicate first val; add remaining vals from the first key's list. + if first_key_vals.len() > 1 { + self.updates.vals.values.extend_from_self( + chunk.vals.values.borrow(), + (first_key_vals.start + 1)..first_key_vals.end, + ); + } + } else { + // First val differs: add all vals from the first key's list. + self.updates.vals.values.extend_from_self( + chunk.vals.values.borrow(), + first_key_vals.clone(), + ); + differ = true; + } + // Seal the matched key's val group, extend remaining keys' val lists, unseal. + if chunk_num_keys > 1 { + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + self.updates.vals.extend_from_self(chunk.vals.borrow(), 1..chunk_num_keys); + self.updates.vals.bounds.pop(); + } + } + + // --- Times --- + if differ { + // Seal open time group, extend all time lists, unseal last. 
+ self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + self.updates.times.extend_from_self(chunk.times.borrow(), 0..chunk_num_vals); + self.updates.times.bounds.pop(); + } else { + // Keys and vals matched. Times must be strictly greater (precondition), + // so we always set differ = true here. + debug_assert!({ + let stb = self.updates.times.values.borrow(); + let ctb = chunk.times.values.borrow(); + stb.get(Len::len(&stb) - 1) != ctb.get(0) + }, "meld: duplicate time within same (key, val)"); + // Add times from the first val's time list into the open group. + self.updates.times.values.extend_from_self( + chunk.times.values.borrow(), + first_val_times.clone(), + ); + differ = true; + // Seal the matched val's time group, extend remaining vals' time lists, unseal. + if chunk_num_vals > 1 { + self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + self.updates.times.extend_from_self(chunk.times.borrow(), 1..chunk_num_vals); + self.updates.times.bounds.pop(); + } + } + + // --- Diffs --- + // Diffs are always sealed (1:1 with times). By the precondition that + // times are strictly increasing for the same (key, val), differ is + // always true by this point — just extend all diff lists. + debug_assert!(differ); + self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times); + } + + /// Seal all open bounds and return the completed `Updates`. + pub fn done(mut self) -> Updates { + use columnar::Len; + if Len::len(&self.updates.keys.values) > 0 { + // Seal the open time group. + self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + // Seal the open val group. + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + // Seal the outer key group. 
+ self.updates.keys.bounds.push(Len::len(&self.updates.keys.values) as u64); + } + self.updates + } + } + #[cfg(test)] mod tests { use super::*; @@ -1347,11 +1865,13 @@ where .inner .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { move |input, output| { + let mut t1o = U::Time::default(); + let mut d1o = U::Diff::default(); input.for_each(|time, data| { let mut session = output.session_with_builder(&time); for (k1, v1, t1, d1) in data.updates.iter() { - let t1o: U::Time = Columnar::into_owned(t1); - let d1o: U::Diff = Columnar::into_owned(d1); + Columnar::copy_from(&mut t1o, t1); + Columnar::copy_from(&mut d1o, d1); for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { let t3 = t2.join(&t1o); let d3 = d2.multiply(&d1o); @@ -1413,6 +1933,7 @@ where builder.build(move |_capability| { let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); + let mut time = DynTime::default(); move |_frontier| { let mut output = output.activate(); op_input.for_each(|cap, data| { @@ -1429,7 +1950,7 @@ where // that accepts pre-sorted, potentially-collapsing timestamps // could avoid the re-sort inside the builder. 
for (k, v, t, d) in data.updates.iter() { - let mut time: DynTime = Columnar::into_owned(t); + Columnar::copy_from(&mut time, t); let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); inner_vec.truncate(level - 1); time.inner = PointStamp::new(inner_vec); diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 8404e7aa4..85b573157 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -168,8 +168,16 @@ mod reachability { let result = combined_arr.reduce_abelian::<_, ValBuilder, ValSpine, - >("Distinct", |_node, _input, output| { - output.push(((), 1)); + _, + >("Distinct", |_node, _input, output| { output.push(((), 1)); }, + |col, key, upds| { + use columnar::{Clear, Push}; + col.keys.clear(); + col.vals.clear(); + col.times.clear(); + col.diffs.clear(); + for (val, time, diff) in upds.drain(..) { col.push((key, &val, &time, &diff)); } + *col = std::mem::take(col).consolidate(); }); // Extract RecordedUpdates from the Arranged's batch stream. 
diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index c13227f2c..6981cc3da 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -752,7 +752,11 @@ pub mod vec { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) + .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine,_>( + name, + logic, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,v| (k.clone(), v.clone())) } @@ -782,7 +786,7 @@ pub mod vec { /// ``` pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where - T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, + T2: for<'a> Trace= &'a K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { @@ -801,12 +805,16 @@ pub mod vec { pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where V: Clone+'static, - T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, + T2: for<'a> Trace=&'a K, ValOwn = V, Time=G::Timestamp>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core::<_,Bu,_>(name, logic) + .reduce_core::<_,Bu,_,_>( + name, + logic, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) } } @@ -871,7 +879,11 @@ pub mod vec { use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) - .reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + .reduce_abelian::<_,KeyBuilder,KeySpine,_>( + name, + move 
|k,s,t| t.push(((), thresh(k, &s[0].1))), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,_| k.clone()) } @@ -908,7 +920,11 @@ pub mod vec { pub fn count_core + 'static>(self) -> Collection { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") - .reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + .reduce_abelian::<_,ValBuilder,ValSpine,_>( + "Count", + |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))), + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) .as_collection(|k,c| (k.clone(), c.clone())) } } diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index e1c2fde5f..715542f55 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -205,7 +205,11 @@ impl TraceAgent { /// // create a second dataflow /// worker.dataflow(move |scope| { /// trace.import(scope) - /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Reduce", |_key, src, dst| dst.push((*src[0].0, 1))) + /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>( + /// "Reduce", + /// |_key, src, dst| dst.push((*src[0].0, 1)), + /// |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + /// ) /// .as_collection(|k,v| (k.clone(), v.clone())); /// }); /// diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 69965e65b..cd1bd0894 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,6 @@ use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; 
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::merge_batcher::container::InternalMerge; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -75,7 +74,6 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; use timely::Container; -use timely::container::PushInto; impl Arranged where @@ -169,12 +167,17 @@ where /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_vecs(self) -> VecCollection + /// + /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support + /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic + /// on the reference types. + pub fn as_vecs(self) -> VecCollection where - Tr::KeyOwn: crate::ExchangeData, - Tr::ValOwn: crate::ExchangeData, + K: crate::ExchangeData, + V: crate::ExchangeData, + Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>, { - self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))]) + self.flat_map_ref(move |key, val| [(key.clone(), val.clone())]) } /// Extracts elements from an arrangement as a `VecCollection`. @@ -271,43 +274,43 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. 
- pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>= T1::Key<'a>, - KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time, Diff: Abelian, >+'static, - Bu: Builder>, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { - self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,Bu,T2,_>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) })); crate::consolidation::consolidate(change); - }) + }, push) } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>=T1::Key<'a>, - KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time, >+'static, - Bu: Builder>, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,Bu,_,_>(self, name, logic) + reduce_trace::<_,_,Bu,_,_,_>(self, name, logic, push) } } diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index e9dbe9cdb..a8322eaa3 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! 
let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine>(stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine,String,String>(stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -127,19 +127,21 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: Stream, G::Timestamp)>>, +pub fn arrange_from_upsert( + stream: Stream, G::Timestamp)>>, name: &str, ) -> Arranged> where G: Scope, + K: ExchangeData+Hashable+std::hash::Hash, + V: ExchangeData, Tr: for<'a> Trace< - KeyOwn: ExchangeData+Hashable+std::hash::Hash, - ValOwn: ExchangeData, + Key<'a> = &'a K, + Val<'a> = &'a V, Time: TotalOrder+ExchangeData, Diff=isize, >+'static, - Bu: Builder, Output = Tr::Batch>, + Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -148,7 +150,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); let scope = stream.scope(); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -174,7 +176,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |(input, frontier), output| { @@ -237,10 +239,10 @@ where let mut key_con = Tr::KeyContainer::with_capacity(1); for (key, mut list) in to_process { - key_con.clear(); key_con.push_own(&key); + key_con.clear(); key_con.push_ref(&key); // The prior value associated with the key. 
- let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. trace_cursor.seek_key(&trace_storage, key_con.index(0)); @@ -252,7 +254,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(Tr::owned_val(val)); + prev_value = Some(val.clone()); } trace_cursor.step_val(&trace_storage); } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fe3622d74..909199295 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -5,34 +5,41 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. -use timely::container::PushInto; use crate::Data; -use timely::order::PartialOrder; use timely::progress::frontier::Antichain; use timely::progress::Timestamp; use timely::dataflow::*; use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::operators::Capability; use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; -use crate::trace::implementations::merge_batcher::container::InternalMerge; use crate::trace::TraceReader; /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> +/// +/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output, +/// and populate its final argument with whatever it feels to be appropriate updates. The behavior and +/// correctness of the implementation rely on this making sense, and e.g.
ideally the updates would if +/// applied to the tentative output bring it in line with some function applied to the input. +/// +/// The `push` closure is expected to clear its first argument, then populate it with the key and drain +/// the value updates, as appropriate for the container. It is critical that it clear the container as +/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one +/// key's computation to another, and will likely introduce non-determinism. +pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L, mut push: P) -> Arranged> where G: Scope, - T1: TraceReader + Clone + 'static, - T2: for<'a> Trace=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static, - Bu: Builder>, + T1: TraceReader + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, ValOwn: Data, Time=T1::Time> + 'static, + Bu: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, T1::Key<'_>, &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>) + 'static, { let mut result_trace = None; @@ -53,22 +60,21 @@ where empty.set_exert_logic(exert_logic); } - let mut source_trace = trace.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); - // let mut output_trace = TraceRc::make_from(agent).0; *result_trace = Some(output_reader.clone()); - // let mut thinker1 = history_replay_prior::HistoryReplayer::::new(); - // let mut thinker = history_replay::HistoryReplayer::::new(); let mut new_interesting_times = Vec::::new(); // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, - // as well as capabilities for these times (or their lower envelope, at least). 
- let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new(); - let mut capabilities = Vec::>::new(); + // sorted by (key, time), as well as capabilities for the lower envelope of the times. + let mut pending_keys = T1::KeyContainer::with_capacity(0); + let mut pending_time = T1::TimeContainer::with_capacity(0); + let mut next_pending_keys = T1::KeyContainer::with_capacity(0); + let mut next_pending_time = T1::TimeContainer::with_capacity(0); + let mut capabilities = timely::dataflow::operators::CapabilitySet::::default(); // buffers and logic for computing per-key interesting times "efficiently". let mut interesting_times = Vec::::new(); @@ -81,27 +87,15 @@ where let mut output_upper = Antichain::from_elem(::minimum()); let mut output_lower = Antichain::from_elem(::minimum()); - let id = scope.index(); + move |(input, frontier), output| { - move |(input, _frontier), output| { - - // The `reduce` operator receives fully formed batches, which each serve as an indication - // that the frontier has advanced to the upper bound of their description. - // - // Although we could act on each individually, several may have been sent, and it makes - // sense to accumulate them first to coordinate their re-evaluation. We will need to pay - // attention to which times need to be collected under which capability, so that we can - // assemble output batches correctly. We will maintain several builders concurrently, and - // place output updates into the appropriate builder. + // The operator receives input batches, which it treats as contiguous and will collect and + // then process as one batch. It captures the input frontier from the batches, from the upstream + // trace, and from the input frontier, and retires the work through that interval. // - // It turns out we must use notificators, as we cannot await empty batches from arrange to - // indicate progress, as the arrange may not hold the capability to send such. 
Instead, we - // must watch for progress here (and the upper bound of received batches) to tell us how - // far we can process work. - // - // We really want to retire all batches we receive, so we want a frontier which reflects - // both information from batches as well as progress information. I think this means that - // we keep times that are greater than or equal to a time in the other frontier, deduplicated. + // Reduce may retain capabilities and need to perform work and produce output at times that + // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)` + // may result in outputs at `(1, 1)` as well, even with no input at that time. let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); @@ -110,60 +104,37 @@ where lower_limit.clear(); lower_limit.extend(upper_limit.borrow().iter().cloned()); - // Drain the input stream of batches, validating the contiguity of the batch descriptions and - // capturing a cursor for each of the batches as well as ensuring we hold a capability for the - // times in the batch. + // Drain input batches in order, capturing capabilities and the last upper. input.for_each(|capability, batches| { - + capabilities.insert(capability.retain(0)); for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor()); batch_storage.push(batch); } - - // Ensure that `capabilities` covers the capability of the batch. - capabilities.retain(|cap| !capability.time().less_than(cap.time())); - if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) { - capabilities.push(capability.retain(0)); - } }); // Pull in any subsequent empty batches we believe to exist. source_trace.advance_upper(&mut upper_limit); + // Incorporate the input frontier guarantees as well. 
+ let mut joined = Antichain::new(); + crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined); + upper_limit = joined; - // Only if our upper limit has advanced should we do work. + // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed. if upper_limit != lower_limit { - // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send - // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches - // to indicate forward progress, and must hope that downstream operators look at progress frontiers - // as well as batch descriptions. - // - // We can (and should) advance source and output traces if `upper_limit` indicates this is possible. + // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs, + // and could not transmit the outputs even if they were (incorrectly) non-zero. + // We do have maintenance work after this logic, and should not fuse this test with the above test. if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { - // `interesting` contains "warnings" about keys and times that may need to be re-considered. - // We first extract those times from this list that lie in the interval we will process. - sort_dedup(&mut interesting); - // `exposed` contains interesting (key, time)s now below `upper_limit` - let mut exposed_keys = T1::KeyContainer::with_capacity(0); - let mut exposed_time = T1::TimeContainer::with_capacity(0); - // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs. - interesting.retain(|(key, time)| { - if upper_limit.less_equal(time) { true } else { - exposed_keys.push_own(key); - exposed_time.push_own(time); - false - } - }); + // cursors for navigating input and output traces. 
+ let (mut source_cursor, ref source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); + let (mut output_cursor, ref output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); + let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); // Prepare an output buffer and builder for each capability. - // - // We buffer and build separately, as outputs are produced grouped by time, whereas the - // builder wants to see outputs grouped by value. While the per-key computation could - // do the re-sorting itself, buffering per-key outputs lets us double check the results - // against other implementations for accuracy. - // // TODO: It would be better if all updates went into one batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new(); @@ -172,29 +143,21 @@ where buffers.push((cap.time().clone(), Vec::new())); builders.push(Bu::new()); } - + // Temporary staging for output building. let mut buffer = Bu::Input::default(); - // cursors for navigating input and output traces. - let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); - let source_storage = &source_storage; - let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); - let output_storage = &output_storage; - let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); - let batch_storage = &batch_storage; - + // Reusable state for performing the computation.
let mut thinker = history_replay::HistoryReplayer::new(); - // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`. - // - // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length - // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`. - // There could perhaps be a less provocative variable name. - let mut exposed_position = 0; - while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() { + // Merge the received batch cursor with our list of interesting (key, time) moments. + // The interesting moments need to be in the interval to prompt work. + + // March through the keys we must work on, merging `batch_cursor` and `pending_keys`. + let mut pending_pos = 0; + while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() { // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed_keys.get(exposed_position); + let key1 = pending_keys.get(pending_pos); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -203,57 +166,72 @@ where (None, None) => unreachable!(), }; - // `interesting_times` contains those times between `lower_issued` and `upper_limit` - // that we need to re-consider. We now populate it, but perhaps this should be left - // to the per-key computation, which may be able to avoid examining the times of some - // values (for example, in the case of min/max/topk). + // Populate `interesting_times` with interesting times not beyond `upper_limit`. + // TODO: This could just be `pending_time` and indexes within `lower .. upper`. + let prior_pos = pending_pos; interesting_times.clear(); - - // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
- while exposed_keys.get(exposed_position) == Some(key) { - interesting_times.push(T1::owned_time(exposed_time.index(exposed_position))); - exposed_position += 1; + while pending_keys.get(pending_pos) == Some(key) { + let owned_time = T1::owned_time(pending_time.index(pending_pos)); + if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); } + pending_pos += 1; } // tidy up times, removing redundancy. sort_dedup(&mut interesting_times); - // do the per-key computation. - let _counters = thinker.compute( - key, - (&mut source_cursor, source_storage), - (&mut output_cursor, output_storage), - (&mut batch_cursor, batch_storage), - &mut interesting_times, - &mut logic, - &upper_limit, - &mut buffers[..], - &mut new_interesting_times, - ); - - if batch_cursor.get_key(batch_storage) == Some(key) { - batch_cursor.step_key(batch_storage); - } - - // Record future warnings about interesting times (and assert they should be "future"). - for time in new_interesting_times.drain(..) { - debug_assert!(upper_limit.less_equal(&time)); - interesting.push((T1::owned_key(key), time)); - } + // If there are new updates, or pending times, we must investigate! + if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() { + + // do the per-key computation. + thinker.compute( + key, + (&mut source_cursor, source_storage), + (&mut output_cursor, output_storage), + (&mut batch_cursor, batch_storage), + &interesting_times, + &mut logic, + &upper_limit, + &mut buffers[..], + &mut new_interesting_times, + ); + + // Advance the cursor if it is on this key, so that the loop's validity check registers the work as done. + if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); } + + // Merge novel pending times with any prior pending times we did not process. + // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted. + for pos in prior_pos ..
pending_pos { + let owned_time = T1::owned_time(pending_time.index(pos)); + if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); } + } + sort_dedup(&mut new_interesting_times); + for time in new_interesting_times.drain(..) { + next_pending_keys.push_ref(key); + next_pending_time.push_own(&time); + } - // Sort each buffer by value and move into the corresponding builder. - // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, - // (ii) that the buffers are time-ordered, and (iii) that the builders accept - // arbitrarily ordered times. - for index in 0 .. buffers.len() { - buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); - for (val, time, diff) in buffers[index].1.drain(..) { - buffer.push_into(((T1::owned_key(key), val), time, diff)); + // Sort each buffer by value and move into the corresponding builder. + // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, + // (ii) that the buffers are time-ordered, and (iii) that the builders accept + // arbitrarily ordered times. + for index in 0 .. buffers.len() { + buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); + push(&mut buffer, key, &mut buffers[index].1); + buffers[index].1.clear(); builders[index].push(&mut buffer); - buffer.clear(); + + } + } + else { + // copy over the pending key and times. + for pos in prior_pos .. pending_pos { + next_pending_keys.push_ref(pending_keys.index(pos)); + next_pending_time.push_ref(pending_time.index(pos)); } } } + // Drop to avoid lifetime issues that would lock `pending_{keys, time}`. + drop(thinker); // We start sealing output batches from the lower limit (previous upper limit). // In principle, we could update `lower_limit` itself, and it should arrive at @@ -269,7 +247,7 @@ where output_upper.clear(); output_upper.extend(upper_limit.borrow().iter().cloned()); for capability in &capabilities[index + 1 ..] 
{ - output_upper.insert(capability.time().clone()); + output_upper.insert_ref(capability.time()); } if output_upper.borrow() != output_lower.borrow() { @@ -285,39 +263,27 @@ where output_lower.extend(output_upper.borrow().iter().cloned()); } } - // This should be true, as the final iteration introduces no capabilities, and // uses exactly `upper_limit` to determine the upper bound. Good to check though. assert!(output_upper.borrow() == upper_limit.borrow()); - // Determine the frontier of our interesting times. - let mut frontier = Antichain::::new(); - for (_, time) in &interesting { - frontier.insert_ref(time); - } + // Refresh pending keys and times, then downgrade capabilities to the frontier of times. + pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys); + pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time); - // Update `capabilities` to reflect interesting pairs described by `frontier`. - let mut new_capabilities = Vec::new(); - for time in frontier.borrow().iter() { - if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) { - new_capabilities.push(cap.delayed(time)); - } - else { - println!("{}:\tfailed to find capability less than new frontier time:", id); - println!("{}:\t time: {:?}", id, time); - println!("{}:\t caps: {:?}", id, capabilities); - println!("{}:\t uppr: {:?}", id, upper_limit); - } + // Update `capabilities` to reflect pending times. + let mut frontier = Antichain::::new(); + let mut owned_time = T1::Time::minimum(); + for pos in 0 .. pending_time.len() { + T1::clone_time_onto(pending_time.index(pos), &mut owned_time); + frontier.insert_ref(&owned_time); } - capabilities = new_capabilities; - - // ensure that observed progress is reflected in the output. - output_writer.seal(upper_limit.clone()); - } - else { - output_writer.seal(upper_limit.clone()); + capabilities.downgrade(frontier); } + // ensure that observed progress is reflected in the output. 
+ output_writer.seal(upper_limit.clone()); + // We only anticipate future times in advance of `upper_limit`. source_trace.set_logical_compaction(upper_limit.borrow()); output_reader.set_logical_compaction(upper_limit.borrow()); @@ -345,35 +311,6 @@ fn sort_dedup(list: &mut Vec) { list.dedup(); } -trait PerKeyCompute<'a, C1, C2, C3, V> -where - C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, - C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, - V: Clone + Ord, -{ - fn new() -> Self; - fn compute( - &mut self, - key: C1::Key<'a>, - source_cursor: (&mut C1, &'a C1::Storage), - output_cursor: (&mut C2, &'a C2::Storage), - batch_cursor: (&mut C3, &'a C3::Storage), - times: &mut Vec, - logic: &mut L, - upper_limit: &Antichain, - outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], - new_interesting: &mut Vec) -> (usize, usize) - where - L: FnMut( - C1::Key<'a>, - &[(C1::Val<'a>, C1::Diff)], - &mut Vec<(V, C2::Diff)>, - &mut Vec<(V, C2::Diff)>, - ); -} - - /// Implementation based on replaying historical and new updates together. mod history_replay { @@ -384,7 +321,7 @@ mod history_replay { use crate::trace::Cursor; use crate::operators::ValueHistory; - use super::{PerKeyCompute, sort_dedup}; + use super::sort_dedup; /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in /// time order, maintaining consolidated representations of updates with respect to future interesting times. 
@@ -408,14 +345,14 @@ mod history_replay { temporary: Vec, } - impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> + impl<'a, C1, C2, C3, V> HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { - fn new() -> Self { + pub fn new() -> Self { HistoryReplayer { input_history: ValueHistory::new(), output_history: ValueHistory::new(), @@ -431,17 +368,17 @@ mod history_replay { } } #[inline(never)] - fn compute( + pub fn compute( &mut self, key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), - times: &mut Vec, + times: &Vec, logic: &mut L, upper_limit: &Antichain, outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)], - new_interesting: &mut Vec) -> (usize, usize) + new_interesting: &mut Vec) where L: FnMut( C1::Key<'a>, @@ -477,44 +414,24 @@ mod history_replay { let mut meet = None; update_meet(&mut meet, self.meets.get(0)); update_meet(&mut meet, batch_replay.meet()); - // if let Some(time) = self.meets.get(0) { - // meet = match meet { - // None => Some(self.meets[0].clone()), - // Some(x) => Some(x.meet(&self.meets[0])), - // }; - // } - // if let Some(time) = batch_replay.meet() { - // meet = match meet { - // None => Some(time.clone()), - // Some(x) => Some(x.meet(&time)), - // }; - // } // Having determined the meet, we can load the input and output histories, where we // advance all times by joining them with `meet`. The resulting times are more compact // and guaranteed to accumulate identically for times greater or equal to `meet`. // Load the input and output histories. 
- let mut input_replay = if let Some(meet) = meet.as_ref() { - self.input_history.replay_key(source_cursor, source_storage, key, |time| { - let mut time = C1::owned_time(time); - time.join_assign(meet); - time - }) - } - else { - self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time)) - }; - let mut output_replay = if let Some(meet) = meet.as_ref() { - self.output_history.replay_key(output_cursor, output_storage, key, |time| { - let mut time = C2::owned_time(time); - time.join_assign(meet); - time - }) - } - else { - self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time)) - }; + let mut input_replay = + self.input_history.replay_key(source_cursor, source_storage, key, |time| { + let mut time = C1::owned_time(time); + if let Some(meet) = meet.as_ref() { time.join_assign(meet); } + time + }); + let mut output_replay = + self.output_history.replay_key(output_cursor, output_storage, key, |time| { + let mut time = C2::owned_time(time); + if let Some(meet) = meet.as_ref() { time.join_assign(meet); } + time + }); self.synth_times.clear(); self.times_current.clear(); @@ -526,9 +443,6 @@ mod history_replay { let mut times_slice = &times[..]; let mut meets_slice = &self.meets[..]; - let mut compute_counter = 0; - let mut output_counter = 0; - // We have candidate times from `batch` and `times`, as well as times identified by either // `input` or `output`. Finally, we may have synthetic times produced as the join of times // we consider in the course of evaluation. As long as any of these times exist, we need to @@ -538,7 +452,7 @@ mod history_replay { input_replay.time(), output_replay.time(), self.synth_times.last(), - ].iter().cloned().flatten().min().cloned() { + ].into_iter().flatten().min().cloned() { // Advance input and output history replayers. This marks applicable updates as active.
input_replay.step_while_time_is(&next_time); @@ -551,11 +465,7 @@ mod history_replay { // Advance batch history, and capture whether an update exists at `next_time`. let mut interesting = batch_replay.step_while_time_is(&next_time); - if interesting { - if let Some(meet) = meet.as_ref() { - batch_replay.advance_buffer_by(meet); - } - } + if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } } // advance both `synth_times` and `times_slice`, marking this time interesting if in either. while self.synth_times.last() == Some(&next_time) { @@ -576,7 +486,7 @@ mod history_replay { // and become the time itself. They may not equal the current time because whatever frontier we // are tracking may not have advanced far enough. // TODO: `batch_history` may or may not be super compact at this point, and so this check might - // yield false positives if not sufficiently compact. Maybe we should into this and see. + // yield false positives if not sufficiently compact. Maybe we should look into this and see. interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time)); interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time)); @@ -592,106 +502,65 @@ mod history_replay { // output produced. This sounds like a good test to have for debug builds! if interesting { - compute_counter += 1; - // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use). 
debug_assert!(self.input_buffer.is_empty()); - meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet)); - for &((value, ref time), ref diff) in input_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.input_buffer.push((value, diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) }; + for ((value, time), diff) in input_replay.buffer().iter() { + if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } - for &((value, ref time), ref diff) in batch_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.input_buffer.push((value, diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in batch_replay.buffer().iter() { + if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.input_buffer); - meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); - for &((value, ref time), ref diff) in output_replay.buffer().iter() { - if time.less_equal(&next_time) { - self.output_buffer.push((C2::owned_val(value), diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use). 
+ if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) }; + for ((value, time), diff) in output_replay.buffer().iter() { + if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } - for &((ref value, ref time), ref diff) in self.output_produced.iter() { - if time.less_equal(&next_time) { - self.output_buffer.push(((*value).to_owned(), diff.clone())); - } - else { - self.temporary.push(next_time.join(time)); - } + for ((value, time), diff) in self.output_produced.iter() { + if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); } + else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.output_buffer); - // Apply user logic if non-empty input and see what happens! + // Apply user logic if non-empty input or output and see what happens! if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() { logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer); self.input_buffer.clear(); self.output_buffer.clear(); - } - - // output_replay.advance_buffer_by(&meet); - // for &((ref value, ref time), diff) in output_replay.buffer().iter() { - // if time.less_equal(&next_time) { - // self.output_buffer.push(((*value).clone(), -diff)); - // } - // else { - // self.temporary.push(next_time.join(time)); - // } - // } - // for &((ref value, ref time), diff) in self.output_produced.iter() { - // if time.less_equal(&next_time) { - // self.output_buffer.push(((*value).clone(), -diff)); - // } - // else { - // self.temporary.push(next_time.join(&time)); - // } - // } - - // Having subtracted output updates from user output, consolidate the results to determine - // if there is anything worth reporting. Note: this also orders the results by value, so - // that could make the above merging plan even easier. 
- crate::consolidation::consolidate(&mut self.update_buffer); - - // Stash produced updates into both capability-indexed buffers and `output_produced`. - // The two locations are important, in that we will compact `output_produced` as we move - // through times, but we cannot compact the output buffers because we need their actual - // times. - if !self.update_buffer.is_empty() { - - output_counter += 1; - - // We *should* be able to find a capability for `next_time`. Any thing else would - // indicate a logical error somewhere along the way; either we release a capability - // we should have kept, or we have computed the output incorrectly (or both!) - let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time)); - let idx = outputs.len() - idx.expect("failed to find index") - 1; - for (val, diff) in self.update_buffer.drain(..) { - self.output_produced.push(((val.clone(), next_time.clone()), diff.clone())); - outputs[idx].1.push((val, next_time.clone(), diff)); - } - // Advance times in `self.output_produced` and consolidate the representation. - // NOTE: We only do this when we add records; it could be that there are situations - // where we want to consolidate even without changes (because an initially - // large collection can now be collapsed). - if let Some(meet) = meet.as_ref() { - for entry in &mut self.output_produced { - (entry.0).1 = (entry.0).1.join(meet); + // Having subtracted output updates from user output, consolidate the results to determine + // if there is anything worth reporting. Note: this also orders the results by value, so + // that could make the above merging plan even easier. + // + // Stash produced updates into both capability-indexed buffers and `output_produced`. + // The two locations are important, in that we will compact `output_produced` as we move + // through times, but we cannot compact the output buffers because we need their actual + // times. 
+ crate::consolidation::consolidate(&mut self.update_buffer); + if !self.update_buffer.is_empty() { + + // We *should* be able to find a capability for `next_time`. Any thing else would + // indicate a logical error somewhere along the way; either we release a capability + // we should have kept, or we have computed the output incorrectly (or both!) + let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time)); + let idx = outputs.len() - idx.expect("failed to find index") - 1; + for (val, diff) in self.update_buffer.drain(..) { + self.output_produced.push(((val.clone(), next_time.clone()), diff.clone())); + outputs[idx].1.push((val, next_time.clone(), diff)); } + + // Advance times in `self.output_produced` and consolidate the representation. + // NOTE: We only do this when we add records; it could be that there are situations + // where we want to consolidate even without changes (because an initially + // large collection can now be collapsed). + if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } } + crate::consolidation::consolidate(&mut self.output_produced); } - crate::consolidation::consolidate(&mut self.output_produced); } } @@ -705,17 +574,8 @@ mod history_replay { // Any time, even uninteresting times, must be joined with the current accumulation of // batch times as well as the current accumulation of `times_current`. 
- for &((_, ref time), _) in batch_replay.buffer().iter() { - if !time.less_equal(&next_time) { - self.temporary.push(time.join(&next_time)); - } - } - for time in self.times_current.iter() { - if !time.less_equal(&next_time) { - self.temporary.push(time.join(&next_time)); - } - } - + self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); sort_dedup(&mut self.temporary); // Introduce synthetic times, and re-organize if we add any. @@ -745,19 +605,13 @@ mod history_replay { debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - // Update `meet` to track the meet of each source of times. - meet = None;//T::maximum(); + meet = None; update_meet(&mut meet, batch_replay.meet()); update_meet(&mut meet, input_replay.meet()); update_meet(&mut meet, output_replay.meet()); for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); } - // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); } - // if let Some(time) = input_replay.meet() { meet = meet.meet(time); } - // if let Some(time) = output_replay.meet() { meet = meet.meet(time); } - // for time in self.synth_times.iter() { meet = meet.meet(time); } update_meet(&mut meet, meets_slice.first()); - // if let Some(time) = meets_slice.first() { meet = meet.meet(time); } // Update `times_current` by the frontier. if let Some(meet) = meet.as_ref() { @@ -771,20 +625,14 @@ mod history_replay { // Normalize the representation of `new_interesting`, deduplicating and ordering. sort_dedup(new_interesting); - - (compute_counter, output_counter) } } /// Updates an optional meet by an optional time. 
fn update_meet(meet: &mut Option, other: Option<&T>) { if let Some(time) = other { - if let Some(meet) = meet.as_mut() { - *meet = meet.meet(time); - } - if meet.is_none() { - *meet = Some(time.clone()); - } + if let Some(meet) = meet.as_mut() { meet.meet_assign(time); } + else { *meet = Some(time.clone()); } } } } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index d73eb71f4..082af6812 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -113,8 +113,6 @@ pub trait WithLayout { /// Automatically implemented trait for types with layouts. pub trait LayoutExt : WithLayout> { - /// Alias for an owned key of a layout. - type KeyOwn; /// Alias for an borrowed key of a layout. type Key<'a>: Copy + Ord; /// Alias for an owned val of a layout. @@ -131,7 +129,7 @@ pub trait LayoutExt : WithLayout: Copy + Ord; /// Container for update keys. - type KeyContainer: for<'a> BatchContainer = Self::Key<'a>, Owned = Self::KeyOwn>; + type KeyContainer: for<'a> BatchContainer = Self::Key<'a>>; /// Container for update vals. type ValContainer: for<'a> BatchContainer = Self::Val<'a>, Owned = Self::ValOwn>; /// Container for times. @@ -139,8 +137,6 @@ pub trait LayoutExt : WithLayout BatchContainer = Self::DiffGat<'a>, Owned = Self::Diff>; - /// Construct an owned key from a reference. - fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn; /// Construct an owned val from a reference. fn owned_val(val: Self::Val<'_>) -> Self::ValOwn; /// Construct an owned time from a reference. 
@@ -153,7 +149,6 @@ pub trait LayoutExt : WithLayout LayoutExt for L { - type KeyOwn = <::KeyContainer as BatchContainer>::Owned; type Key<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; type ValOwn = <::ValContainer as BatchContainer>::Owned; type Val<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; @@ -167,7 +162,6 @@ impl LayoutExt for L { type TimeContainer = ::TimeContainer; type DiffContainer = ::DiffContainer; - #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 0197e09c0..3f6ee03ab 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -54,7 +54,6 @@ pub trait TraceReader : LayoutExt { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, @@ -77,7 +76,6 @@ pub trait TraceReader : LayoutExt { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, @@ -110,6 +108,15 @@ pub trait TraceReader : LayoutExt { /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)>; + /// Acquires a cursor for a known set of keys. + /// + /// The default implementation ignores the keys and returns a full cursor via `cursor_through`. 
+ /// Implementations may override this to optimize for the specified key set (e.g., serving + /// from a cache or performing bulk key lookups). + fn cursor_through_keyed(&mut self, upper: AntichainRef, _keys: &Self::KeyContainer) -> Option<(Self::Cursor, Self::Storage)> { + self.cursor_through(upper) + } + /// Advances the frontier that constrains logical compaction. /// /// Logical compaction is the ability of the trace to change the times of the updates it contains. @@ -253,7 +260,6 @@ pub trait BatchReader : LayoutExt + Sized { WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, - KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, ValOwn = Self::ValOwn, Time = Self::Time, diff --git a/differential-dataflow/src/trace/wrappers/cached.rs b/differential-dataflow/src/trace/wrappers/cached.rs new file mode 100644 index 000000000..e9f3bbf22 --- /dev/null +++ b/differential-dataflow/src/trace/wrappers/cached.rs @@ -0,0 +1,251 @@ +//! A spine wrapper that caches keys on demand. +//! +//! `CachedSpine` wraps a `Spine` and maintains a hot cache of batches +//! for keys that have been explicitly requested. Cache misses are +//! loaded from the spine using the provided builder and push closure. +//! +//! Since it wraps `Spine` directly, `Storage = Vec` and +//! `Cursor = CursorList` are known concretely, avoiding +//! opaque associated type issues. + +use std::cell::RefCell; +use std::marker::PhantomData; +use std::rc::Rc; + +use timely::progress::{Antichain, frontier::AntichainRef}; + +use crate::trace::{Batch, Builder, Description, TraceReader}; +use crate::trace::cursor::{Cursor, CursorList}; +use crate::trace::implementations::WithLayout; +use crate::trace::implementations::containers::BatchContainer; +use crate::trace::implementations::spine_fueled::Spine; +use crate::trace::implementations::Layout; + +/// Key container type for a batch's layout. +type KeyCon = <::Layout as Layout>::KeyContainer; + +/// A spine with a hot cache for frequently-accessed keys. 
+/// +/// - `B`: batch type +/// - `Bu`: builder type for constructing cache batches +/// - `P`: closure that loads a key's data from a cursor into `Bu::Input` +pub struct CachedSpine +where + B: Batch + Clone + 'static, +{ + /// The backing spine, containing all data. + spine: Spine, + /// Cached batches for loaded keys. + hot: Vec, + /// The set of keys currently in the cache, sorted. + cached_keys: KeyCon, + /// The frontier through which cached data is complete. + /// Cached keys have data for `[0, hot_physical)`. + hot_physical: Antichain<::Time>, + /// Closure to push cursor data into a builder's input container. + /// Behind Rc so clones share it. + push: Rc>, + _marker: PhantomData, +} + +impl CachedSpine +where + B: Batch + Clone + 'static, +{ + /// Wraps a spine with caching support. + pub fn new(spine: Spine, push: P) -> Self { + CachedSpine { + spine, + hot: Vec::new(), + cached_keys: KeyCon::::with_capacity(0), + hot_physical: Antichain::from_elem(<::Time as timely::progress::Timestamp>::minimum()), + push: Rc::new(RefCell::new(push)), + _marker: PhantomData, + } + } + + /// Returns true if the given key is in the cache. + pub fn contains_key(&self, key: as BatchContainer>::ReadItem<'_>) -> bool { + (0..self.cached_keys.len()).any(|i| self.cached_keys.index(i) == as BatchContainer>::reborrow(key)) + } + + /// Installs a batch of cached key data. + pub fn insert_hot(&mut self, batch: B) { + if !batch.is_empty() { + self.hot.push(batch); + } + } + + /// Clears all cached data and the key set. + pub fn clear_hot(&mut self) { + self.hot.clear(); + self.cached_keys.clear(); + self.hot_physical = Antichain::from_elem(<::Time as timely::progress::Timestamp>::minimum()); + } + + /// The number of cached keys. + pub fn cached_key_count(&self) -> usize { + self.cached_keys.len() + } + + /// The number of hot batches. + pub fn hot_batch_count(&self) -> usize { + self.hot.len() + } + + /// Returns a reference to the inner spine. 
+ pub fn spine(&self) -> &Spine { &self.spine } + + /// Returns a mutable reference to the inner spine. + pub fn spine_mut(&mut self) -> &mut Spine { &mut self.spine } +} + +// CachedSpine is not Clone — it wraps a mutable Spine. +// Use TraceAgent> for shared access. + +impl WithLayout for CachedSpine +where + B: Batch + Clone + 'static, +{ + type Layout = B::Layout; +} + +impl TraceReader for CachedSpine +where + B: Batch + Clone + 'static, + Bu: Builder