diff --git a/differential-dataflow/src/dynamic/pointstamp.rs b/differential-dataflow/src/dynamic/pointstamp.rs index cf2bdf4fe..b256ac906 100644 --- a/differential-dataflow/src/dynamic/pointstamp.rs +++ b/differential-dataflow/src/dynamic/pointstamp.rs @@ -243,6 +243,18 @@ impl Lattice for PointStamp { } Self::new(vector) } + #[inline] + fn join_assign(&mut self, other: &Self) { + let my_len = self.vector.len(); + let other_len = other.vector.len(); + let min_len = my_len.min(other_len); + for i in 0..min_len { + self.vector[i].join_assign(&other.vector[i]); + } + if other_len > my_len { + self.vector.extend(other.vector[my_len..].iter().cloned()); + } + } #[inline(always)] fn meet(&self, other: &Self) -> Self { let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); @@ -254,6 +266,15 @@ impl Lattice for PointStamp { // Remaining coordinates are `T::minimum()` in one input, and so in the output. Self::new(vector) } + #[inline] + fn meet_assign(&mut self, other: &Self) { + let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); + self.vector.truncate(min_len); + for (this, that) in self.vector.iter_mut().zip(other.vector.iter()) { + this.meet_assign(that); + } + while self.vector.last() == Some(&T::minimum()) { self.vector.pop(); } + } } mod columnation { diff --git a/differential-dataflow/src/lattice.rs b/differential-dataflow/src/lattice.rs index 938669bcd..4ecd763cb 100644 --- a/differential-dataflow/src/lattice.rs +++ b/differential-dataflow/src/lattice.rs @@ -133,13 +133,14 @@ pub trait Lattice : PartialOrder { /// ``` #[inline] fn advance_by(&mut self, frontier: AntichainRef) where Self: Sized { - let mut iter = frontier.iter(); - if let Some(first) = iter.next() { - let mut result = self.join(first); - for f in iter { - result.meet_assign(&self.join(f)); + match &*frontier { + [] => {} + [first] => self.join_assign(first), + [first, rest @ ..] => { + let mut result = self.join(first); + for f in rest { result.meet_assign(&self.join(f)); } + *self = result; } - *self = result; } } } @@ -155,12 +156,22 @@ impl Lattice for Product { } } #[inline] + fn join_assign(&mut self, other: &Self) { + self.outer.join_assign(&other.outer); + self.inner.join_assign(&other.inner); + } + #[inline] fn meet(&self, other: &Product) -> Product { Product { outer: self.outer.meet(&other.outer), inner: self.inner.meet(&other.inner), } } + #[inline] + fn meet_assign(&mut self, other: &Self) { + self.outer.meet_assign(&other.outer); + self.inner.meet_assign(&other.inner); + } } /// A type that has a unique maximum element. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 3b5a63fb6..42f3d7b43 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -224,8 +224,7 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { I: IntoIterator, L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static { - let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: &Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| { - let t = t.clone(); + let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| { let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) }; diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index edf7120f6..1d2de9219 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -70,7 +70,7 @@ pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, ar where Tr1: TraceReader+'static, Tr2: for<'a> TraceReader=Tr1::Key<'a>, Time = Tr1::Time>+'static, - L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,&Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession>)+'static, + L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession>)+'static, CB: ContainerBuilder, { // Rename traces for symmetry from here on out. @@ -347,7 +347,7 @@ where #[inline(never)] fn work(&mut self, output: &mut OutputBuilderSession>, mut logic: L, fuel: &mut usize) where - L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession>), + L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, T, &C1::Diff, &C2::Diff, &mut JoinSession>), { let meet = self.capability.time(); @@ -379,7 +379,7 @@ where // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { - logic(batch_key, v1, v2, &t, r1, r2, &mut session); + logic(batch_key, v1, v2, t, r1, r2, &mut session); }); // TODO: Effort isn't perfectly tracked as we might still have some data in the diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index eb7880aab..83f8010a7 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -182,7 +182,7 @@ impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::differ } fn advance_buffer_by(&mut self, meet: &T) { for element in self.replay.buffer.iter_mut() { - (element.0).1 = (element.0).1.join(meet); + (element.0).1.join_assign(meet); } crate::consolidation::consolidate(&mut self.replay.buffer); }