Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
while let Some(key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {

Check warning on line 208 in differential-dataflow/src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
session.give((datum.clone(), <BatchCursor<Tr> as Cursor>::owned_time(time), <BatchCursor<Tr> as Cursor>::owned_diff(diff)));
});
}
Expand Down Expand Up @@ -296,7 +296,6 @@
}
}


impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> {
/// Brings an arranged collection out of a nested region.
///
Expand Down
55 changes: 55 additions & 0 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> V
self.replay()
}

/// Wraps the already-built, sorted `history` for a fresh walk, WITHOUT rebuilding or re-sorting
/// it. Valid whenever `history` is intact — e.g. after a `replay_key` whose returned replay was
/// only read through [`HistoryReplay::times`] (which does not step it). Used by reduce's
/// `reference` tactic, whose determination reads times and whose application then walks values.
fn walk<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> {
self.buffer.clear();
HistoryReplay { replay: self }
}

/// A time-only, non-destructive walk over the already-built `history` (see [`TimeReplay`]). It
/// reads times and their precomputed meets and accumulates only *times* into the caller-supplied
/// `buffer`, leaving `history` intact for a later [`walk`](Self::walk) over values. The buffer is
/// owned by the caller so the standard value walk pays nothing for this reference-only capability.
fn replay_times<'history>(&'history self, buffer: &'history mut Vec<T>) -> TimeReplay<'history, T> {
buffer.clear();
TimeReplay { history: &self.history[..], buffer }
}

/// Organizes history based on current contents of edits.
fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> {

Expand Down Expand Up @@ -181,3 +199,40 @@ impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::differ
}
fn is_done(&self) -> bool { self.replay.history.is_empty() }
}

/// A time-only, non-destructive walk over an already-built [`ValueHistory`] history.
///
/// It mirrors the time-facing half of [`HistoryReplay`] — `time`/`meet`/`step`/`step_while_time_is`
/// /`advance_buffer_by`/`buffer` — but it carries only *times*, and it walks a shrinking *view* of
/// `history` rather than popping it, so the underlying history stays intact for a later value walk.
/// `advance_buffer_by` compacts the accumulated times by joining with `meet` and deduplicating (the
/// time-only analogue of consolidation), which is what keeps a join-closure over these times an
/// antichain — and hence non-quadratic.
struct TimeReplay<'history, T> {
history: &'history [(T, T, usize, usize)], // shrinking view; `last()` is the least time
buffer: &'history mut Vec<T>, // accumulated (and compacted) times seen so far
}

impl<'history, T: Ord + Clone + Lattice> TimeReplay<'history, T> {
fn time(&self) -> Option<&T> { self.history.last().map(|entry| &entry.0) }
fn meet(&self) -> Option<&T> { self.history.last().map(|entry| &entry.1) }
fn step(&mut self) {
let last = self.history.len() - 1;
self.buffer.push(self.history[last].0.clone());
self.history = &self.history[..last];
}
fn step_while_time_is(&mut self, time: &T) -> bool {
let mut found = false;
while self.time() == Some(time) {
found = true;
self.step();
}
found
}
fn advance_buffer_by(&mut self, meet: &T) {
for time in self.buffer.iter_mut() { time.join_assign(meet); }
self.buffer.sort();
self.buffer.dedup();
}
fn buffer(&self) -> &[T] { &self.buffer[..] }
}
Loading
Loading