diff --git a/differential-dataflow/benches/chunk_bench.rs b/differential-dataflow/benches/chunk_bench.rs index aea4b85f0..33e34f9cd 100644 --- a/differential-dataflow/benches/chunk_bench.rs +++ b/differential-dataflow/benches/chunk_bench.rs @@ -22,7 +22,7 @@ use timely::container::{PushInto, SizableContainer}; use differential_dataflow::consolidation::Consolidate; use differential_dataflow::columnar::layout::ColumnarUpdate; -use differential_dataflow::trace::chunk::{merge_chains, Chunk}; +use differential_dataflow::trace::chunk::{merge_chains, Chunk, NavigableChunk}; use differential_dataflow::trace::chunk::vec::VecChunk; use differential_dataflow::columnar::trace::ColChunk; use differential_dataflow::trace::cursor::Cursor; @@ -60,7 +60,7 @@ where /// Walk every `(key, val, time, diff)` via the chunk cursors, counting updates /// (a uniform proxy for scan cost across layouts). -fn scan(chunks: &[C]) -> usize { +fn scan(chunks: &[C]) -> usize { let mut count = 0; for c in chunks { let mut cur = c.cursor(); @@ -81,7 +81,7 @@ fn ms(t: Instant) -> f64 { t.elapsed().as_secs_f64() * 1000.0 } /// two interleaving halves for the merge. Returns timings + footprint. fn bench(updates: &[I], half_a: &[I], half_b: &[I]) -> (f64, f64, f64, f64, usize, usize) where - C: Chunk + Default + SizableContainer + Consolidate + Container + PushInto, + C: NavigableChunk + Default + SizableContainer + Consolidate + Container + PushInto, I: Clone, { let (half_a, half_b) = (half_a.to_vec(), half_b.to_vec()); diff --git a/differential-dataflow/src/columnar/trace/chunk.rs b/differential-dataflow/src/columnar/trace/chunk.rs index f4cb6718f..a8357ae28 100644 --- a/differential-dataflow/src/columnar/trace/chunk.rs +++ b/differential-dataflow/src/columnar/trace/chunk.rs @@ -301,12 +301,8 @@ where U::Time: 'static { } } -impl Chunk for ColChunk +impl crate::trace::chunk::NavigableChunk for ColChunk where U::Time: 'static { - type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; - - const TARGET: usize = TARGET; - fn bounds(&self) -> ( (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), @@ -327,6 +323,13 @@ where U::Time: 'static { } } } +} + +impl Chunk for ColChunk +where U::Time: 'static { + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + + const TARGET: usize = TARGET; fn len(&self) -> usize { match self { diff --git a/differential-dataflow/src/trace/chunk/mod.rs b/differential-dataflow/src/trace/chunk/mod.rs index 940a28764..93a4d28aa 100644 --- a/differential-dataflow/src/trace/chunk/mod.rs +++ b/differential-dataflow/src/trace/chunk/mod.rs @@ -22,6 +22,8 @@ //! [`arrange_core`](crate::operators::arrange::arrangement::arrange_core), along with a //! chunker that forms `C` from the input stream — typically //! [`ContainerChunker`](crate::trace::implementations::chunker::ContainerChunker). +//! Trace *maintenance* needs only [`Chunk`]; cursor-driven *consumption* of the +//! arrangement additionally asks `C` for the [`NavigableChunk`] capability. //! Everything else here ([`ChunkBatch`], [`ChunkMerger`], [`ChunkBatchMerger`], //! [`ChunkBatchCursor`], [`ChunkBatchBuilder`]) is machinery those aliases expand to and is //! not named directly. The [`vec`](mod@vec) module is a worked `Chunk` @@ -45,10 +47,10 @@ //! settles its committed output as it goes (see [`Chunk::settle`]), rather than //! building a whole sequence and settling at the end. And every walk over a whole //! chunk sequence reads only resident metadata — [`len`](Chunk::len) and -//! [`bounds`](Chunk::bounds) — never a chunk body: a batch indexes its chunks' -//! bounds once at construction, so cursors binary-search that resident index and -//! open only the chunk(s) a query touches. Implementors must therefore keep `len` -//! and `bounds` cheap even when a chunk's body is paged out. +//! [`bounds`](NavigableChunk::bounds) — never a chunk body: the straddle cursor +//! seeks by galloping the chunks' bounds from a hint and opens only the chunk(s) a +//! query touches. Implementors must therefore keep `len` and `bounds` cheap even +//! when a chunk's body is paged out. use std::collections::VecDeque; @@ -70,22 +72,20 @@ pub mod vec; /// The "data" operations transform lists of chunks, are expected to do roughly /// "one chunk's worth" of work at a time; they can afford to compress and page. /// The "metadata" operations provide chunk information, and should be lightweight. -pub trait Chunk: Navigable> + Sized + Clone { +/// +/// The trait has no opinion about keys, vals, or diffs — only time, which trace +/// maintenance needs. Reading a chunk's contents is a separate, optional +/// capability: see [`NavigableChunk`]. +pub trait Chunk: Sized + Clone { /// The timestamp type of the chunk's updates. /// - /// Key/val/diff opinions live on the chunk's [`Navigable::Cursor`]; the chunk itself only needs - /// time, to bound its interval and participate in advancement and compaction. + /// Key/val/diff opinions live on the optional [`NavigableChunk`] capability; the chunk itself + /// only needs time, to bound its interval and participate in advancement and compaction. type Time: Lattice + timely::progress::Timestamp; /// The intended maximum chunk size. const TARGET: usize; - /// The first and last `(key, val, time)` triples in the chunk. - fn bounds(&self) -> ( - (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), - (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), - ); - /// The number of updates in the chunk. fn len(&self) -> usize; @@ -148,6 +148,25 @@ pub trait Chunk: Navigable> + Sized + Clone { } +/// The navigation capability: a [`Chunk`] whose contents can be read by cursor. +/// +/// This is optional. Batch formation and trace maintenance need only [`Chunk`]; +/// implementing this trait additionally lets [`ChunkBatch`] offer the straddle +/// cursor ([`ChunkBatchCursor`]), which is how cursor-driven operator paths read +/// an arrangement. Chunks consumed only by whole-chunk logic (tactics) can skip it. +/// +/// `bounds` must stay cheap even when a chunk's body is paged out: the straddle +/// cursor consults chunk bounds throughout navigation — seeks binary-search them, +/// boundary crossings compare against them — and opens a chunk's body only when a +/// query touches it. +pub trait NavigableChunk: Chunk + Navigable::Time>> { + /// The first and last `(key, val, time)` triples in the chunk. + fn bounds(&self) -> ( + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), + ); +} + /// Maximal-packing driver an implementor's [`Chunk::settle`] may delegate to. /// /// Holds a `carry` chunk under construction, grown by `combine` until it reaches @@ -219,41 +238,24 @@ type KeyCon = <::Cursor as Cursor>::KeyContainer; type ValCon = <::Cursor as Cursor>::ValContainer; /// A batch is a [`Chunk`] sequence plus a [`Description`]. -/// -/// Metadata about the batches is cached to make subselection efficient. pub struct ChunkBatch { /// Ordered, consolidated chunks; their concatenation is the batch. pub chunks: Vec, /// The lower, upper, and since frontiers of the batch. pub description: Description, - /// Per-chunk first and last key, and first and last val, parallel to `chunks`. - first_keys: KeyCon, - last_keys: KeyCon, - first_vals: ValCon, - last_vals: ValCon, } impl ChunkBatch { - /// Assemble a batch from ordered chunks, building the per-chunk index. + /// Assemble a batch from ordered chunks. pub fn new(chunks: Vec, description: Description) -> Self { - let n = chunks.len(); - let mut first_keys = >::with_capacity(n); - let mut last_keys = >::with_capacity(n); - let mut first_vals = >::with_capacity(n); - let mut last_vals = >::with_capacity(n); for chunk in &chunks { assert!(chunk.len() > 0, "ChunkBatch chunks must be non-empty"); - let ((fk, fv, _), (lk, lv, _)) = chunk.bounds(); - first_keys.push_ref(fk); - last_keys.push_ref(lk); - first_vals.push_ref(fv); - last_vals.push_ref(lv); } - ChunkBatch { chunks, description, first_keys, last_keys, first_vals, last_vals } + ChunkBatch { chunks, description } } } -impl crate::trace::Navigable for ChunkBatch { +impl crate::trace::Navigable for ChunkBatch { type Cursor = ChunkBatchCursor; fn cursor(&self) -> Self::Cursor { ChunkBatchCursor { key_chunk: 0, chunk: 0, inner: self.chunks.first().map(C::cursor) } @@ -301,11 +303,14 @@ pub type ChunkBuilder = crate::trace::rc_blanket_impls::RcBuilder { +/// and a cursor into it; it seeks by galloping the chunks' resident +/// [`bounds`](NavigableChunk::bounds) from a remembered hint (the current key's first +/// chunk), and at boundaries it *continues* into the next chunk rather than merging — +/// consulting the two neighbouring chunks' bounds to detect when a key or `(key, val)` +/// spills forward, without touching chunk contents. No state is materialized up front: +/// a monotone seek sweep costs `O(log Δ)` bounds reads per seek and a sequential pass two +/// per boundary, so cursor construction is free. +pub struct ChunkBatchCursor { /// First chunk of the current key's run; where `rewind_vals` returns to. key_chunk: usize, /// Chunk currently being read; `>= key_chunk`, within the current key's span. @@ -314,15 +319,54 @@ pub struct ChunkBatchCursor { inner: Option, } -impl ChunkBatchCursor { +impl ChunkBatchCursor { /// Move the active chunk to `c`, opening a fresh inner cursor at its start. fn goto(&mut self, c: usize, storage: &ChunkBatch) { self.chunk = c; self.inner = storage.chunks.get(c).map(C::cursor); } + + /// Does key `k` span the boundary between chunks `c` and `c + 1` — chunk `c` + /// ends with it and chunk `c + 1` begins with it? + /// + /// Two resident [`bounds`](NavigableChunk::bounds) reads; the `reborrow`s + /// unify the (invariant) item lifetimes with `k`'s. + fn key_spills(s: &ChunkBatch, c: usize, k: ::Key<'_>) -> bool { + as BatchContainer>::reborrow(s.chunks[c].bounds().1.0) == as BatchContainer>::reborrow(k) + && as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.0) == as BatchContainer>::reborrow(k) + } + + /// Does `(k, v)` span the boundary between chunks `c` and `c + 1`? + fn val_spills(s: &ChunkBatch, c: usize, k: ::Key<'_>, v: ::Val<'_>) -> bool { + Self::key_spills(s, c, k) + && as BatchContainer>::reborrow(s.chunks[c].bounds().1.1) == as BatchContainer>::reborrow(v) + && as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.1) == as BatchContainer>::reborrow(v) + } + + /// The first chunk, at or after `hint`, whose last key is `>= key`: where `key`'s run + /// begins. `hint` — the current key's first chunk (`key_chunk`) — is a valid lower bound + /// for a forward seek; a backward seek is detected and served by a full search from the + /// front. Galloping keeps a monotone seek sweep at `O(log Δ)` bounds reads per seek rather + /// than `O(log chunks)`; only resident [`bounds`](NavigableChunk::bounds) are read. + fn locate_key(s: &ChunkBatch, hint: usize, key: ::Key<'_>) -> usize { + let n = s.chunks.len(); + // `last_key(i) < key`, from chunk `i`'s resident bounds. + let lt = |i: usize| as BatchContainer>::reborrow(s.chunks[i].bounds().1.0) + .lt(& as BatchContainer>::reborrow(key)); + let hint = hint.min(n); + // The hint can skip the answer only on a backward seek; then search from the front. + let lo = if hint == 0 || lt(hint - 1) { hint } else { 0 }; + if lo >= n || !lt(lo) { return lo; } + // Exponential search from `lo`, then binary within the final bracket. + let (mut prev, mut step) = (lo, 1usize); + while prev + step < n && lt(prev + step) { prev += step; step <<= 1; } + let (mut a, mut b) = (prev + 1, (prev + step).min(n)); + while a < b { let m = a + (b - a) / 2; if lt(m) { a = m + 1; } else { b = m; } } + a + } } -impl Cursor for ChunkBatchCursor { +impl Cursor for ChunkBatchCursor { type Storage = ChunkBatch; type KeyContainer = ::KeyContainer; @@ -350,10 +394,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().map_times(&s.chunks[self.chunk], &mut logic); // Follow the (key, val) forward across boundaries while it spills. let mut c = self.chunk; - while c + 1 < s.chunks.len() - && s.last_keys.index(c) == k && s.first_keys.index(c + 1) == k - && s.last_vals.index(c) == v && s.first_vals.index(c + 1) == v - { + while c + 1 < s.chunks.len() && Self::val_spills(s, c, k, v) { c += 1; s.chunks[c].cursor().map_times(&s.chunks[c], &mut logic); } @@ -364,7 +405,7 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let k = self.key(s); // Advance to the last chunk the key spans. - while self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k { + while self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); } // Step past the key within its last chunk. @@ -382,13 +423,12 @@ impl Cursor for ChunkBatchCursor { fn seek_key(&mut self, s: &Self::Storage, key: Self::Key<'_>) { let n = s.chunks.len(); - // First chunk whose last key is `>= key`: where `key`'s run begins. - let c = s.last_keys.advance(0, n, |x| { - as BatchContainer>::reborrow(x).lt(& as BatchContainer>::reborrow(key)) - }); - self.goto(c, s); - self.key_chunk = c; - if c < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[c], key); } + // First chunk whose last key is `>= key`: where `key`'s run begins. Gallop from the + // current key's first chunk (`key_chunk`), a lower bound for forward seeks. + let lo = Self::locate_key(s, self.key_chunk, key); + self.goto(lo, s); + self.key_chunk = lo; + if lo < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[lo], key); } } fn step_val(&mut self, s: &Self::Storage) { @@ -396,17 +436,14 @@ impl Cursor for ChunkBatchCursor { let n = s.chunks.len(); let (k, v) = (self.key(s), self.val(s)); // Advance to the last chunk the (key, val) spans. - while self.chunk + 1 < n - && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k - && s.last_vals.index(self.chunk) == v && s.first_vals.index(self.chunk + 1) == v - { + while self.chunk + 1 < n && Self::val_spills(s, self.chunk, k, v) { self.goto(self.chunk + 1, s); } // Step past the (key, val) within that chunk. self.inner.as_mut().unwrap().step_val(&s.chunks[self.chunk]); // If the key's vals are exhausted here but the key spills, roll forward. if !self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) - && self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k + && self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); @@ -421,7 +458,7 @@ impl Cursor for ChunkBatchCursor { self.inner.as_mut().unwrap().seek_val(&s.chunks[self.chunk], val); if self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) { return; } // Key's vals exhausted in this chunk; if the key spills, retry in the next. - if self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k { + if self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) { self.goto(self.chunk + 1, s); self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k); } else { diff --git a/differential-dataflow/src/trace/chunk/vec.rs b/differential-dataflow/src/trace/chunk/vec.rs index 71170835b..1a6b9aff8 100644 --- a/differential-dataflow/src/trace/chunk/vec.rs +++ b/differential-dataflow/src/trace/chunk/vec.rs @@ -199,17 +199,20 @@ where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+S } } -impl Chunk for VecChunk +impl super::NavigableChunk for VecChunk where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static { - type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; - - const TARGET: usize = TARGET; - fn bounds(&self) -> ((&K, &V, &T), (&K, &V, &T)) { let s = &self.0[..]; let (f, l) = (&s[0], &s[s.len() - 1]); ((&f.0.0, &f.0.1, &f.1), (&l.0.0, &l.0.1, &l.1)) } +} + +impl Chunk for VecChunk +where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static { + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + + const TARGET: usize = TARGET; fn len(&self) -> usize { self.0.len() } @@ -764,4 +767,34 @@ mod test { eprintln!("probes={probes:>7}: gallop={g:>12?} linear={l:>12?}"); } } + + // `seek_key` must land at the first key `>= target` regardless of where the cursor + // starts, so the galloping hint (and its backward-seek fallback) never changes the + // answer. Probe every (start, target) pair — forward and backward — against an + // analytic oracle. + #[test] + fn seek_key_hint_is_direction_independent() { + use crate::trace::cursor::Cursor; + use crate::trace::Description; + use crate::trace::chunk::ChunkBatch; + use timely::progress::Antichain; + + // One key per chunk (even keys 0, 2, .., 38) so seeks cross boundaries both ways. + let chunks: Vec<_> = (0..20u64).map(|k| chunk(vec![((2 * k, 0u64), 0u64, 1i64)])).collect(); + let desc = Description::new( + Antichain::from_elem(0u64), Antichain::from_elem(1u64), Antichain::from_elem(0u64)); + let batch = ChunkBatch::new(chunks, desc); + + // First key `>= tgt`: the next even at or above `tgt`, or absent past the last (38). + let oracle = |tgt: u64| { let e = tgt + (tgt & 1); (e <= 38).then_some(e) }; + for start in 0..=40u64 { + for tgt in 0..=40u64 { + let mut c = batch.cursor(); + c.seek_key(&batch, &start); + c.seek_key(&batch, &tgt); + assert_eq!(c.get_key(&batch).copied(), oracle(tgt), "start={start} tgt={tgt}"); + } + } + } + }