From 5de4f02b6752b41037d63f2c142093dfea40a804 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 2 Jul 2026 14:47:19 -0400 Subject: [PATCH 01/10] reduce: add TimeReplay, a time-only non-destructive ValueHistory walk TimeReplay mirrors the time-facing half of HistoryReplay (time / meet / step / step_while_time_is / advance_buffer_by / buffer) but carries only *times* and walks a shrinking view of the built history rather than popping it, so the underlying history stays intact for a later value walk. advance_buffer_by compacts by join + dedup -- the time-only analogue of consolidation -- which keeps a join-closure over the times an antichain, hence non-quadratic. Adds ValueHistory::{walk, replay_times}. Co-Authored-By: Claude Fable 5 --- differential-dataflow/src/operators/mod.rs | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 4260c2b95..746781daa 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -91,6 +91,7 @@ struct ValueHistory { edits: EditList, history: Vec<(T, T, usize, usize)>, // (time, meet, value_index, edit_offset) buffer: Vec<((V, T), D)>, // where we accumulate / collapse updates. + time_buffer: Vec<T>, // like `buffer`, but times only, for `TimeReplay`. } impl ValueHistory { @@ -99,12 +100,14 @@ impl V edits: EditList::new(), history: Vec::new(), buffer: Vec::new(), + time_buffer: Vec::new(), } } fn clear(&mut self) { self.edits.clear(); self.history.clear(); self.buffer.clear(); + self.time_buffer.clear(); } /// Loads and replays a specified key. @@ -125,6 +128,23 @@ impl V self.replay() } + /// Wraps the already-built, sorted `history` for a fresh walk, WITHOUT rebuilding or re-sorting + /// it. Valid whenever `history` is intact — e.g. after a `replay_key` whose returned replay was + /// only read through [`HistoryReplay::times`] (which does not step it).
Used by reduce's + /// `reference` tactic, whose determination reads times and whose application then walks values. + fn walk<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> { + self.buffer.clear(); + HistoryReplay { replay: self } + } + + /// A time-only, non-destructive walk over the already-built `history` (see [`TimeReplay`]). It + /// reads times and their precomputed meets and accumulates only *times*, leaving `history` + /// intact for a later [`walk`](Self::walk) over values. + fn replay_times<'history>(&'history mut self) -> TimeReplay<'history, T> { + self.time_buffer.clear(); + TimeReplay { history: &self.history[..], buffer: &mut self.time_buffer } + } + /// Organizes history based on current contents of edits. fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> { @@ -181,3 +201,40 @@ impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::differ } fn is_done(&self) -> bool { self.replay.history.is_empty() } } + +/// A time-only, non-destructive walk over an already-built [`ValueHistory`] history. +/// +/// It mirrors the time-facing half of [`HistoryReplay`] — `time`/`meet`/`step`/`step_while_time_is` +/// /`advance_buffer_by`/`buffer` — but it carries only *times*, and it walks a shrinking *view* of +/// `history` rather than popping it, so the underlying history stays intact for a later value walk. +/// `advance_buffer_by` compacts the accumulated times by joining with `meet` and deduplicating (the +/// time-only analogue of consolidation), which is what keeps a join-closure over these times an +/// antichain — and hence non-quadratic. 
+struct TimeReplay<'history, T> { + history: &'history [(T, T, usize, usize)], // shrinking view; `last()` is the least time + buffer: &'history mut Vec<T>, // accumulated (and compacted) times seen so far +} + +impl<'history, T: Ord + Clone + Lattice> TimeReplay<'history, T> { + fn time(&self) -> Option<&T> { self.history.last().map(|entry| &entry.0) } + fn meet(&self) -> Option<&T> { self.history.last().map(|entry| &entry.1) } + fn step(&mut self) { + let last = self.history.len() - 1; + self.buffer.push(self.history[last].0.clone()); + self.history = &self.history[..last]; + } + fn step_while_time_is(&mut self, time: &T) -> bool { + let mut found = false; + while self.time() == Some(time) { + found = true; + self.step(); + } + found + } + fn advance_buffer_by(&mut self, meet: &T) { + for time in self.buffer.iter_mut() { time.join_assign(meet); } + self.buffer.sort(); + self.buffer.dedup(); + } + fn buffer(&self) -> &[T] { &self.buffer[..] } +} From c15b63c6a0440719a00c776ba5488b4267c31c53 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 2 Jul 2026 14:47:19 -0400 Subject: [PATCH 02/10] reduce: add model-derived 'reference' tactic Adds mod reference, a second ReduceTactic whose determination computes reduce's interesting times directly as formal/Differential/Model.lean's Reached -- the truncated join-closure over the stored input/output/seed times -- the non-quadratic way: an ordered walk that keeps its live join-partners an antichain via meet-compaction, over TimeReplay so it is time-only and leaves the histories intact for the value walk. It runs at parity with the default cursor tactic, and exists as a differential oracle for that tactic and an executable form of the model. Also splits the cursor tactic's per-key compute into the same two phases (determination, then evaluation); semantically identical, validated by the suite.
Entry points: Collection::{reduce_reference, reduce_named_reference}, Arranged::{reduce_abelian_reference, reduce_core_reference}, reduce_trace_reference. Optional interesting-time counters in operators::reduce::metrics behind the reduce-metrics feature. Co-Authored-By: Claude Fable 5 --- differential-dataflow/Cargo.toml | 2 + differential-dataflow/src/collection.rs | 21 + .../src/operators/arrange/arrangement.rs | 33 + differential-dataflow/src/operators/reduce.rs | 600 ++++++++++++++++-- 4 files changed, 619 insertions(+), 37 deletions(-) diff --git a/differential-dataflow/Cargo.toml b/differential-dataflow/Cargo.toml index 3dbace3c7..ae9a2b0da 100644 --- a/differential-dataflow/Cargo.toml +++ b/differential-dataflow/Cargo.toml @@ -39,6 +39,8 @@ timely = {workspace = true} [features] default = ["timely/getopts"] +# Enables the reduce interesting-time counters in `operators::reduce::metrics`. +reduce-metrics = [] [[bench]] name = "chunk_bench" diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index ebf634ff2..6eed46760 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -757,6 +757,27 @@ pub mod vec { .as_collection(|k,v| (k.clone(), v.clone())) } + /// As [`reduce`](Self::reduce), but driven by the model-derived reference tactic. Same result; + /// intended for differential testing against the default reduce. + pub fn reduce_reference(self, logic: L) -> Collection<'scope, T, (K, V2), R2> + where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { + self.reduce_named_reference("Reduce", logic) + } + + /// As [`reduce_named`](Self::reduce_named), but driven by the model-derived reference tactic. 
+ pub fn reduce_named_reference(self, name: &str, logic: L) -> Collection<'scope, T, (K, V2), R2> + where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { + use crate::trace::implementations::{ValBuilder, ValSpine}; + + self.arrange_by_key_named(&format!("Arrange: {}", name)) + .reduce_abelian_reference::<_,ValBuilder<_,_,_,_>,ValSpine,_>( + name, + logic, + |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, + ) + .as_collection(|k,v| (k.clone(), v.clone())) + } + /// Applies `reduce` to arranged data, and returns an arrangement of output data. /// /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index f6c4e52a4..9445f95bc 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -294,6 +294,39 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { use crate::operators::reduce::reduce_trace; reduce_trace::<_,Bu,_,_,_>(self, name, logic, push) } + + /// As [`reduce_abelian`](Self::reduce_abelian), but using the model-derived reference tactic. + pub fn reduce_abelian_reference(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent> + where + Tr2: Trace+'static, + BatchCursor: Cursor