Summary
Scope a merge_insert operation's target scan to a subset of the target's fragments, so a distributed merge can split the target across tasks and read it once in total instead of re-scanning the whole target in every task.
Branch: feat/fragment-scoped-merge · Commit c8affcc7
Problem
A distributed merge_insert (as driven by lance-spark's LanceNativeMergeExec) hash-partitions the source by the merge key into N partitions and runs one uncommitted merge_insert per partition on a separate task. Each task calls create_plan, which today scans the entire target dataset:
// merge_insert.rs — create_plan (before)
let scan = session_ctx.read_lance_unordered(self.dataset.clone(), true, true)?;
So the aggregate target I/O for one distributed merge is O(N × target):
- N tasks each stream the full target through the join probe side.
- Each concurrent scan holds Arrow/native buffers off the JVM heap.
At scale this is fatal, not just slow. On a 550M-row target with N = 256 source partitions we observed:
- The full-target scan per partition never finishes in reasonable time (~256 full scans serialized across the available slots).
- The off-heap scan buffers overflow the container memory limit → the executor pod is OOM-killed (SIGKILL on the cgroup, not a recoverable JVM OutOfMemoryError) → the whole stage cascades and the job dies.
This reproduced across four memory/partition configurations; tuning memory and partition count does not fix an O(N × target) algorithm.
A microbenchmark reproduces the pathology in miniature and shows the fix (see Benchmark below).
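As a rough illustration of the O(N × target) claim, the aggregate number of target rows streamed can be computed directly. The helper name below is hypothetical and the model ignores everything except row counts; it is a back-of-the-envelope sketch, not a measurement.

```rust
/// Total target rows streamed when the target is scanned `scans` times.
/// Hypothetical helper for illustration only; not part of Lance.
pub fn aggregate_rows_scanned(target_rows: u64, scans: u64) -> u64 {
    target_rows * scans
}

/// The two cases discussed in the text for the 550M-row target:
/// one full scan per source partition (N = 256) versus a single pass.
pub fn observed_case() -> (u64, u64) {
    let per_task_full_scan = aggregate_rows_scanned(550_000_000, 256);
    let single_pass = aggregate_rows_scanned(550_000_000, 1);
    (per_task_full_scan, single_pass)
}
```

Under this model the 256-partition run streams about 140.8 billion target rows in aggregate, against 550 million for one pass over the target.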
Design
Let each task scan only its slice of the target's fragments instead of the whole target. Split the target's fragments across the N tasks; each task merges the source against its own fragments. Aggregate target I/O becomes O(target) — the target is read once in total.
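One way the split could be done is to cut the fragment id list into contiguous, near-equal slices. This is a minimal sketch under assumptions: `assign_fragment_slices` is a hypothetical helper, not something this change adds, and the actual assignment policy is left to the caller (for example the driver in lance-spark).

```rust
/// Split `fragment_ids` into `num_tasks` contiguous, near-equal slices.
/// Hypothetical helper for illustration; not part of Lance or lance-spark.
pub fn assign_fragment_slices(fragment_ids: &[u32], num_tasks: usize) -> Vec<Vec<u32>> {
    assert!(num_tasks > 0, "need at least one task");
    let base = fragment_ids.len() / num_tasks;
    let extra = fragment_ids.len() % num_tasks;
    let mut slices = Vec::with_capacity(num_tasks);
    let mut start = 0;
    for task in 0..num_tasks {
        // The first `extra` tasks take one additional fragment.
        let len = base + usize::from(task < extra);
        slices.push(fragment_ids[start..start + len].to_vec());
        start += len;
    }
    slices
}
```

Every fragment id lands in exactly one slice, so the slices are disjoint and together cover the target once. If there are more tasks than fragments, the trailing slices are empty and those tasks have nothing to scan.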
Why this is correct
Partitioning by target fragment is disjoint by construction: a task only ever reads (and, for matched rows, deletes) rows in its own fragments, so no two tasks can touch the same physical target row. That is a stronger invariant than the existing source-key partitioning that combine_merge_transactions already relies on, so the per-slice uncommitted transactions combine and commit unchanged.
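The disjointness argument can be checked on a toy model. The sketch below is not Lance code: the target is modeled as a vector of fragments holding (key, value) rows, only updates are modeled (not deletes), and all names are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Toy target: each inner Vec is one fragment of (key, value) rows.
pub type Target = Vec<Vec<(u64, i64)>>;

/// Apply a matched-only update to the fragments listed in `slice` and
/// return the physical row addresses (fragment index, row index) touched.
pub fn merge_slice(
    target: &mut Target,
    slice: &[usize],
    source: &HashMap<u64, i64>,
) -> HashSet<(usize, usize)> {
    let mut touched = HashSet::new();
    for &frag in slice {
        for (row, (key, value)) in target[frag].iter_mut().enumerate() {
            if let Some(new_value) = source.get(&*key) {
                *value = *new_value;
                touched.insert((frag, row));
            }
        }
    }
    touched
}

/// Run one merge per slice and check both claims from the text: no two
/// slices touch the same row, and the result equals a single full merge.
pub fn scoped_equals_full(
    target: &Target,
    slices: &[Vec<usize>],
    source: &HashMap<u64, i64>,
) -> bool {
    let mut full = target.clone();
    let all: Vec<usize> = (0..target.len()).collect();
    merge_slice(&mut full, &all, source);

    let mut scoped = target.clone();
    let mut seen = HashSet::new();
    for slice in slices {
        for addr in merge_slice(&mut scoped, slice, source) {
            // `insert` returns false if another slice already touched this row.
            if !seen.insert(addr) {
                return false;
            }
        }
    }
    scoped == full
}
```

With disjoint slices the check passes for any source; it fails as soon as two slices share a fragment that contains a matched row, which is the situation fragment partitioning rules out.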
Restriction: matched-only
A fragment slice cannot see the whole target, so it cannot correctly decide:
- inserts (when_not_matched = InsertAll): a source key absent from the task's own slice may still match a row in another task's fragment, so treating it as an insert would duplicate the key.
- not-matched-by-source deletes (when_not_matched_by_source != Keep): deciding a target row is unmatched requires the whole source and whole target view.
So target_fragments is restricted to the matched-only shape (when_matched = UpdateAll | UpdateIf | Delete, when_not_matched = DoNothing, when_not_matched_by_source = Keep) and this is enforced at try_build(). This shape is exactly a pure-update upsert, the common streaming-merge case. Inserts under a fragment-scoped merge are a separate follow-up (see below).
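The restriction could be validated roughly as follows. This is a simplified stand-in, not the code in this change: the enums mirror only the variant names mentioned above, `WhenNotMatchedBySource::Delete` is a placeholder for any non-Keep behavior, and the real Lance types and error handling differ. `when_matched` is not checked because every variant listed in the matched-only shape is allowed.

```rust
// Simplified, hypothetical mirrors of the merge clauses discussed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhenNotMatched {
    InsertAll,
    DoNothing,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhenNotMatchedBySource {
    Keep,
    Delete, // placeholder for any behavior other than Keep
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Params {
    pub when_not_matched: WhenNotMatched,
    pub when_not_matched_by_source: WhenNotMatchedBySource,
    pub target_fragments: Option<Vec<u32>>,
}

/// Reject any fragment-scoped merge that is not matched-only.
pub fn validate(params: &Params) -> Result<(), String> {
    if params.target_fragments.is_none() {
        // Whole-dataset scan: every shape remains allowed.
        return Ok(());
    }
    if params.when_not_matched != WhenNotMatched::DoNothing {
        return Err("target_fragments requires when_not_matched = DoNothing".to_string());
    }
    if params.when_not_matched_by_source != WhenNotMatchedBySource::Keep {
        return Err("target_fragments requires when_not_matched_by_source = Keep".to_string());
    }
    Ok(())
}
```

Failing at build time rather than at execution keeps a mis-shaped merge from ever starting a scan.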
API
MergeInsertBuilder::try_new(dataset, vec!["id".into()])?
    .when_matched(WhenMatched::UpdateAll)
    .when_not_matched(WhenNotMatched::DoNothing) // matched-only
    .try_target_fragments(vec![3, 7, 11])        // <-- new: scan only these fragments
    .try_build()?
    .execute_uncommitted(source)                 // combine + commit across tasks as usual
    .await?;
Implementation
Small, layered, and additive (None = whole-dataset scan = today's behavior, unchanged):
- MergeInsertParams.target_fragments: Option<Vec<u32>> + MergeInsertBuilder::try_target_fragments(...) builder + try_build() validation of the matched-only restriction.
- LanceTableProvider gains an optional fragments field (new_with_fragments) that calls the existing scan.with_fragments(...).
- SessionContextExt::read_lance_unordered_fragments(...): fragment-scoped counterpart of read_lance_unordered.
- create_plan uses the fragment-scoped scan when target_fragments is set, resolving ids → Fragment metadata (erroring on unknown ids), else scans the whole dataset as before.
// create_plan (after)
let scan = match &self.params.target_fragments {
    Some(fragment_ids) => {
        let fragments = /* resolve ids -> Fragment, error on unknown */;
        session_ctx.read_lance_unordered_fragments(self.dataset.clone(), true, true, fragments)?
    }
    None => session_ctx.read_lance_unordered(self.dataset.clone(), true, true)?,
};
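The elided "resolve ids -> Fragment, error on unknown" step might look something like the sketch below. It is an assumption-laden stand-in: `Fragment` here is a one-field placeholder rather than Lance's fragment metadata type, the manifest is a plain slice, and the error is a `String` instead of the crate's error type.

```rust
use std::collections::HashMap;

/// Placeholder for Lance's fragment metadata; only the id matters here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fragment {
    pub id: u32,
}

/// Look up each requested id in the manifest, preserving request order,
/// and fail on the first id that the dataset does not contain.
pub fn resolve_fragments(manifest: &[Fragment], ids: &[u32]) -> Result<Vec<Fragment>, String> {
    let by_id: HashMap<u32, &Fragment> = manifest.iter().map(|f| (f.id, f)).collect();
    ids.iter()
        .map(|id| {
            by_id
                .get(id)
                .map(|f| (*f).clone())
                .ok_or_else(|| format!("unknown fragment id {}", id))
        })
        .collect()
}
```

Erroring on an unknown id, rather than silently skipping it, matters here: a task that scanned fewer fragments than it was assigned would leave matched rows un-updated without any signal.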
Tests
- test_fragment_scoped_merge_equals_full_merge: builds a 2-fragment dataset, runs one matched-only merge per fragment slice against the whole source, combines + commits, and asserts the result is byte-identical to a single full matched-only merge. Also asserts each slice's transaction modified only its own fragment (proving the scan was actually scoped and the partitioning is disjoint).
- test_fragment_scoped_merge_rejects_insert_shape: try_build() rejects target_fragments combined with the insert shape.
cargo test -p lance green; cargo fmt + cargo clippy clean.
Benchmark
rust/lance/benches/fragment_scoped_merge.rs — one round of 8 tasks running a matched-only merge against a 320K-row / 64-fragment target, single machine (isolates the target-scan cost, no cluster noise):
| mode | time / round |
|---|---|
| full_scan_per_task (each task scans the whole target; current behavior) | 16.03 ms |
| fragment_scoped_per_task (each task scans only its fragment slice; this change) | 3.19 ms |
| speedup | 5.03× |
The speedup scales with N (number of tasks) — it is the O(N × target) → O(target) collapse. At the 550M-row / 256-partition scale that OOM-killed every run, the full-scan path was doing ~256 full-target scans; this change reduces that to a single pass over the target.
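The reported ratio can be reproduced from the two timings, and compared with the factor of N that would apply if target scanning were the only cost in a round. That second figure is an idealized bound assumed for illustration, not something the benchmark measures; the function names are hypothetical.

```rust
/// Ratio of the full-scan time to the fragment-scoped time.
pub fn speedup(full_ms: f64, scoped_ms: f64) -> f64 {
    full_ms / scoped_ms
}

/// Fraction of the idealized N-fold speedup actually observed, assuming
/// (hypothetically) that scanning the target is the only per-round cost.
pub fn fraction_of_ideal(observed_speedup: f64, num_tasks: u32) -> f64 {
    observed_speedup / f64::from(num_tasks)
}
```

For the 8-task round above, `speedup(16.03, 3.19)` is about 5.03, roughly 63% of the idealized 8×; the remainder is presumably work that does not shrink with the scan, such as reading the source and building the join.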
Follow-ups (out of scope here)
- Wire target_fragments through JNI → Java (MergeInsertParams) → lance-spark LanceNativeMergeExec: change the exec from "repartition source by key" to "enumerate target fragments on the driver and assign slices to tasks". Then re-run the 550M-row distributed merge, which should now run to completion quickly.
- Inserts under fragment-scoped merge: run per-fragment passes matched-only, then perform inserts in one separate distributed anti-join (source minus full target). That anti-join scans the target's key column once, not N times.
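The anti-join in the second follow-up amounts to keeping the source rows whose key appears nowhere in the target. A single-process sketch, assuming the target's key column fits in a hash set and using hypothetical names (the real version would be distributed and is not part of this change):

```rust
use std::collections::HashSet;

/// Return the source rows whose key does not appear anywhere in the target.
/// `target_keys` stands for one pass over the target's key column.
pub fn anti_join(source: &[(u64, i64)], target_keys: &HashSet<u64>) -> Vec<(u64, i64)> {
    source
        .iter()
        .filter(|(key, _)| !target_keys.contains(key))
        .cloned()
        .collect()
}
```

Because the key set is built from the whole target, a row survives only if no fragment contains its key, which is exactly the global view a single fragment slice lacks.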