Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions differential-dataflow/benches/chunk_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use timely::container::{PushInto, SizableContainer};

use differential_dataflow::consolidation::Consolidate;
use differential_dataflow::columnar::layout::ColumnarUpdate;
use differential_dataflow::trace::chunk::{merge_chains, Chunk};
use differential_dataflow::trace::chunk::{merge_chains, Chunk, NavigableChunk};
use differential_dataflow::trace::chunk::vec::VecChunk;
use differential_dataflow::columnar::trace::ColChunk;
use differential_dataflow::trace::cursor::Cursor;
Expand Down Expand Up @@ -60,7 +60,7 @@ where

/// Walk every `(key, val, time, diff)` via the chunk cursors, counting updates
/// (a uniform proxy for scan cost across layouts).
fn scan<C: Chunk>(chunks: &[C]) -> usize {
fn scan<C: NavigableChunk>(chunks: &[C]) -> usize {
let mut count = 0;
for c in chunks {
let mut cur = c.cursor();
Expand All @@ -81,7 +81,7 @@ fn ms(t: Instant) -> f64 { t.elapsed().as_secs_f64() * 1000.0 }
/// two interleaving halves for the merge. Returns timings + footprint.
fn bench<C, I>(updates: &[I], half_a: &[I], half_b: &[I]) -> (f64, f64, f64, f64, usize, usize)
where
C: Chunk + Default + SizableContainer + Consolidate + Container + PushInto<I>,
C: NavigableChunk + Default + SizableContainer + Consolidate + Container + PushInto<I>,
I: Clone,
{
let (half_a, half_b) = (half_a.to_vec(), half_b.to_vec());
Expand Down
13 changes: 8 additions & 5 deletions differential-dataflow/src/columnar/trace/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,8 @@ where U::Time: 'static {
}
}

impl<U: ColumnarUpdate> Chunk for ColChunk<U>
impl<U: ColumnarUpdate> crate::trace::chunk::NavigableChunk for ColChunk<U>
where U::Time: 'static {
type Time = <<ColumnarLayout<U> as Layout>::TimeContainer as BatchContainer>::Owned;

const TARGET: usize = TARGET;

fn bounds(&self) -> (
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
Expand All @@ -327,6 +323,13 @@ where U::Time: 'static {
}
}
}
}

impl<U: ColumnarUpdate> Chunk for ColChunk<U>
where U::Time: 'static {
type Time = <<ColumnarLayout<U> as Layout>::TimeContainer as BatchContainer>::Owned;

const TARGET: usize = TARGET;

