From 019ecaed07678809c5edce09b4515f357c6d680b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 1 Jul 2026 22:26:01 -0400 Subject: [PATCH 1/3] Chunk: remove the Navigable requirement; add the NavigableChunk capability Chunk is now storage plus transducers plus Time/len/TARGET, with no key/val opinions. Navigation moves to NavigableChunk: Chunk + Navigable, which carries bounds(); VecChunk and ColChunk implement it. ChunkBatch drops its cached boundary columns, and the straddle cursor builds that resident index at construction instead, so Navigable for ChunkBatch gates on C: NavigableChunk. ChunkSpine now forms, merges, and settles for a cursor-less C; cursor-driven consumption is what demands the capability. chunk_bench's scan and driver bounds tighten to NavigableChunk accordingly. Co-Authored-By: Claude Fable 5 --- differential-dataflow/benches/chunk_bench.rs | 6 +- .../src/columnar/trace/chunk.rs | 13 +- differential-dataflow/src/trace/chunk/mod.rs | 142 +++++++++++------- differential-dataflow/src/trace/chunk/vec.rs | 13 +- 4 files changed, 110 insertions(+), 64 deletions(-) diff --git a/differential-dataflow/benches/chunk_bench.rs b/differential-dataflow/benches/chunk_bench.rs index aea4b85f0..33e34f9cd 100644 --- a/differential-dataflow/benches/chunk_bench.rs +++ b/differential-dataflow/benches/chunk_bench.rs @@ -22,7 +22,7 @@ use timely::container::{PushInto, SizableContainer}; use differential_dataflow::consolidation::Consolidate; use differential_dataflow::columnar::layout::ColumnarUpdate; -use differential_dataflow::trace::chunk::{merge_chains, Chunk}; +use differential_dataflow::trace::chunk::{merge_chains, Chunk, NavigableChunk}; use differential_dataflow::trace::chunk::vec::VecChunk; use differential_dataflow::columnar::trace::ColChunk; use differential_dataflow::trace::cursor::Cursor; @@ -60,7 +60,7 @@ where /// Walk every `(key, val, time, diff)` via the chunk cursors, counting updates /// (a uniform proxy for scan cost across layouts). 
-fn scan(chunks: &[C]) -> usize { +fn scan(chunks: &[C]) -> usize { let mut count = 0; for c in chunks { let mut cur = c.cursor(); @@ -81,7 +81,7 @@ fn ms(t: Instant) -> f64 { t.elapsed().as_secs_f64() * 1000.0 } /// two interleaving halves for the merge. Returns timings + footprint. fn bench(updates: &[I], half_a: &[I], half_b: &[I]) -> (f64, f64, f64, f64, usize, usize) where - C: Chunk + Default + SizableContainer + Consolidate + Container + PushInto, + C: NavigableChunk + Default + SizableContainer + Consolidate + Container + PushInto, I: Clone, { let (half_a, half_b) = (half_a.to_vec(), half_b.to_vec()); diff --git a/differential-dataflow/src/columnar/trace/chunk.rs b/differential-dataflow/src/columnar/trace/chunk.rs index f4cb6718f..a8357ae28 100644 --- a/differential-dataflow/src/columnar/trace/chunk.rs +++ b/differential-dataflow/src/columnar/trace/chunk.rs @@ -301,12 +301,8 @@ where U::Time: 'static { } } -impl Chunk for ColChunk +impl crate::trace::chunk::NavigableChunk for ColChunk where U::Time: 'static { - type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; - - const TARGET: usize = TARGET; - fn bounds(&self) -> ( (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), @@ -327,6 +323,13 @@ where U::Time: 'static { } } } +} + +impl Chunk for ColChunk +where U::Time: 'static { + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + + const TARGET: usize = TARGET; fn len(&self) -> usize { match self { diff --git a/differential-dataflow/src/trace/chunk/mod.rs b/differential-dataflow/src/trace/chunk/mod.rs index 940a28764..1953f95ed 100644 --- a/differential-dataflow/src/trace/chunk/mod.rs +++ b/differential-dataflow/src/trace/chunk/mod.rs @@ -22,6 +22,8 @@ //! [`arrange_core`](crate::operators::arrange::arrangement::arrange_core), along with a //! chunker that forms `C` from the input stream — typically //! [`ContainerChunker`](crate::trace::implementations::chunker::ContainerChunker). +//! 
Trace *maintenance* needs only [`Chunk`]; cursor-driven *consumption* of the +//! arrangement additionally asks `C` for the [`NavigableChunk`] capability. //! Everything else here ([`ChunkBatch`], [`ChunkMerger`], [`ChunkBatchMerger`], //! [`ChunkBatchCursor`], [`ChunkBatchBuilder`]) is machinery those aliases expand to and is //! not named directly. The [`vec`](mod@vec) module is a worked `Chunk` @@ -45,10 +47,10 @@ //! settles its committed output as it goes (see [`Chunk::settle`]), rather than //! building a whole sequence and settling at the end. And every walk over a whole //! chunk sequence reads only resident metadata — [`len`](Chunk::len) and -//! [`bounds`](Chunk::bounds) — never a chunk body: a batch indexes its chunks' -//! bounds once at construction, so cursors binary-search that resident index and -//! open only the chunk(s) a query touches. Implementors must therefore keep `len` -//! and `bounds` cheap even when a chunk's body is paged out. +//! [`bounds`](NavigableChunk::bounds) — never a chunk body: the straddle cursor +//! indexes its chunks' bounds once at construction, binary-searches that resident +//! index, and opens only the chunk(s) a query touches. Implementors must therefore +//! keep `len` and `bounds` cheap even when a chunk's body is paged out. use std::collections::VecDeque; @@ -70,22 +72,20 @@ pub mod vec; /// The "data" operations transform lists of chunks, are expected to do roughly /// "one chunk's worth" of work at a time; they can afford to compress and page. /// The "metadata" operations provide chunk information, and should be lightweight. -pub trait Chunk: Navigable> + Sized + Clone { +/// +/// The trait has no opinion about keys, vals, or diffs — only time, which trace +/// maintenance needs. Reading a chunk's contents is a separate, optional +/// capability: see [`NavigableChunk`]. +pub trait Chunk: Sized + Clone { /// The timestamp type of the chunk's updates. 
/// - /// Key/val/diff opinions live on the chunk's [`Navigable::Cursor`]; the chunk itself only needs - /// time, to bound its interval and participate in advancement and compaction. + /// Key/val/diff opinions live on the optional [`NavigableChunk`] capability; the chunk itself + /// only needs time, to bound its interval and participate in advancement and compaction. type Time: Lattice + timely::progress::Timestamp; /// The intended maximum chunk size. const TARGET: usize; - /// The first and last `(key, val, time)` triples in the chunk. - fn bounds(&self) -> ( - (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), - (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), - ); - /// The number of updates in the chunk. fn len(&self) -> usize; @@ -148,6 +148,24 @@ pub trait Chunk: Navigable> + Sized + Clone { } +/// The navigation capability: a [`Chunk`] whose contents can be read by cursor. +/// +/// This is optional. Batch formation and trace maintenance need only [`Chunk`]; +/// implementing this trait additionally lets [`ChunkBatch`] offer the straddle +/// cursor ([`ChunkBatchCursor`]), which is how cursor-driven operator paths read +/// an arrangement. Chunks consumed only by whole-chunk logic (tactics) can skip it. +/// +/// `bounds` must stay cheap even when a chunk's body is paged out: the straddle +/// cursor indexes every chunk's bounds at construction, and opens a chunk's body +/// only when a query touches it. +pub trait NavigableChunk: Chunk + Navigable::Time>> { + /// The first and last `(key, val, time)` triples in the chunk. + fn bounds(&self) -> ( + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), + ); +} + /// Maximal-packing driver an implementor's [`Chunk::settle`] may delegate to. 
/// /// Holds a `carry` chunk under construction, grown by `combine` until it reaches @@ -219,44 +237,48 @@ type KeyCon = <::Cursor as Cursor>::KeyContainer; type ValCon = <::Cursor as Cursor>::ValContainer; /// A batch is a [`Chunk`] sequence plus a [`Description`]. -/// -/// Metadata about the batches is cached to make subselection efficient. pub struct ChunkBatch { /// Ordered, consolidated chunks; their concatenation is the batch. pub chunks: Vec, /// The lower, upper, and since frontiers of the batch. pub description: Description, - /// Per-chunk first and last key, and first and last val, parallel to `chunks`. - first_keys: KeyCon, - last_keys: KeyCon, - first_vals: ValCon, - last_vals: ValCon, } impl ChunkBatch { - /// Assemble a batch from ordered chunks, building the per-chunk index. + /// Assemble a batch from ordered chunks. pub fn new(chunks: Vec, description: Description) -> Self { - let n = chunks.len(); + for chunk in &chunks { + assert!(chunk.len() > 0, "ChunkBatch chunks must be non-empty"); + } + ChunkBatch { chunks, description } + } +} + +impl crate::trace::Navigable for ChunkBatch { + type Cursor = ChunkBatchCursor; + fn cursor(&self) -> Self::Cursor { + // Index the chunks' resident bounds, so navigation binary-searches the + // index and opens only the chunk(s) a query touches. The index lives on + // the cursor rather than the batch: its containers are cursor-typed, and + // batches of cursor-less chunks must still form, merge, and settle. 
+ let n = self.chunks.len(); let mut first_keys = >::with_capacity(n); let mut last_keys = >::with_capacity(n); let mut first_vals = >::with_capacity(n); let mut last_vals = >::with_capacity(n); - for chunk in &chunks { - assert!(chunk.len() > 0, "ChunkBatch chunks must be non-empty"); + for chunk in &self.chunks { let ((fk, fv, _), (lk, lv, _)) = chunk.bounds(); first_keys.push_ref(fk); last_keys.push_ref(lk); first_vals.push_ref(fv); last_vals.push_ref(lv); } - ChunkBatch { chunks, description, first_keys, last_keys, first_vals, last_vals } - } -} - -impl crate::trace::Navigable for ChunkBatch { - type Cursor = ChunkBatchCursor; - fn cursor(&self) -> Self::Cursor { - ChunkBatchCursor { key_chunk: 0, chunk: 0, inner: self.chunks.first().map(C::cursor) } + ChunkBatchCursor { + key_chunk: 0, + chunk: 0, + inner: self.chunks.first().map(C::cursor), + first_keys, last_keys, first_vals, last_vals, + } } } @@ -301,28 +323,52 @@ pub type ChunkBuilder = crate::trace::rc_blanket_impls::RcBuilder { +/// and a cursor into it; it seeks by binary-searching the per-chunk bounds index +/// built at construction, and at boundaries it *continues* into the next chunk +/// rather than merging — using the index to detect when a key or `(key, val)` +/// spills forward, without touching chunk contents. +pub struct ChunkBatchCursor { /// First chunk of the current key's run; where `rewind_vals` returns to. key_chunk: usize, /// Chunk currently being read; `>= key_chunk`, within the current key's span. chunk: usize, /// Cursor into `chunk`; `None` once `chunk` is past the last chunk. inner: Option, + /// Per-chunk first and last key, and first and last val, parallel to the + /// batch's `chunks`: the resident index the seeks binary-search. + first_keys: KeyCon, + last_keys: KeyCon, + first_vals: ValCon, + last_vals: ValCon, } -impl ChunkBatchCursor { +impl ChunkBatchCursor { /// Move the active chunk to `c`, opening a fresh inner cursor at its start. 
fn goto(&mut self, c: usize, storage: &ChunkBatch) { self.chunk = c; self.inner = storage.chunks.get(c).map(C::cursor); } + + /// Does key `k` span the boundary between chunks `c` and `c + 1` — chunk `c` + /// ends with it and chunk `c + 1` begins with it? + /// + /// The `reborrow`s confine the comparison's lifetime to this call: the bounds + /// index lives on `self`, and without them the (invariant) item lifetimes + /// would tie the borrow of `self` to `k`, blocking the caller's `goto`. + fn key_spills(&self, c: usize, k: ::Key<'_>) -> bool { + as BatchContainer>::reborrow(self.last_keys.index(c)) == as BatchContainer>::reborrow(k) + && as BatchContainer>::reborrow(self.first_keys.index(c + 1)) == as BatchContainer>::reborrow(k) + } + + /// Does `(k, v)` span the boundary between chunks `c` and `c + 1`? + fn val_spills(&self, c: usize, k: ::Key<'_>, v: ::Val<'_>) -> bool { + self.key_spills(c, k) + && as BatchContainer>::reborrow(self.last_vals.index(c)) == as BatchContainer>::reborrow(v) + && as BatchContainer>::reborrow(self.first_vals.index(c + 1)) == as BatchContainer>::reborrow(v) + } } -impl Cursor for ChunkBatchCursor { +impl Cursor for ChunkBatchCursor { type Storage = ChunkBatch; type KeyContainer = ::KeyContainer; @@ -350,10 +396,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().map_times(&s.chunks[self.chunk], &mut logic); // Follow the (key, val) forward across boundaries while it spills. let mut c = self.chunk; - while c + 1 < s.chunks.len() - && s.last_keys.index(c) == k && s.first_keys.index(c + 1) == k - && s.last_vals.index(c) == v && s.first_vals.index(c + 1) == v - { + while c + 1 < s.chunks.len() && self.val_spills(c, k, v) { c += 1; s.chunks[c].cursor().map_times(&s.chunks[c], &mut logic); } @@ -364,7 +407,7 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let k = self.key(s); // Advance to the last chunk the key spans. 
- while self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k { + while self.chunk + 1 < n && self.key_spills(self.chunk, k) { self.goto(self.chunk + 1, s); } // Step past the key within its last chunk. @@ -383,7 +426,7 @@ impl Cursor for ChunkBatchCursor { fn seek_key(&mut self, s: &Self::Storage, key: Self::Key<'_>) { let n = s.chunks.len(); // First chunk whose last key is `>= key`: where `key`'s run begins. - let c = s.last_keys.advance(0, n, |x| { + let c = self.last_keys.advance(0, n, |x| { as BatchContainer>::reborrow(x).lt(& as BatchContainer>::reborrow(key)) }); self.goto(c, s); @@ -396,17 +439,14 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let (k, v) = (self.key(s), self.val(s)); // Advance to the last chunk the (key, val) spans. - while self.chunk + 1 < n - && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k - && s.last_vals.index(self.chunk) == v && s.first_vals.index(self.chunk + 1) == v - { + while self.chunk + 1 < n && self.val_spills(self.chunk, k, v) { self.goto(self.chunk + 1, s); } // Step past the (key, val) within that chunk. self.inner.as_mut().unwrap().step_val(&s.chunks[self.chunk]); // If the key's vals are exhausted here but the key spills, roll forward. if !self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) - && self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k + && self.chunk + 1 < n && self.key_spills(self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); @@ -421,7 +461,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().seek_val(&s.chunks[self.chunk], val); if self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) { return; } // Key's vals exhausted in this chunk; if the key spills, retry in the next. 
- if self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k { + if self.chunk + 1 < n && self.key_spills(self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); } else { diff --git a/differential-dataflow/src/trace/chunk/vec.rs b/differential-dataflow/src/trace/chunk/vec.rs index 71170835b..7d47e289e 100644 --- a/differential-dataflow/src/trace/chunk/vec.rs +++ b/differential-dataflow/src/trace/chunk/vec.rs @@ -199,17 +199,20 @@ where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+S } } -impl Chunk for VecChunk +impl super::NavigableChunk for VecChunk where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static { - type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; - - const TARGET: usize = TARGET; - fn bounds(&self) -> ((&K, &V, &T), (&K, &V, &T)) { let s = &self.0[..]; let (f, l) = (&s[0], &s[s.len() - 1]); ((&f.0.0, &f.0.1, &f.1), (&l.0.0, &l.0.1, &l.1)) } +} + +impl Chunk for VecChunk +where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static { + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + + const TARGET: usize = TARGET; fn len(&self) -> usize { self.0.len() } From e3feaab92aa064a92302bb2cc52b21555efa0497 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 2 Jul 2026 07:17:22 -0400 Subject: [PATCH 2/3] Straddle cursor: consult chunk bounds directly, drop the materialized index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Building the boundary index at cursor construction cost O(chunks) before the first seek — more than the walk itself for sparse lookups, and pure waste for sequential passes, which visit every chunk anyway. 
Every navigation question is answerable from the chunks' resident bounds() directly: seeks binary-search them (log(chunks) reads) and boundary spill checks read the two neighbouring chunks. Cursor construction is now free of per-chunk work entirely, and ChunkBatch::new reads no bounds: its only per-chunk work is the non-empty len() assertion. Co-Authored-By: Claude Fable 5 --- differential-dataflow/src/trace/chunk/mod.rs | 96 ++++++++------------ 1 file changed, 38 insertions(+), 58 deletions(-) diff --git a/differential-dataflow/src/trace/chunk/mod.rs b/differential-dataflow/src/trace/chunk/mod.rs index 1953f95ed..46cb8bb5a 100644 --- a/differential-dataflow/src/trace/chunk/mod.rs +++ b/differential-dataflow/src/trace/chunk/mod.rs @@ -48,9 +48,9 @@ //! building a whole sequence and settling at the end. And every walk over a whole //! chunk sequence reads only resident metadata — [`len`](Chunk::len) and //! [`bounds`](NavigableChunk::bounds) — never a chunk body: the straddle cursor -//! indexes its chunks' bounds once at construction, binary-searches that resident -//! index, and opens only the chunk(s) a query touches. Implementors must therefore -//! keep `len` and `bounds` cheap even when a chunk's body is paged out. +//! seeks by binary-searching the chunks' bounds and opens only the chunk(s) a +//! query touches. Implementors must therefore keep `len` and `bounds` cheap even +//! when a chunk's body is paged out. use std::collections::VecDeque; @@ -156,8 +156,9 @@ pub trait Chunk: Sized + Clone { /// an arrangement. Chunks consumed only by whole-chunk logic (tactics) can skip it. /// /// `bounds` must stay cheap even when a chunk's body is paged out: the straddle -/// cursor indexes every chunk's bounds at construction, and opens a chunk's body -/// only when a query touches it. +/// cursor consults chunk bounds throughout navigation — seeks binary-search them, +/// boundary crossings compare against them — and opens a chunk's body only when a +/// query touches it. 
pub trait NavigableChunk: Chunk + Navigable::Time>> { /// The first and last `(key, val, time)` triples in the chunk. fn bounds(&self) -> ( @@ -257,28 +258,7 @@ impl ChunkBatch { impl crate::trace::Navigable for ChunkBatch { type Cursor = ChunkBatchCursor; fn cursor(&self) -> Self::Cursor { - // Index the chunks' resident bounds, so navigation binary-searches the - // index and opens only the chunk(s) a query touches. The index lives on - // the cursor rather than the batch: its containers are cursor-typed, and - // batches of cursor-less chunks must still form, merge, and settle. - let n = self.chunks.len(); - let mut first_keys = >::with_capacity(n); - let mut last_keys = >::with_capacity(n); - let mut first_vals = >::with_capacity(n); - let mut last_vals = >::with_capacity(n); - for chunk in &self.chunks { - let ((fk, fv, _), (lk, lv, _)) = chunk.bounds(); - first_keys.push_ref(fk); - last_keys.push_ref(lk); - first_vals.push_ref(fv); - last_vals.push_ref(lv); - } - ChunkBatchCursor { - key_chunk: 0, - chunk: 0, - inner: self.chunks.first().map(C::cursor), - first_keys, last_keys, first_vals, last_vals, - } + ChunkBatchCursor { key_chunk: 0, chunk: 0, inner: self.chunks.first().map(C::cursor) } } } @@ -323,10 +303,13 @@ pub type ChunkBuilder = crate::trace::rc_blanket_impls::RcBuilder { /// First chunk of the current key's run; where `rewind_vals` returns to. key_chunk: usize, @@ -334,12 +317,6 @@ pub struct ChunkBatchCursor { chunk: usize, /// Cursor into `chunk`; `None` once `chunk` is past the last chunk. inner: Option, - /// Per-chunk first and last key, and first and last val, parallel to the - /// batch's `chunks`: the resident index the seeks binary-search. - first_keys: KeyCon, - last_keys: KeyCon, - first_vals: ValCon, - last_vals: ValCon, } impl ChunkBatchCursor { @@ -352,19 +329,18 @@ impl ChunkBatchCursor { /// Does key `k` span the boundary between chunks `c` and `c + 1` — chunk `c` /// ends with it and chunk `c + 1` begins with it? 
/// - /// The `reborrow`s confine the comparison's lifetime to this call: the bounds - /// index lives on `self`, and without them the (invariant) item lifetimes - /// would tie the borrow of `self` to `k`, blocking the caller's `goto`. - fn key_spills(&self, c: usize, k: ::Key<'_>) -> bool { - as BatchContainer>::reborrow(self.last_keys.index(c)) == as BatchContainer>::reborrow(k) - && as BatchContainer>::reborrow(self.first_keys.index(c + 1)) == as BatchContainer>::reborrow(k) + /// Two resident [`bounds`](NavigableChunk::bounds) reads; the `reborrow`s + /// unify the (invariant) item lifetimes with `k`'s. + fn key_spills(s: &ChunkBatch, c: usize, k: ::Key<'_>) -> bool { + as BatchContainer>::reborrow(s.chunks[c].bounds().1.0) == as BatchContainer>::reborrow(k) + && as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.0) == as BatchContainer>::reborrow(k) } /// Does `(k, v)` span the boundary between chunks `c` and `c + 1`? - fn val_spills(&self, c: usize, k: ::Key<'_>, v: ::Val<'_>) -> bool { - self.key_spills(c, k) - && as BatchContainer>::reborrow(self.last_vals.index(c)) == as BatchContainer>::reborrow(v) - && as BatchContainer>::reborrow(self.first_vals.index(c + 1)) == as BatchContainer>::reborrow(v) + fn val_spills(s: &ChunkBatch, c: usize, k: ::Key<'_>, v: ::Val<'_>) -> bool { + Self::key_spills(s, c, k) + && as BatchContainer>::reborrow(s.chunks[c].bounds().1.1) == as BatchContainer>::reborrow(v) + && as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.1) == as BatchContainer>::reborrow(v) } } @@ -396,7 +372,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().map_times(&s.chunks[self.chunk], &mut logic); // Follow the (key, val) forward across boundaries while it spills. 
let mut c = self.chunk; - while c + 1 < s.chunks.len() && self.val_spills(c, k, v) { + while c + 1 < s.chunks.len() && Self::val_spills(s, c, k, v) { c += 1; s.chunks[c].cursor().map_times(&s.chunks[c], &mut logic); } @@ -407,7 +383,7 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let k = self.key(s); // Advance to the last chunk the key spans. - while self.chunk + 1 < n && self.key_spills(self.chunk, k) { + while self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); } // Step past the key within its last chunk. @@ -426,12 +402,16 @@ impl Cursor for ChunkBatchCursor { fn seek_key(&mut self, s: &Self::Storage, key: Self::Key<'_>) { let n = s.chunks.len(); // First chunk whose last key is `>= key`: where `key`'s run begins. - let c = self.last_keys.advance(0, n, |x| { - as BatchContainer>::reborrow(x).lt(& as BatchContainer>::reborrow(key)) - }); - self.goto(c, s); - self.key_chunk = c; - if c < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[c], key); } + // Binary search over the chunks' resident bounds. + let (mut lo, mut hi) = (0, n); + while lo < hi { + let mid = lo + (hi - lo) / 2; + let last = as BatchContainer>::reborrow(s.chunks[mid].bounds().1.0); + if last.lt(& as BatchContainer>::reborrow(key)) { lo = mid + 1; } else { hi = mid; } + } + self.goto(lo, s); + self.key_chunk = lo; + if lo < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[lo], key); } } fn step_val(&mut self, s: &Self::Storage) { @@ -439,14 +419,14 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let (k, v) = (self.key(s), self.val(s)); // Advance to the last chunk the (key, val) spans. - while self.chunk + 1 < n && self.val_spills(self.chunk, k, v) { + while self.chunk + 1 < n && Self::val_spills(s, self.chunk, k, v) { self.goto(self.chunk + 1, s); } // Step past the (key, val) within that chunk. 
self.inner.as_mut().unwrap().step_val(&s.chunks[self.chunk]); // If the key's vals are exhausted here but the key spills, roll forward. if !self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) - && self.chunk + 1 < n && self.key_spills(self.chunk, k) + && self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); @@ -461,7 +441,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().seek_val(&s.chunks[self.chunk], val); if self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) { return; } // Key's vals exhausted in this chunk; if the key spills, retry in the next. - if self.chunk + 1 < n && self.key_spills(self.chunk, k) { + if self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); } else { From b3461907e53a99390f963697906784d3e65a636f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 2 Jul 2026 16:50:19 -0400 Subject: [PATCH 3/3] perf: gallop the straddle cursor's chunk seek from a hint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `seek_key` binary-searched the chunks' resident bounds over the whole `[0, chunks)` range on every call. Seeks within a cursor pass are monotone forward, so start from the current key's first chunk (`key_chunk`) and gallop, falling back to a full search only on a backward seek. A monotone sweep now costs `O(log Δ)` bounds reads per seek rather than `O(log chunks)`. Isolated, the chunk-selection cost drops several-fold on large batches (up to ~19x at ~1000 chunks) and regresses only by a negligible constant on very sparse probe sets. No new state: the batch carries no index, and the hint is the `key_chunk` the cursor already tracks. 
Adds a test asserting `seek_key` lands at the first key `>= target` for every (start, target) pair, forward and backward, so the hint never changes the result. Co-Authored-By: Claude Opus 4.8 (1M context) --- differential-dataflow/src/trace/chunk/mod.rs | 49 +++++++++++++------- differential-dataflow/src/trace/chunk/vec.rs | 30 ++++++++++++ 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/differential-dataflow/src/trace/chunk/mod.rs b/differential-dataflow/src/trace/chunk/mod.rs index 46cb8bb5a..93a4d28aa 100644 --- a/differential-dataflow/src/trace/chunk/mod.rs +++ b/differential-dataflow/src/trace/chunk/mod.rs @@ -48,7 +48,7 @@ //! building a whole sequence and settling at the end. And every walk over a whole //! chunk sequence reads only resident metadata — [`len`](Chunk::len) and //! [`bounds`](NavigableChunk::bounds) — never a chunk body: the straddle cursor -//! seeks by binary-searching the chunks' bounds and opens only the chunk(s) a +//! seeks by galloping the chunks' bounds from a hint and opens only the chunk(s) a //! query touches. Implementors must therefore keep `len` and `bounds` cheap even //! when a chunk's body is paged out. @@ -303,13 +303,13 @@ pub type ChunkBuilder = crate::trace::rc_blanket_impls::RcBuilder { /// First chunk of the current key's run; where `rewind_vals` returns to. key_chunk: usize, @@ -342,6 +342,28 @@ impl ChunkBatchCursor { && as BatchContainer>::reborrow(s.chunks[c].bounds().1.1) == as BatchContainer>::reborrow(v) && as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.1) == as BatchContainer>::reborrow(v) } + + /// The first chunk, at or after `hint`, whose last key is `>= key`: where `key`'s run + /// begins. `hint` — the current key's first chunk (`key_chunk`) — is a valid lower bound + /// for a forward seek; a backward seek is detected and served by a full search from the + /// front. 
Galloping keeps a monotone seek sweep at `O(log Δ)` bounds reads per seek rather + /// than `O(log chunks)`; only resident [`bounds`](NavigableChunk::bounds) are read. + fn locate_key(s: &ChunkBatch, hint: usize, key: ::Key<'_>) -> usize { + let n = s.chunks.len(); + // `last_key(i) < key`, from chunk `i`'s resident bounds. + let lt = |i: usize| as BatchContainer>::reborrow(s.chunks[i].bounds().1.0) + .lt(& as BatchContainer>::reborrow(key)); + let hint = hint.min(n); + // The hint can skip the answer only on a backward seek; then search from the front. + let lo = if hint == 0 || lt(hint - 1) { hint } else { 0 }; + if lo >= n || !lt(lo) { return lo; } + // Exponential search from `lo`, then binary within the final bracket. + let (mut prev, mut step) = (lo, 1usize); + while prev + step < n && lt(prev + step) { prev += step; step <<= 1; } + let (mut a, mut b) = (prev + 1, (prev + step).min(n)); + while a < b { let m = a + (b - a) / 2; if lt(m) { a = m + 1; } else { b = m; } } + a + } } impl Cursor for ChunkBatchCursor { @@ -401,14 +423,9 @@ impl Cursor for ChunkBatchCursor { fn seek_key(&mut self, s: &Self::Storage, key: Self::Key<'_>) { let n = s.chunks.len(); - // First chunk whose last key is `>= key`: where `key`'s run begins. - // Binary search over the chunks' resident bounds. - let (mut lo, mut hi) = (0, n); - while lo < hi { - let mid = lo + (hi - lo) / 2; - let last = as BatchContainer>::reborrow(s.chunks[mid].bounds().1.0); - if last.lt(& as BatchContainer>::reborrow(key)) { lo = mid + 1; } else { hi = mid; } - } + // First chunk whose last key is `>= key`: where `key`'s run begins. Gallop from the + // current key's first chunk (`key_chunk`), a lower bound for forward seeks. 
+ let lo = Self::locate_key(s, self.key_chunk, key); self.goto(lo, s); self.key_chunk = lo; if lo < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[lo], key); } diff --git a/differential-dataflow/src/trace/chunk/vec.rs b/differential-dataflow/src/trace/chunk/vec.rs index 7d47e289e..1a6b9aff8 100644 --- a/differential-dataflow/src/trace/chunk/vec.rs +++ b/differential-dataflow/src/trace/chunk/vec.rs @@ -767,4 +767,34 @@ mod test { eprintln!("probes={probes:>7}: gallop={g:>12?} linear={l:>12?}"); } } + + // `seek_key` must land at the first key `>= target` regardless of where the cursor + // starts, so the galloping hint (and its backward-seek fallback) never changes the + // answer. Probe every (start, target) pair — forward and backward — against an + // analytic oracle. + #[test] + fn seek_key_hint_is_direction_independent() { + use crate::trace::cursor::Cursor; + use crate::trace::Description; + use crate::trace::chunk::ChunkBatch; + use timely::progress::Antichain; + + // One key per chunk (even keys 0, 2, .., 38) so seeks cross boundaries both ways. + let chunks: Vec<_> = (0..20u64).map(|k| chunk(vec![((2 * k, 0u64), 0u64, 1i64)])).collect(); + let desc = Description::new( + Antichain::from_elem(0u64), Antichain::from_elem(1u64), Antichain::from_elem(0u64)); + let batch = ChunkBatch::new(chunks, desc); + + // First key `>= tgt`: the next even at or above `tgt`, or absent past the last (38). + let oracle = |tgt: u64| { let e = tgt + (tgt & 1); (e <= 38).then_some(e) }; + for start in 0..=40u64 { + for tgt in 0..=40u64 { + let mut c = batch.cursor(); + c.seek_key(&batch, &start); + c.seek_key(&batch, &tgt); + assert_eq!(c.get_key(&batch).copied(), oracle(tgt), "start={start} tgt={tgt}"); + } + } + } + }