Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions differential-dataflow/src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@
Self { vector: Default::default() }
}
fn to_outer(self) -> () {
()

Check warning on line 109 in differential-dataflow/src/dynamic/pointstamp.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

unneeded unit expression
}
fn summarize(_summary: <Self>::Summary) -> () {
()

Check warning on line 112 in differential-dataflow/src/dynamic/pointstamp.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

unneeded unit expression
}
}

Expand Down Expand Up @@ -243,6 +243,18 @@
}
Self::new(vector)
}
#[inline]
fn join_assign(&mut self, other: &Self) {
let my_len = self.vector.len();
let other_len = other.vector.len();
let min_len = my_len.min(other_len);
for i in 0..min_len {
self.vector[i].join_assign(&other.vector[i]);
}
if other_len > my_len {
self.vector.extend(other.vector[my_len..].iter().cloned());
}
}
#[inline(always)]
fn meet(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
Expand All @@ -254,6 +266,15 @@
// Remaining coordinates are `T::minimum()` in one input, and so in the output.
Self::new(vector)
}
#[inline]
fn meet_assign(&mut self, other: &Self) {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
self.vector.truncate(min_len);
for (this, that) in self.vector.iter_mut().zip(other.vector.iter()) {
this.meet_assign(that);
}
while self.vector.last() == Some(&T::minimum()) { self.vector.pop(); }
}
}

mod columnation {
Expand Down
23 changes: 17 additions & 6 deletions differential-dataflow/src/lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@
/// ```
#[inline]
fn advance_by(&mut self, frontier: AntichainRef<Self>) where Self: Sized {
let mut iter = frontier.iter();
if let Some(first) = iter.next() {
let mut result = self.join(first);
for f in iter {
result.meet_assign(&self.join(f));
match &*frontier {
[] => {}
[first] => self.join_assign(first),
[first, rest @ ..] => {
let mut result = self.join(first);
for f in rest { result.meet_assign(&self.join(f)); }
*self = result;
}
*self = result;
}
}
}
Expand All @@ -155,12 +156,22 @@
}
}
#[inline]
fn join_assign(&mut self, other: &Self) {
self.outer.join_assign(&other.outer);
self.inner.join_assign(&other.inner);
}
#[inline]
fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
Product {
outer: self.outer.meet(&other.outer),
inner: self.inner.meet(&other.inner),
}
}
#[inline]
fn meet_assign(&mut self, other: &Self) {
self.outer.meet_assign(&other.outer);
self.inner.meet_assign(&other.inner);
}
}

/// A type that has a unique maximum element.
Expand All @@ -181,7 +192,7 @@
}

implement_maximum!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, Duration,);
impl Maximum for () { fn maximum() -> () { () }}

Check warning on line 195 in differential-dataflow/src/lattice.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

unneeded unit expression

use timely::progress::Timestamp;

Expand Down
3 changes: 1 addition & 2 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
while let Some(key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {

Check warning on line 198 in differential-dataflow/src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
});
}
Expand Down Expand Up @@ -224,8 +224,7 @@
I: IntoIterator<Item: Data>,
L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
{
let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: &Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, ar
where
Tr1: TraceReader+'static,
Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>, Time = Tr1::Time>+'static,
L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,&Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession<Tr1::Time, CB, Capability<Tr1::Time>>)+'static,
L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession<Tr1::Time, CB, Capability<Tr1::Time>>)+'static,
CB: ContainerBuilder,
{
// Rename traces for symmetry from here on out.
Expand Down Expand Up @@ -347,7 +347,7 @@ where
#[inline(never)]
fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputBuilderSession<T, EffortBuilder<CB>>, mut logic: L, fuel: &mut usize)
where
L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, Capability<T>>),
L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, Capability<T>>),
{

let meet = self.capability.time();
Expand Down Expand Up @@ -379,7 +379,7 @@ where

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
logic(batch_key, v1, v2, &t, r1, r2, &mut session);
logic(batch_key, v1, v2, t, r1, r2, &mut session);
});

// TODO: Effort isn't perfectly tracked as we might still have some data in the
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::differ
}
fn advance_buffer_by(&mut self, meet: &T) {
for element in self.replay.buffer.iter_mut() {
(element.0).1 = (element.0).1.join(meet);
(element.0).1.join_assign(meet);
}
crate::consolidation::consolidate(&mut self.replay.buffer);
}
Expand Down
Loading