diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index f6c4e52a4..bb7f74f6f 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -296,7 +296,6 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { } } - impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// Brings an arranged collection out of a nested region. /// diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 4260c2b95..a2e0a32b2 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -125,6 +125,24 @@ impl V self.replay() } + /// Wraps the already-built, sorted `history` for a fresh walk, WITHOUT rebuilding or re-sorting + /// it. Valid whenever `history` is intact — e.g. after a `replay_key` whose returned replay was + /// only read through [`HistoryReplay::times`] (which does not step it). Used by reduce's + /// `reference` tactic, whose determination reads times and whose application then walks values. + fn walk<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> { + self.buffer.clear(); + HistoryReplay { replay: self } + } + + /// A time-only, non-destructive walk over the already-built `history` (see [`TimeReplay`]). It + /// reads times and their precomputed meets and accumulates only *times* into the caller-supplied + /// `buffer`, leaving `history` intact for a later [`walk`](Self::walk) over values. The buffer is + /// owned by the caller so the standard value walk pays nothing for this reference-only capability. + fn replay_times<'history>(&'history self, buffer: &'history mut Vec) -> TimeReplay<'history, T> { + buffer.clear(); + TimeReplay { history: &self.history[..], buffer } + } + /// Organizes history based on current contents of edits. fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> { @@ -181,3 +199,40 @@ impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::differ } fn is_done(&self) -> bool { self.replay.history.is_empty() } } + +/// A time-only, non-destructive walk over an already-built [`ValueHistory`] history. +/// +/// It mirrors the time-facing half of [`HistoryReplay`] — `time`/`meet`/`step`/`step_while_time_is` +/// /`advance_buffer_by`/`buffer` — but it carries only *times*, and it walks a shrinking *view* of +/// `history` rather than popping it, so the underlying history stays intact for a later value walk. +/// `advance_buffer_by` compacts the accumulated times by joining with `meet` and deduplicating (the +/// time-only analogue of consolidation), which is what keeps a join-closure over these times an +/// antichain — and hence non-quadratic. +struct TimeReplay<'history, T> { + history: &'history [(T, T, usize, usize)], // shrinking view; `last()` is the least time + buffer: &'history mut Vec, // accumulated (and compacted) times seen so far +} + +impl<'history, T: Ord + Clone + Lattice> TimeReplay<'history, T> { + fn time(&self) -> Option<&T> { self.history.last().map(|entry| &entry.0) } + fn meet(&self) -> Option<&T> { self.history.last().map(|entry| &entry.1) } + fn step(&mut self) { + let last = self.history.len() - 1; + self.buffer.push(self.history[last].0.clone()); + self.history = &self.history[..last]; + } + fn step_while_time_is(&mut self, time: &T) -> bool { + let mut found = false; + while self.time() == Some(time) { + found = true; + self.step(); + } + found + } + fn advance_buffer_by(&mut self, meet: &T) { + for time in self.buffer.iter_mut() { time.join_assign(meet); } + self.buffer.sort(); + self.buffer.dedup(); + } + fn buffer(&self) -> &[T] { &self.buffer[..] } +} diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 08c99dba5..55210ace4 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -73,6 +73,11 @@ where reduce_with_tactic(trace, name, cursors::CursorTactic::::new(logic, push)) } +// The model-derived reference tactic and its entry point live in `mod reference`; re-exported here +// (doc-hidden) as the sole public handle for its differential and oracle tests. +#[doc(hidden)] +pub use reference::reduce_trace_reference; + /// Drives a key-wise reduction using a supplied [`ReduceTactic`]. /// /// This is the general reduce operator: it does the dataflow plumbing (frontiers, capabilities, output @@ -620,33 +625,73 @@ mod cursors { // before we add the time to `synth_times`. if !upper_limit.less_equal(&next_time) { + // DETERMINATION (times only). Determine synthetic interesting times. + // + // Synthetic interesting times are produced differently for interesting and uninteresting + // times. An uninteresting time must join with an interesting time to become interesting, + // which means joins with `self.batch_history` and `self.times_current`. I think we can + // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be + // joined against everything. + + // Any time, even uninteresting times, must be joined with the current accumulation of + // batch times as well as the current accumulation of `times_current`. + self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + + // An interesting time additionally joins with `input` and `output` history and this round's + // produced output: it carries the seed, so those joins stay interesting (an uninteresting + // time does not, as `input`/`output` times are not themselves seeds). We advance the buffers + // by `meet` first, exactly as evaluation reads them below; by join preservation the advanced + // and unadvanced times spawn the same synthetics, so this matches the pre-split behavior. + if interesting { + if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) }; + if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) }; + self.temporary.extend(input_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + self.temporary.extend(output_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + self.temporary.extend(self.output_produced.iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); + } + sort_dedup(&mut self.temporary); + + // Introduce synthetic times, and re-organize if we add any. + let synth_len = self.synth_times.len(); + for time in self.temporary.drain(..) { + // We can either service `join` now, or must delay for the future. + if upper_limit.less_equal(&time) { + debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time))); + new_interesting.push(time); + } + else { + self.synth_times.push(time); + } + } + if self.synth_times.len() > synth_len { + self.synth_times.sort_by(|x,y| y.cmp(x)); + self.synth_times.dedup(); + } + + // EVALUATION (values only). // We should re-evaluate the computation if this is an interesting time. // If the time is uninteresting (and our logic is sound) it is not possible for there to be // output produced. This sounds like a good test to have for debug builds! if interesting { // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use). + // The buffers were advanced by `meet` in the determination step above. debug_assert!(self.input_buffer.is_empty()); - if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) }; for ((value, time), diff) in input_replay.buffer().iter() { if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } - else { self.temporary.push(next_time.join(time)); } } for ((value, time), diff) in batch_replay.buffer().iter() { if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); } - else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.input_buffer); // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use). - if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) }; for ((value, time), diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); } - else { self.temporary.push(next_time.join(time)); } } for ((value, time), diff) in self.output_produced.iter() { if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); } - else { self.temporary.push(next_time.join(time)); } } crate::consolidation::consolidate(&mut self.output_buffer); @@ -686,37 +731,6 @@ mod cursors { } } } - - // Determine synthetic interesting times. - // - // Synthetic interesting times are produced differently for interesting and uninteresting - // times. An uninteresting time must join with an interesting time to become interesting, - // which means joins with `self.batch_history` and `self.times_current`. I think we can - // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be - // joined against everything. - - // Any time, even uninteresting times, must be joined with the current accumulation of - // batch times as well as the current accumulation of `times_current`. - self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); - self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time))); - sort_dedup(&mut self.temporary); - - // Introduce synthetic times, and re-organize if we add any. - let synth_len = self.synth_times.len(); - for time in self.temporary.drain(..) { - // We can either service `join` now, or must delay for the future. - if upper_limit.less_equal(&time) { - debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time))); - new_interesting.push(time); - } - else { - self.synth_times.push(time); - } - } - if self.synth_times.len() > synth_len { - self.synth_times.sort_by(|x,y| y.cmp(x)); - self.synth_times.dedup(); - } } else if interesting { // We cannot process `next_time` now, and must delay it. @@ -760,3 +774,511 @@ mod cursors { } } } + +/// A second [`ReduceTactic`], written directly from the incremental model in +/// `formal/Differential/Model.lean`. +/// +/// Per key it runs two phases over one cursor walk. Phase 1 (determination) computes the +/// interesting times as the truncated join-closure over {input, output, seeds} — the model's +/// `Reached` — advancing by meets so the synthetic set stays bounded. Phase 2 (application) walks +/// exactly those times in order, maintaining tight input/output accumulations by meets, and emits +/// the corrections — the model's `emit_correct`. Determination never consults the output produced +/// this round (it finishes first), so this tactic embodies the proven algorithm exactly and is the +/// clean subject for differential testing against [`cursors::CursorTactic`]. +pub(crate) mod reference { + + use super::*; + use crate::lattice::Lattice; + use crate::operators::ValueHistory; + + /// Drives a key-wise reduction with the model-derived [`ReferenceTactic`], the analogue of the + /// default [`super::reduce_trace`]. Same result contract; intended for differential testing of the + /// two tactics against each other. Re-exported (doc-hidden) from the parent module as the sole + /// public handle: the reference tactic is a testing and demonstration oracle, not a stable entry + /// point to build on. + pub fn reduce_trace_reference<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent> + where + Tr1: TraceReader + 'static, + Tr2: Trace + 'static, + BatchCursor: Cursor