fn len(&self) -> usize {
match self {
Expand Down
153 changes: 95 additions & 58 deletions differential-dataflow/src/trace/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
//! [`arrange_core`](crate::operators::arrange::arrangement::arrange_core), along with a
//! chunker that forms `C` from the input stream — typically
//! [`ContainerChunker<C>`](crate::trace::implementations::chunker::ContainerChunker).
//! Trace *maintenance* needs only [`Chunk`]; cursor-driven *consumption* of the
//! arrangement additionally asks `C` for the [`NavigableChunk`] capability.
//! Everything else here ([`ChunkBatch`], [`ChunkMerger`], [`ChunkBatchMerger`],
//! [`ChunkBatchCursor`], [`ChunkBatchBuilder`]) is machinery those aliases expand to and is
//! not named directly. The [`vec`](mod@vec) module is a worked `Chunk`
Expand All @@ -45,10 +47,10 @@
//! settles its committed output as it goes (see [`Chunk::settle`]), rather than
//! building a whole sequence and settling at the end. And every walk over a whole
//! chunk sequence reads only resident metadata — [`len`](Chunk::len) and
//! [`bounds`](Chunk::bounds) — never a chunk body: a batch indexes its chunks'
//! bounds once at construction, so cursors binary-search that resident index and
//! open only the chunk(s) a query touches. Implementors must therefore keep `len`
//! and `bounds` cheap even when a chunk's body is paged out.
//! [`bounds`](NavigableChunk::bounds) — never a chunk body: the straddle cursor
//! seeks by galloping the chunks' bounds from a hint and opens only the chunk(s) a
//! query touches. Implementors must therefore keep `len` and `bounds` cheap even
//! when a chunk's body is paged out.

use std::collections::VecDeque;

Expand All @@ -70,22 +72,20 @@ pub mod vec;
/// The "data" operations transform lists of chunks, are expected to do roughly
/// "one chunk's worth" of work at a time; they can afford to compress and page.
/// The "metadata" operations provide chunk information, and should be lightweight.
pub trait Chunk: Navigable<Cursor: Cursor<Time = Self::Time>> + Sized + Clone {
///
/// The trait has no opinion about keys, vals, or diffs — only time, which trace
/// maintenance needs. Reading a chunk's contents is a separate, optional
/// capability: see [`NavigableChunk`].
pub trait Chunk: Sized + Clone {
/// The timestamp type of the chunk's updates.
///
/// Key/val/diff opinions live on the chunk's [`Navigable::Cursor`]; the chunk itself only needs
/// time, to bound its interval and participate in advancement and compaction.
/// Key/val/diff opinions live on the optional [`NavigableChunk`] capability; the chunk itself
/// only needs time, to bound its interval and participate in advancement and compaction.
type Time: Lattice + timely::progress::Timestamp;

/// The intended maximum chunk size.
const TARGET: usize;

/// The first and last `(key, val, time)` triples in the chunk.
fn bounds(&self) -> (
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
);

/// The number of updates in the chunk.
fn len(&self) -> usize;

Expand Down Expand Up @@ -148,6 +148,25 @@ pub trait Chunk: Navigable<Cursor: Cursor<Time = Self::Time>> + Sized + Clone {

}

/// The navigation capability: a [`Chunk`] whose contents can be read by cursor.
///
/// This is optional. Batch formation and trace maintenance need only [`Chunk`];
/// implementing this trait additionally lets [`ChunkBatch`] offer the straddle
/// cursor ([`ChunkBatchCursor`]), which is how cursor-driven operator paths read
/// an arrangement. Chunks consumed only by whole-chunk logic (tactics) can skip it.
///
/// `bounds` must stay cheap even when a chunk's body is paged out: the straddle
/// cursor consults chunk bounds throughout navigation — seeks binary-search them,
/// boundary crossings compare against them — and opens a chunk's body only when a
/// query touches it.
pub trait NavigableChunk: Chunk + Navigable<Cursor: Cursor<Time = <Self as Chunk>::Time>> {
/// The first and last `(key, val, time)` triples in the chunk.
fn bounds(&self) -> (
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
);
}

/// Maximal-packing driver an implementor's [`Chunk::settle`] may delegate to.
///
/// Holds a `carry` chunk under construction, grown by `combine` until it reaches
Expand Down Expand Up @@ -219,41 +238,24 @@ type KeyCon<C> = <<C as Navigable>::Cursor as Cursor>::KeyContainer;
type ValCon<C> = <<C as Navigable>::Cursor as Cursor>::ValContainer;

/// A batch is a [`Chunk`] sequence plus a [`Description`].
///
/// Metadata about the batches is cached to make subselection efficient.
pub struct ChunkBatch<C: Chunk> {
/// Ordered, consolidated chunks; their concatenation is the batch.
pub chunks: Vec<C>,
/// The lower, upper, and since frontiers of the batch.
pub description: Description<C::Time>,
/// Per-chunk first and last key, and first and last val, parallel to `chunks`.
first_keys: KeyCon<C>,
last_keys: KeyCon<C>,
first_vals: ValCon<C>,
last_vals: ValCon<C>,
}

impl<C: Chunk> ChunkBatch<C> {
/// Assemble a batch from ordered chunks, building the per-chunk index.
/// Assemble a batch from ordered chunks.
pub fn new(chunks: Vec<C>, description: Description<C::Time>) -> Self {
let n = chunks.len();
let mut first_keys = <KeyCon<C>>::with_capacity(n);
let mut last_keys = <KeyCon<C>>::with_capacity(n);
let mut first_vals = <ValCon<C>>::with_capacity(n);
let mut last_vals = <ValCon<C>>::with_capacity(n);
for chunk in &chunks {
assert!(chunk.len() > 0, "ChunkBatch chunks must be non-empty");
let ((fk, fv, _), (lk, lv, _)) = chunk.bounds();
first_keys.push_ref(fk);
last_keys.push_ref(lk);
first_vals.push_ref(fv);
last_vals.push_ref(lv);
}
ChunkBatch { chunks, description, first_keys, last_keys, first_vals, last_vals }
ChunkBatch { chunks, description }
}
}

impl<C: Chunk> crate::trace::Navigable for ChunkBatch<C> {
impl<C: NavigableChunk> crate::trace::Navigable for ChunkBatch<C> {
type Cursor = ChunkBatchCursor<C>;
fn cursor(&self) -> Self::Cursor {
ChunkBatchCursor { key_chunk: 0, chunk: 0, inner: self.chunks.first().map(C::cursor) }
Expand Down Expand Up @@ -301,11 +303,14 @@ pub type ChunkBuilder<C> = crate::trace::rc_blanket_impls::RcBuilder<ChunkBatchB
/// merely cut at arbitrary points, so the operation is *concatenation*, never a
/// merge: across a boundary a key's vals concatenate and a `(key, val)`'s times
/// concatenate. The cursor exploits this. It holds the chunk currently being read
/// and a cursor into it; it seeks by binary-searching the per-chunk index on
/// `ChunkBatch`, and at boundaries it *continues* into the next chunk rather than
/// merging — using the index to detect when a key or `(key, val)` spills forward,
/// without touching chunk contents.
pub struct ChunkBatchCursor<C: Chunk> {
/// and a cursor into it; it seeks by galloping the chunks' resident
/// [`bounds`](NavigableChunk::bounds) from a remembered hint (the current key's first
/// chunk), and at boundaries it *continues* into the next chunk rather than merging —
/// consulting the two neighbouring chunks' bounds to detect when a key or `(key, val)`
/// spills forward, without touching chunk contents. No state is materialized up front:
/// a monotone seek sweep costs `O(log Δ)` bounds reads per seek and a sequential pass two
/// per boundary, so cursor construction is free.
pub struct ChunkBatchCursor<C: NavigableChunk> {
/// First chunk of the current key's run; where `rewind_vals` returns to.
key_chunk: usize,
/// Chunk currently being read; `>= key_chunk`, within the current key's span.
Expand All @@ -314,15 +319,54 @@ pub struct ChunkBatchCursor<C: Chunk> {
inner: Option<C::Cursor>,
}

impl<C: Chunk> ChunkBatchCursor<C> {
impl<C: NavigableChunk> ChunkBatchCursor<C> {
/// Move the active chunk to `c`, opening a fresh inner cursor at its start.
fn goto(&mut self, c: usize, storage: &ChunkBatch<C>) {
self.chunk = c;
self.inner = storage.chunks.get(c).map(C::cursor);
}

/// Does key `k` span the boundary between chunks `c` and `c + 1` — chunk `c`
/// ends with it and chunk `c + 1` begins with it?
///
/// Two resident [`bounds`](NavigableChunk::bounds) reads; the `reborrow`s
/// unify the (invariant) item lifetimes with `k`'s.
fn key_spills(s: &ChunkBatch<C>, c: usize, k: <C::Cursor as Cursor>::Key<'_>) -> bool {
<KeyCon<C> as BatchContainer>::reborrow(s.chunks[c].bounds().1.0) == <KeyCon<C> as BatchContainer>::reborrow(k)
&& <KeyCon<C> as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.0) == <KeyCon<C> as BatchContainer>::reborrow(k)
}

/// Does `(k, v)` span the boundary between chunks `c` and `c + 1`?
fn val_spills(s: &ChunkBatch<C>, c: usize, k: <C::Cursor as Cursor>::Key<'_>, v: <C::Cursor as Cursor>::Val<'_>) -> bool {
Self::key_spills(s, c, k)
&& <ValCon<C> as BatchContainer>::reborrow(s.chunks[c].bounds().1.1) == <ValCon<C> as BatchContainer>::reborrow(v)
&& <ValCon<C> as BatchContainer>::reborrow(s.chunks[c + 1].bounds().0.1) == <ValCon<C> as BatchContainer>::reborrow(v)
}

/// The first chunk, at or after `hint`, whose last key is `>= key`: where `key`'s run
/// begins. `hint` — the current key's first chunk (`key_chunk`) — is a valid lower bound
/// for a forward seek; a backward seek is detected and served by a full search from the
/// front. Galloping keeps a monotone seek sweep at `O(log Δ)` bounds reads per seek rather
/// than `O(log chunks)`; only resident [`bounds`](NavigableChunk::bounds) are read.
fn locate_key(s: &ChunkBatch<C>, hint: usize, key: <C::Cursor as Cursor>::Key<'_>) -> usize {
let n = s.chunks.len();
// `last_key(i) < key`, from chunk `i`'s resident bounds.
let lt = |i: usize| <KeyCon<C> as BatchContainer>::reborrow(s.chunks[i].bounds().1.0)
.lt(&<KeyCon<C> as BatchContainer>::reborrow(key));
let hint = hint.min(n);
// The hint can skip the answer only on a backward seek; then search from the front.
let lo = if hint == 0 || lt(hint - 1) { hint } else { 0 };
if lo >= n || !lt(lo) { return lo; }
// Exponential search from `lo`, then binary within the final bracket.
let (mut prev, mut step) = (lo, 1usize);
while prev + step < n && lt(prev + step) { prev += step; step <<= 1; }
let (mut a, mut b) = (prev + 1, (prev + step).min(n));
while a < b { let m = a + (b - a) / 2; if lt(m) { a = m + 1; } else { b = m; } }
a
}
}

impl<C: Chunk> Cursor for ChunkBatchCursor<C> {
impl<C: NavigableChunk> Cursor for ChunkBatchCursor<C> {
type Storage = ChunkBatch<C>;

type KeyContainer = <C::Cursor as Cursor>::KeyContainer;
Expand Down Expand Up @@ -350,10 +394,7 @@ impl<C: Chunk> Cursor for ChunkBatchCursor<C> {
self.inner.as_mut().unwrap().map_times(&s.chunks[self.chunk], &mut logic);
// Follow the (key, val) forward across boundaries while it spills.
let mut c = self.chunk;
while c + 1 < s.chunks.len()
&& s.last_keys.index(c) == k && s.first_keys.index(c + 1) == k
&& s.last_vals.index(c) == v && s.first_vals.index(c + 1) == v
{
while c + 1 < s.chunks.len() && Self::val_spills(s, c, k, v) {
c += 1;
s.chunks[c].cursor().map_times(&s.chunks[c], &mut logic);
}
Expand All @@ -364,7 +405,7 @@ impl<C: Chunk> Cursor for ChunkBatchCursor<C> {
let n = s.chunks.len();
let k = self.key(s);
// Advance to the last chunk the key spans.
while self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k {
while self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) {
self.goto(self.chunk + 1, s);
}
// Step past the key within its last chunk.
Expand All @@ -382,31 +423,27 @@ impl<C: Chunk> Cursor for ChunkBatchCursor<C> {

fn seek_key(&mut self, s: &Self::Storage, key: Self::Key<'_>) {
let n = s.chunks.len();
// First chunk whose last key is `>= key`: where `key`'s run begins.
let c = s.last_keys.advance(0, n, |x| {
<KeyCon<C> as BatchContainer>::reborrow(x).lt(&<KeyCon<C> as BatchContainer>::reborrow(key))
});
self.goto(c, s);
self.key_chunk = c;
if c < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[c], key); }
// First chunk whose last key is `>= key`: where `key`'s run begins. Gallop from the
// current key's first chunk (`key_chunk`), a lower bound for forward seeks.
let lo = Self::locate_key(s, self.key_chunk, key);
self.goto(lo, s);
self.key_chunk = lo;
if lo < n { self.inner.as_mut().unwrap().seek_key(&s.chunks[lo], key); }
}

fn step_val(&mut self, s: &Self::Storage) {
if !self.val_valid(s) { return; }
let n = s.chunks.len();
let (k, v) = (self.key(s), self.val(s));
// Advance to the last chunk the (key, val) spans.
while self.chunk + 1 < n
&& s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k
&& s.last_vals.index(self.chunk) == v && s.first_vals.index(self.chunk + 1) == v
{
while self.chunk + 1 < n && Self::val_spills(s, self.chunk, k, v) {
self.goto(self.chunk + 1, s);
}
// Step past the (key, val) within that chunk.
self.inner.as_mut().unwrap().step_val(&s.chunks[self.chunk]);
// If the key's vals are exhausted here but the key spills, roll forward.
if !self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk])
&& self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k
&& self.chunk + 1 < n && Self::key_spills(s, self.chunk, k)
{
self.goto(self.chunk + 1, s);
self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k);
Expand All @@ -421,7 +458,7 @@ impl<C: Chunk> Cursor for ChunkBatchCursor<C> {
self.inner.as_mut().unwrap().seek_val(&s.chunks[self.chunk], val);
if self.inner.as_ref().unwrap().val_valid(&s.chunks[self.chunk]) { return; }
// Key's vals exhausted in this chunk; if the key spills, retry in the next.
if self.chunk + 1 < n && s.last_keys.index(self.chunk) == k && s.first_keys.index(self.chunk + 1) == k {
if self.chunk + 1 < n && Self::key_spills(s, self.chunk, k) {
self.goto(self.chunk + 1, s);
self.inner.as_mut().unwrap().seek_key(&s.chunks[self.chunk], k);
} else {
Expand Down
43 changes: 38 additions & 5 deletions differential-dataflow/src/trace/chunk/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,20 @@ where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+S
}
}

impl<K, V, T, R> Chunk for VecChunk<K, V, T, R>
impl<K, V, T, R> super::NavigableChunk for VecChunk<K, V, T, R>
where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static {
type Time = <<Vector<((K, V), T, R)> as Layout>::TimeContainer as BatchContainer>::Owned;

const TARGET: usize = TARGET;

fn bounds(&self) -> ((&K, &V, &T), (&K, &V, &T)) {
let s = &self.0[..];
let (f, l) = (&s[0], &s[s.len() - 1]);
((&f.0.0, &f.0.1, &f.1), (&l.0.0, &l.0.1, &l.1))
}
}

impl<K, V, T, R> Chunk for VecChunk<K, V, T, R>
where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Timestamp, R: Ord+Semigroup+'static {
type Time = <<Vector<((K, V), T, R)> as Layout>::TimeContainer as BatchContainer>::Owned;

const TARGET: usize = TARGET;

fn len(&self) -> usize { self.0.len() }

Expand Down Expand Up @@ -764,4 +767,34 @@ mod test {
eprintln!("probes={probes:>7}: gallop={g:>12?} linear={l:>12?}");
}
}

// `seek_key` must land at the first key `>= target` regardless of where the cursor
// starts, so the galloping hint (and its backward-seek fallback) never changes the
// answer. Probe every (start, target) pair — forward and backward — against an
// analytic oracle.
#[test]
fn seek_key_hint_is_direction_independent() {
use crate::trace::cursor::Cursor;
use crate::trace::Description;
use crate::trace::chunk::ChunkBatch;
use timely::progress::Antichain;

// One key per chunk (even keys 0, 2, .., 38) so seeks cross boundaries both ways.
let chunks: Vec<_> = (0..20u64).map(|k| chunk(vec![((2 * k, 0u64), 0u64, 1i64)])).collect();
let desc = Description::new(
Antichain::from_elem(0u64), Antichain::from_elem(1u64), Antichain::from_elem(0u64));
let batch = ChunkBatch::new(chunks, desc);

// First key `>= tgt`: the next even at or above `tgt`, or absent past the last (38).
let oracle = |tgt: u64| { let e = tgt + (tgt & 1); (e <= 38).then_some(e) };
for start in 0..=40u64 {
for tgt in 0..=40u64 {
let mut c = batch.cursor();
c.seek_key(&batch, &start);
c.seek_key(&batch, &tgt);
assert_eq!(c.get_key(&batch).copied(), oracle(tgt), "start={start} tgt={tgt}");
}
}
}

}
Loading