Integer-proxy chunks: backend-agnostic join and reduce over (key_hash, value_id)#781
Open
frankmcsherry wants to merge 11 commits into
…, value_id)

A boundary where only integers cross: a storage backend presents each record as ((key_hash, value_id), time, diff), integer proxies for data it keeps in its own layout; the operators own all the lattice/time logic over those integers, and the backend supplies value semantics via callbacks. Any columnar (or otherwise opaque-to-DD) value store can then reuse join and reduce without materializing values.

- trace/chunk/int_proxy: ProxyChunk, a cursor-less Chunk of proxy columns, with from_unsorted (integer sort+consolidate with representative provenance) as the presentation-building helper.
- operators/int_proxy: ProxyJoinTactic / ProxyReduceTactic for the join_with_tactic and reduce_with_tactic seams (made pub here), and the backend traits: present-as-proxies (read), value callback with hash-minted output ids (write), materialize (egress). Reduce output ids are content hashes, so an output arrangement re-presents with the same ids downstream with no registry; pending interesting times are keyed by the stable key_hash across retires. The module doc carries the boundary contract and design notes (why value_id is not order-preserving; collision risk); each tactic and the in-memory reference backend (VecChunk arrangements, fnv hashes) sit in their own file under the module.
- Tests: join and count/distinct/min reduces against the row operators over multi-round retracting inputs, and a scripted Product-time retire sequence exercising synthetic corrections and pending.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
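As an illustration of what "integer sort+consolidate with representative provenance" could look like, here is a minimal sketch. It is not the PR's `from_unsorted`: the name `sort_and_consolidate`, the `ProxyRecord` alias, and the use of plain `u64` times are assumptions made for the example. Each surviving record carries the index of the first input record that contributed to it.

```rust
/// One proxy record: ((key_hash, value_id), time, diff).
/// Times are plain u64 here (an assumption; the real operators use lattice times).
type ProxyRecord = ((u64, u64), u64, i64);

/// Sorts by ((key_hash, value_id), time), sums the diffs of equal entries, and
/// drops entries whose diffs cancel. Each output record is paired with the
/// index (into `input`) of its first contributing record.
fn sort_and_consolidate(input: &[ProxyRecord]) -> Vec<(ProxyRecord, usize)> {
    let mut order: Vec<usize> = (0..input.len()).collect();
    // Stable sort: among equal entries the lowest input index stays first.
    order.sort_by_key(|&i| (input[i].0, input[i].1));

    let mut out: Vec<(ProxyRecord, usize)> = Vec::new();
    for i in order {
        let (kv, t, d) = input[i];
        let merged = match out.last_mut() {
            Some((last, _)) if last.0 == kv && last.1 == t => {
                last.2 += d;
                true
            }
            _ => false,
        };
        if !merged {
            out.push(((kv, t, d), i));
        }
    }
    // Net-zero entries carry no information and are dropped.
    out.retain(|(rec, _)| rec.2 != 0);
    out
}
```

The representative index is what would let a backend resolve an id back to data in its own layout without the operators ever seeing the value.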
…e, not a requirement value_id never outlives one computation (a join unit's presentations; one reduce retire, whose materialize resolves ids to real data before anything leaves). The actual contract is a per-computation bijection with value equality, plus within-retire agreement between the output presentation and minted ids. Content hashing discharges all of it statelessly (the reference backend's choice); exact schemes — dense ordinals from grouping, a per-retire value→id map — are equally valid and collision-free. Only key_hash must be a stable pure function of the key (cross-retire pending, changed-key filter), making the key side the irreducible collision exposure. Persisting ids into an output arrangement itself would force stable value ids; this design does not. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
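The two families of value-id schemes named above can be sketched side by side. This is illustrative only: FNV-1a is one member of the fnv family and is assumed here (the text does not say which variant the reference backend uses), and `DenseIds` is a hypothetical per-computation value→id map.

```rust
use std::collections::HashMap;

/// Stateless scheme: a content hash (64-bit FNV-1a, assumed variant).
/// Equal values always get equal ids; distinct values may collide.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    h
}

/// Exact scheme: dense ordinals minted on first sight, valid for one
/// computation only. Collision-free, but needs state for that computation.
#[derive(Default)]
struct DenseIds {
    ids: HashMap<Vec<u8>, u64>,
}

impl DenseIds {
    fn id_for(&mut self, value: &[u8]) -> u64 {
        let next = self.ids.len() as u64;
        *self.ids.entry(value.to_vec()).or_insert(next)
    }
}
```

Either scheme gives a bijection with value equality inside one computation (the hash up to collisions); only `key_hash` has to be a pure function of the key, which is why a hash is the natural choice on the key side.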
…worse than the cursor tactics

Two regressions found by asking exactly that question, both fixed:

- The reference reduce backend implemented the changed-key restriction by scanning every batch and filtering: O(trace) per retire where the cursor tactic seeks, O(delta·log). Presents now seek the changed keys (novel keys resolve from this retire's delta-sized input batches; pending keys from the retire that pended them, which is exactly what the persistent hash→key map retains: pruned to the changed set each retire, so it is bounded by the delta, and the per-retire value map is cleared).
- The join interface had no restriction at all, forcing ANY backend to present the entire accumulated side per fresh batch: O(trace·log) per unit where the cursor join seeks. present0/present1 now take an optional sorted key-hash filter; the tactic presents the fresh side first and passes its key set for the accumulated side, the join analogue of reduce's changed-key restriction.

The check is mechanical, not wall-clock: counting backend wrappers measure presented records. With 20k arranged keys and five single-key rounds, reduce presents 37 records and join 10, where the scanning versions present Ω(rounds·N) (join: 100,005) and fail the gate.

What remains above the cursor tactics is a log-factor sort of delta-sized presentations, and per-(key, wave) rescans of a changed key's presented range where the row replayer consolidates progressively: same worst-case order, different constants.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
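The difference between scanning and seeking can be sketched on a sorted run. This is a simplified stand-in, not the PR's `present0`/`present1`: the function name, the `(key_hash, value_id)` record shape, and the flat slice are assumptions for illustration. The filter is a sorted list of key hashes, and each key is located by binary search rather than by visiting every record.

```rust
/// `records` is sorted by key_hash; `filter` is a sorted, deduplicated list
/// of key hashes. Returns only the records whose key is in the filter,
/// touching O(|filter| · log |records| + matches) entries instead of all.
fn present_filtered(records: &[(u64, u64)], filter: &[u64]) -> Vec<(u64, u64)> {
    let mut out = Vec::new();
    let mut rest = records;
    for &key in filter {
        // Seek to the first record with key_hash >= key.
        let lo = rest.partition_point(|r| r.0 < key);
        rest = &rest[lo..];
        // Take the run of records with exactly this key.
        let hi = rest.partition_point(|r| r.0 <= key);
        out.extend_from_slice(&rest[..hi]);
        rest = &rest[hi..];
    }
    out
}
```

Because the filter is sorted, each seek can resume from where the previous one ended, so the remaining slice only shrinks.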
…cle fuzz The minimal harness for the framework, closed over itself: the tactics demand only BatchReader of the trace, and ProxyChunk is already a Chunk, so a batch of proxy chunks is the minimal arrangement — the proxy data IS the data, no separate chunk class needed. The identity backend makes values u64s with the identity as the id function: no hashing, no resolution machinery, no collision possibility, and materialize emits the proxy records verbatim (incidentally exercising ChunkBatch<ProxyChunk> as a real output batch). What remains under test is exactly the framework's own contribution — interesting-time discovery, desired-vs-current deltas, pending, held routing — fuzzed over Product-time grids: 300 random inputs retired through random diagonal frontiers (so synthetic joins arise inside and across intervals and must pend), driven through an emulation of the reduce driver protocol, and checked against a brute-force oracle at every grid point: the accumulated output must equal the reduction of the accumulated input, everywhere, for count/distinct/min-shaped reducers. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
An ignored benchmark (cargo test --release --test int_proxy -- --ignored --nocapture) compares full stacks (the stock row operators against chunk arrangements plus the proxy tactics over the reference backend) for a bulk load and a steady-state incremental phase (warmed past the post-load merge-amortization transient).

The bench caught what the counting gates could not: the reference backend's hash→key map was pruned by retain (and the per-retire value map by clear), both of which keep the backing table, so after a million-key load every retire walked a million-bucket table to visit one entry, ~130µs/round of pure capacity. shrink_to_fit after the prune and a fresh map per retire fix it.

Steady-state single-key rounds after the fix, proxy vs row: reduce ~1.4x at every scale (flat from 10k to 1M keys; the delta-proportionality gates hold in wall clock too), join at parity below 1M (~1.0x). Bulk load carries the presentation layer's constant factor (per-record hashing, clones, by-hash sort): reduce 3-5x, join ~2x. These are the costs a columnar backend's bulk primitives are meant to attack.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
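A minimal sketch of the capacity fix described above, using only standard `HashMap` calls. The helper names and the map types are assumptions; the reference backend's actual fields are not shown in this page.

```rust
use std::collections::HashMap;

/// Prunes the persistent hash→key map to the changed set, then releases the
/// backing table. `keep` is assumed sorted so membership is a binary search.
fn prune_to(map: &mut HashMap<u64, String>, keep: &[u64]) {
    map.retain(|k, _| keep.binary_search(k).is_ok());
    // Without this, iterating the map still walks the old, large table.
    map.shrink_to_fit();
}

/// Replaces the per-retire map with a fresh one instead of calling `clear`,
/// which would keep the allocated capacity.
fn reset_per_retire(map: &mut HashMap<u64, u64>) {
    *map = HashMap::new();
}
```

The trade-off is a reallocation on the next growth; for a map that is bounded by the delta each retire, that cost is proportional to the delta as well.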
The longer-term goal for the boundary is fewer crossings into the backend's value logic (interpreted or columnar execution pays per-call overhead): each crossing should carry a list of keys and a longer bracketed list of value entries.

ProxyReduceBackend gains reduce_many(keys, ends, input), with group_offsets-shaped brackets, one per key, each non-empty, returning concatenated per-key outputs with their own bracket ends. A default implementation loops the per-key reduce, so simple backends implement only that; backends with bulk value logic override reduce_many.

The tactic now calls only reduce_many: retire's key-major loop becomes two passes. It derives each changed key's active times, groups the work into waves by time, then plays the waves in ascending order (Ord extends the partial order, so a key's earlier deltas always precede a later time's reads) with at most one callback per wave, batching every key active at that time.

The identity backend overrides reduce_many (asserting the bracket protocol from the backend's seat), so the grid-oracle fuzz exercises the batched path; the reference backend uses the default, so the row-comparison tests cover the loop. All gates and benches unchanged.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
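The bracket protocol and the looping default can be sketched as a trait with a provided method. This is a hedged illustration, not the PR's `ProxyReduceBackend`: the trait name `ReduceBackend`, the `Entry` alias, the exact signatures, and the toy `CountBackend` (which uses the count itself as the output id, in the spirit of the identity backend) are all assumptions.

```rust
/// One value entry in proxy space: (value_id, diff).
type Entry = (u64, i64);

trait ReduceBackend {
    /// Per-key value logic: appends this key's outputs to `output`.
    fn reduce(&mut self, key_hash: u64, input: &[Entry], output: &mut Vec<Entry>);

    /// Batched crossing. Bracket `i` is `input[ends[i - 1]..ends[i]]` (from 0
    /// for the first) and belongs to `keys[i]`. Returns the concatenated
    /// outputs plus one bracket end per key. The default loops `reduce`.
    fn reduce_many(
        &mut self,
        keys: &[u64],
        ends: &[usize],
        input: &[Entry],
    ) -> (Vec<Entry>, Vec<usize>) {
        assert_eq!(keys.len(), ends.len());
        let mut output = Vec::new();
        let mut out_ends = Vec::with_capacity(keys.len());
        let mut start = 0;
        for (&key, &end) in keys.iter().zip(ends) {
            assert!(start < end, "each input bracket must be non-empty");
            self.reduce(key, &input[start..end], &mut output);
            out_ends.push(output.len());
            start = end;
        }
        (output, out_ends)
    }
}

/// Toy backend: the output value is the total count, used directly as its id.
struct CountBackend;

impl ReduceBackend for CountBackend {
    fn reduce(&mut self, _key_hash: u64, input: &[Entry], output: &mut Vec<Entry>) {
        let total: i64 = input.iter().map(|e| e.1).sum();
        if total != 0 {
            output.push((total as u64, 1));
        }
    }
}
```

A backend with bulk value logic would override `reduce_many` and process all brackets in one pass; output brackets may be empty even though input brackets may not.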
The desired output at a (key, time) moment is a function of the key's input accumulation at that time alone — no output-side state — so no time ordering constrains the batch: every moment of the retire can share one reduce_many call, a key contributing one bracket per active time. The order-sensitive part (subtracting the current output, which includes deltas emitted at earlier moments) is pure proxy-space arithmetic and moves to a separate pass that plays the moments in ascending time order. The bracket, not the key, is reduce_many's unit: keys may repeat. Contract docs updated accordingly. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…linear

The row suite's reduce_scaling/join_scaling shapes (one key, many distinct times, one batch) exposed quadratic behavior in both proxy tactics: reduce rescanned a key's full presented range per interesting time (and its interesting-time closure joined all pairs), and join cross-produced full matched histories pairwise. Measured: reduce 6.4s at scale 10k, 4.7x per doubling; join 9.8s at 10k, >90s at 20k.

The robust versions are the cursor variants with integers in place of keys and values, as intended:

- history.rs: IdHistory, the id-space ValueHistory: (value_id, time, diff) edits replayed in ascending time order into a buffer repeatedly advanced by the meet of the times still to come and consolidated. The advancement is the collapse that keeps a key with many distinct times linear: accumulations read the small buffer, never the raw history. The presentations serve as the fused per-key load.
- reduce: discover_and_accumulate ports history_replay::compute. Lazy interesting-time discovery (novel and pending seed; synthetics from joins with the advanced batch buffer and times_current) replaces the eager join-closure, and per-moment accumulation reads the advanced buffers. Phase B replays the output side per key over the discovered moments with the same machinery (suffix meets; emitted deltas advanced and consolidated). Still one batched reduce_many crossing per retire.
- join: join_key ports JoinThinker::think. Each side's edits are replayed against the other side's advanced buffer (identical emitted times: t0 ∨ (t1 ∨ meet) = t0 ∨ t1), with the dead-simple cross product kept for small histories.

proxy_reduce_scaling / proxy_join_scaling (scale 100k, the row tests' shapes) now pin this; scale 10k dropped from seconds to milliseconds and growth is ~4x per 4x scale. The grid-oracle fuzz over partially ordered times, the scripted pending test, the row comparisons, the delta-proportionality gates, and the steady-state bench all pass unchanged.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
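The advance-by-meet collapse and the identity t0 ∨ (t1 ∨ meet) = t0 ∨ t1 can be checked on a small two-coordinate time. This is a sketch under assumptions: `GridTime` is a hypothetical stand-in for a Product time, and `advance_and_consolidate` is an illustrative helper, not the PR's `IdHistory`.

```rust
/// A two-coordinate partially ordered time (stand-in for a Product time).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct GridTime {
    outer: u32,
    inner: u32,
}

impl GridTime {
    /// Least upper bound: coordinate-wise max.
    fn join(self, o: GridTime) -> GridTime {
        GridTime { outer: self.outer.max(o.outer), inner: self.inner.max(o.inner) }
    }
    /// Greatest lower bound: coordinate-wise min.
    fn meet(self, o: GridTime) -> GridTime {
        GridTime { outer: self.outer.min(o.outer), inner: self.inner.min(o.inner) }
    }
}

/// Advances every buffered edit to `t ∨ meet`, where `meet` is the meet of the
/// times still to come, then consolidates. Edits that become indistinguishable
/// to all future times merge, which keeps the buffer small.
fn advance_and_consolidate(edits: &mut Vec<(u64, GridTime, i64)>, meet: GridTime) {
    for e in edits.iter_mut() {
        e.1 = e.1.join(meet);
    }
    edits.sort_by_key(|e| (e.0, e.1.outer, e.1.inner));
    let mut out: Vec<(u64, GridTime, i64)> = Vec::new();
    for &(v, t, d) in edits.iter() {
        let merged = match out.last_mut() {
            Some(last) if last.0 == v && last.1 == t => {
                last.2 += d;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((v, t, d));
        }
    }
    out.retain(|e| e.2 != 0);
    *edits = out;
}
```

The identity holds because the meet is below every remaining time t0, so t0 ∨ meet = t0; joining against an advanced edit therefore emits the same time as joining against the raw one.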
The row-vs-proxy comparison on the scaling shapes showed proxy reduce still superlinear (9x per 4x scale; timeout at 4M where row takes 0.5s). Profiling pinned it outside the tactic: the reference backend's materialize built ONE giant VecChunk and let the builder settle it, and settle's split path peels TARGET-sized pieces off the front with split_off, copying the remaining tail each iteration — O(m²/TARGET) in the batch size. Feeding the builder TARGET-sized chunks directly fixes it: 4M drops from >120s to 3.2s. With that, both operators are in the row implementations' complexity class on the scaling shapes (growth ~5x per 4x scale ≈ n·log n; the hot frames are the presentation sort and fnv hashing — constants, not structure): at scale 4M, join row 0.96s / proxy 3.6s, reduce row 0.48s / proxy 3.2s. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
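The quadratic peel and the single-pass alternative can be contrasted in a few lines. Both functions are hypothetical illustrations of the shape described above, not the builder's `settle` code; `target` plays the role of TARGET.

```rust
/// Peels `target`-sized pieces off the front with `split_off`. Each iteration
/// allocates and copies the whole remaining tail, so the total work is
/// O(m² / target) for m records.
fn peel_front<T>(mut big: Vec<T>, target: usize) -> Vec<Vec<T>> {
    assert!(target > 0);
    let mut pieces = Vec::new();
    while big.len() > target {
        let tail = big.split_off(target); // copies everything after `target`
        pieces.push(std::mem::replace(&mut big, tail));
    }
    if !big.is_empty() {
        pieces.push(big);
    }
    pieces
}

/// Produces the same pieces in one pass: each record is copied exactly once.
fn target_sized_chunks<T: Clone>(records: &[T], target: usize) -> Vec<Vec<T>> {
    assert!(target > 0);
    records.chunks(target).map(|c| c.to_vec()).collect()
}
```

Feeding a builder pieces that are already at the target size means its split path never has to run.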
frankmcsherry added a commit to frankmcsherry/differential-dataflow that referenced this pull request Jul 3, 2026
…flow#781) consumption plan Branch rebased onto int-proxy: the framework the SPIKE's boundary model called for now exists upstream (renamed ids -> int_proxy in review); the old in-branch copy is dropped. The hand-off section documents the current interfaces, the one-crossing-per-retire reduce_many contract, the assessment harness to replicate, the reduce-first plan, the traps already hit once (scan-presents, capacity leaks, giant-chunk settle), and the suitability questions the corgi agent should answer. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ged view Field report from the corgi porting session (confirmed against Model.lean): reduce's Step-1 seed set was flawed. discover_and_accumulate recovered the batch by time-thresholding the CONSOLIDATED present_input (history ⊎ novel, net-zero records dropped). Legal compaction can advance a stored +1 onto a novel −1's exact (value, time); they cancel in that merged view, so the time drops out of the seeds — but it is in the batch's own support b.support, and a standing output change is still owed there. That is Model.lean's scenario1_cancels (the SCC deadlock) verbatim: int_proxy computed a strictly smaller seedSet than the model's seedSet = b.support ∪ pending. Fix (model-prescribed): seed from the batch's own support. The backend trait gains present_novel(novel) — the freshly arrived batches presented ALONE, before any merge with stored history — replacing key_hashes (which present_novel subsumes: its key set is the changed keys). The tactic seeds discovery and the synthetic-join closure from this run; the merged present_input is replayed only for ACCUMULATION (compaction there is accumulation-preserving, so consolidation is fine). The two are the row tactic's separate batch_cursor vs source-history split, in id space. Regression test proxy_reduce_seeds_survive_compaction_cancellation pins the scenario directly (failed before, passes after). The Product-grid oracle fuzz now also runs a COMPACTION ADVERSARY on alternate iterations (advance accumulated batches to the interval lower + reconsolidate before each retire — the model's acc_mapDomain), so the whole 300-case space is checked under the exact move that triggered the bug. Reported-by: corgi porting session Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
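The cancellation scenario can be reproduced in miniature. This sketch simplifies to totally ordered `u64` times and a single key; `Edit`, `consolidate`, `seeds_from_merged`, and `seeds_from_batch` are hypothetical names, and the model's seed set also includes pending times, which are omitted here.

```rust
use std::collections::BTreeSet;

/// (value_id, time, diff) for one key; times are plain u64 for illustration.
type Edit = (u64, u64, i64);

/// Sorts, sums equal (value_id, time) entries, and drops net-zero records.
fn consolidate(mut edits: Vec<Edit>) -> Vec<Edit> {
    edits.sort_by_key(|e| (e.0, e.1));
    let mut out: Vec<Edit> = Vec::new();
    for (v, t, d) in edits {
        let merged = match out.last_mut() {
            Some(last) if last.0 == v && last.1 == t => {
                last.2 += d;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((v, t, d));
        }
    }
    out.retain(|e| e.2 != 0);
    out
}

/// The flawed approach: recover seeds by time-thresholding the merged,
/// consolidated view of stored history plus the novel batch.
fn seeds_from_merged(stored: &[Edit], novel: &[Edit], lower: u64) -> BTreeSet<u64> {
    let merged = consolidate(stored.iter().chain(novel).copied().collect());
    merged.into_iter().map(|e| e.1).filter(|&t| t >= lower).collect()
}

/// The fix: seed from the batch's own support, before any merge.
fn seeds_from_batch(novel: &[Edit]) -> BTreeSet<u64> {
    novel.iter().map(|e| e.1).collect()
}
```

With a stored +1 that compaction has advanced to time 3 and a novel −1 for the same value at time 3, the merged view is empty and yields no seed, while the batch's support still contains time 3.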
The prior fix's present_novel did a full value-present of the delta (hash keys AND values, from_unsorted sort+consolidate) purely to seed interesting times, duplicating the value work present_input already does on novel, a redundancy the corgi porting session flagged. But seeds need far less.

Two facts: interesting-time over-derivation is sound (a non-changing seed yields a zero delta and is discarded), so the seed set may be any superset of b.support; and the tactic reads only TIMES from the seed source (batch_replay's value_ids and diffs were never touched). So the batch's raw (key_hash, time) support, a superset of b.support with no value consolidation, is exactly enough. Zeroing value_id in a ProxyChunk would NOT work: from_unsorted then consolidates distinct values at one time into a dropped zero, reintroducing the seed-cancellation bug. The seed run must be raw and value-free.

- Trait: present_novel becomes seed_times(&self, novel) -> Vec<(u64, T)>. It hashes keys only (no value hashing, no value_id, no value-sort, no consolidation), returns one (key_hash, time) per record sorted by key_hash, and is &self (stateless: seeds store no alignment). A full value-present of the delta happens once, in present_input.
- history.rs: TimeHistory, the times-only replay (ValueHistory with no values/diffs), with the same meet-advancement, keeping the *_scaling shapes linear. batch_replay is now a TimeHistory.
- This resolves the redundancy without the fragile call-order caching contract: the seed call is intrinsically cheap, no backend caches.

Load improves (n=10k reduce 1.67x to 1.22x row; n=1M ~4.4 to ~3.9); steady state and all correctness unchanged: compaction regression, compaction-adversary grid fuzz, scaling shapes, row comparisons, and delta-proportionality gates all green.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
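A small sketch of why the seed run must be raw and value-free, as opposed to a consolidated run with zeroed value ids. Everything here is illustrative: the string keys and values, `hash_key` (FNV-1a, an assumed variant), and both function bodies are assumptions; only the described return shape, one (key_hash, time) per record sorted by key_hash, comes from the text above.

```rust
/// Key hash for the example (64-bit FNV-1a; the variant is an assumption).
fn hash_key(key: &str) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in key.as_bytes() {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Raw seed run: one (key_hash, time) per (key, value, time, diff) record,
/// sorted by key_hash. No value hashing and no consolidation.
fn seed_times(novel: &[(&str, &str, u64, i64)]) -> Vec<(u64, u64)> {
    let mut seeds: Vec<(u64, u64)> =
        novel.iter().map(|&(k, _v, t, _d)| (hash_key(k), t)).collect();
    seeds.sort_by_key(|s| s.0);
    seeds
}

/// Counter-example: treat every value as id 0 and consolidate. Distinct
/// values at one time then cancel each other and the time is lost.
fn zeroed_value_seed_times(novel: &[(&str, &str, u64, i64)]) -> Vec<(u64, u64)> {
    let mut recs: Vec<(u64, u64, i64)> =
        novel.iter().map(|&(k, _v, t, d)| (hash_key(k), t, d)).collect();
    recs.sort_by_key(|r| (r.0, r.1));
    let mut out: Vec<(u64, u64, i64)> = Vec::new();
    for (k, t, d) in recs {
        let merged = match out.last_mut() {
            Some(last) if last.0 == k && last.1 == t => {
                last.2 += d;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((k, t, d));
        }
    }
    out.retain(|r| r.2 != 0);
    out.into_iter().map(|(k, t, _)| (k, t)).collect()
}
```

For a batch that swaps value "x" for value "y" under one key at one time, the raw run keeps the time (twice, which is harmless since over-derivation is sound), while the zeroed-and-consolidated run returns nothing.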
A boundary where only integers cross: a storage backend presents each record as `((key_hash, value_id), time, diff)`, integer proxies for data it keeps in its own layout; the operators own all the lattice/time logic over those integers, and the backend supplies value semantics via callbacks. Any columnar (or otherwise opaque-to-DD) value store can then reuse join and reduce without materializing values as Rust types.

The two integers carry different contracts. `key_hash` is a content hash of the key, stable across the whole system with no registry: the same key hashes identically in every operator, including across the output→input boundary (a reduce output re-ingested downstream). `value_id` is an intra-key identifier, consistent within one operator computation: equal ids mean equal values there, which is all that consolidation and presence need. Output ids are minted by hashing the produced value, which is what lets a reduce output become a real arrangement whose values present with the same ids downstream. Hash collisions are an accepted risk (birthday bound; the module doc quantifies it, and the upgrade path is a wider id, never a registry).

Contents:

- `trace/chunk/int_proxy`: `ProxyChunk`, a cursor-less `Chunk` of proxy columns (slots into the Chunk navigation capability #778 split; batches, fueled merging, and grading come free, entirely in integer space), plus `from_unsorted` (integer sort+consolidate with representative provenance) as the helper a backend uses to build presentations.
- `operators/int_proxy`: `ProxyJoinTactic` / `ProxyReduceTactic` for the `join_with_tactic` and `reduce_with_tactic` seams (made `pub` here; these tactics are the first out-of-crate-shaped consumers), and the backend traits: present-as-proxies (read), value callback with hash-minted output ids (write), materialize (egress). The reduce tactic keeps its cross-retire `pending` keyed by the stable `key_hash`, and restricts presentations to changed keys so incremental cost tracks the delta, not the accumulation. The module doc carries the design notes, including why `value_id` is deliberately not order-preserving (min/max belong to the value callback).
- `operators/int_proxy/reference`: an in-memory backend over `VecChunk` arrangements with fnv content hashes, so the framework is exercised without any columnar engine.
- Tests: … `Product`-time retire sequence exercising synthetic time corrections both within an interval and pended across retires.

🤖 Generated with Claude Code
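For a rough sense of the birthday bound mentioned in the description, the standard approximation p ≈ 1 − exp(−n(n−1)/2^(b+1)) for n uniformly hashed items in a b-bit space can be evaluated directly. This is the textbook estimate, not the figures from the module doc, and `collision_probability` is a name made up for the sketch.

```rust
/// Approximate probability that at least two of `n` uniformly hashed items
/// collide in a `bits`-bit id space (standard birthday approximation).
fn collision_probability(n: f64, bits: u32) -> f64 {
    let space = 2f64.powi(bits as i32);
    // 1 - exp(x), computed via exp_m1 to stay accurate when x is tiny.
    -(-n * (n - 1.0) / (2.0 * space)).exp_m1()
}
```

Under this estimate a million distinct keys in a 64-bit space collide with probability on the order of 10⁻⁸, and doubling the id width squares the space, which is the sense in which a wider id is the upgrade path.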