feat(merge): fragment-scoped merge_insert target scan (read target once in distributed merge) #6

Description

@sezruby

Summary

Scope a merge_insert operation's target scan to a subset of the target's fragments, so a distributed merge can split the target across tasks and read it once in total instead of re-scanning the whole target in every task.

Branch: feat/fragment-scoped-merge · Commit c8affcc7

Problem

A distributed merge_insert (as driven by lance-spark's LanceNativeMergeExec) hash-partitions the source by the merge key into N partitions and runs one uncommitted merge_insert per partition on a separate task. Each task calls create_plan, which today scans the entire target dataset:

// merge_insert.rs — create_plan (before)
let scan = session_ctx.read_lance_unordered(self.dataset.clone(), true, true)?;

So the aggregate target I/O for one distributed merge is O(N × target):

  • N tasks each stream the full target through the join probe side.
  • Each concurrent scan holds Arrow/native buffers off the JVM heap.

At scale this is fatal, not just slow. On a 550M-row target with N = 256 source partitions we observed:

  • The full-target scan per partition never finishes in reasonable time (~256 full scans serialized across the available slots).
  • The off-heap scan buffers overflow the container memory limit → the executor pod is OOM-killed (SIGKILL on the cgroup, not a recoverable JVM OutOfMemoryError) → the whole stage cascades and the job dies. This reproduced across four memory/partition configurations; tuning memory and partition count does not fix an O(N × target) algorithm.
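As a rough worked figure for the numbers above (assuming every task really streams every target row, as the "before" create_plan does), with |T| the target row count and N the number of source partitions:

```latex
\begin{align*}
\text{aggregate rows scanned (today)} &= N \cdot |T| = 256 \times 5.5 \times 10^{8} = 1.408 \times 10^{11} \\
\text{aggregate rows scanned (single pass)} &= |T| = 5.5 \times 10^{8}
\end{align*}
```

That is a factor of N = 256 between the two, before counting the memory held by concurrent scans.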

A microbenchmark reproduces the pathology in miniature and shows the fix (see Benchmark below).

Design

Let each task scan only its slice of the target's fragments instead of the whole target. Split the target's fragments across the N tasks; each task merges the source against its own fragments. Aggregate target I/O becomes O(target) — the target is read once in total.
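A minimal sketch of the split, assuming a plain round-robin assignment on the driver. The helper name assign_fragments is hypothetical and is not part of lance or lance-spark; any assignment that gives each fragment to exactly one task would do.

```rust
/// Hypothetical driver-side helper (not a lance API): deal the target's
/// fragment ids out to at most `n_tasks` slices, round-robin.
pub fn assign_fragments(fragment_ids: &[u32], n_tasks: usize) -> Vec<Vec<u32>> {
    assert!(n_tasks > 0, "need at least one task");
    let mut slices = vec![Vec::new(); n_tasks];
    for (i, id) in fragment_ids.iter().enumerate() {
        // Fragment i goes to task i mod n_tasks, so each id lands in one slice.
        slices[i % n_tasks].push(*id);
    }
    // Drop empty slices so no task is launched with nothing to scan.
    slices.retain(|s| !s.is_empty());
    slices
}
```

Each resulting slice would be what a task passes as its fragment scope; the slice sizes differ by at most one fragment, which only balances work well if fragments are of similar size.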

Why this is correct

Partitioning by target fragment is disjoint by construction: a task only ever reads (and, for matched rows, deletes) rows in its own fragments, so no two tasks can touch the same physical target row. That is a stronger invariant than the existing source-key partitioning that combine_merge_transactions already relies on, so the per-slice uncommitted transactions combine and commit unchanged.
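The disjointness argument depends on the driver never handing the same fragment to two tasks. A cheap sanity check for that could look like the sketch below; slices_are_disjoint is a hypothetical name, not something this change adds.

```rust
use std::collections::HashSet;

/// Hypothetical check: true when no fragment id appears in more than one
/// slice (or twice within a slice).
pub fn slices_are_disjoint(slices: &[Vec<u32>]) -> bool {
    let mut seen = HashSet::new();
    // `insert` returns false on a repeat, so `all` stops at the first overlap.
    slices.iter().flatten().all(|id| seen.insert(*id))
}
```

If this holds, two tasks cannot read or delete the same physical target row, which is the property the combine step relies on.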

Restriction: matched-only

A fragment slice cannot see the whole target, so it cannot correctly decide:

  • inserts (when_not_matched = InsertAll): a source key absent from my slice may still match a row in another task's fragment — treating it as an insert would duplicate the key.
  • not-matched-by-source deletes (when_not_matched_by_source != Keep): deciding that a target row is unmatched requires a view of the whole source and the whole target.
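The insert hazard can be shown with plain key sets. This is an illustration only: the function below is hypothetical and models each fragment slice as the set of merge keys it contains.

```rust
use std::collections::HashSet;

/// Source keys a single task would classify as "not matched" when it can
/// only see the target keys in its own slice.
pub fn unmatched_in_slice(source_keys: &[i64], slice_keys: &HashSet<i64>) -> Vec<i64> {
    source_keys
        .iter()
        .copied()
        .filter(|k| !slice_keys.contains(k))
        .collect()
}
```

With slice A holding keys {1, 2}, slice B holding {3, 4}, and source keys [2, 3, 9], the task on slice A reports 3 and 9 as unmatched even though 3 lives in slice B. Inserting on that basis would duplicate key 3; only 9 is absent from the whole target.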

So target_fragments is restricted to the matched-only shape (when_matched = UpdateAll | UpdateIf | Delete, when_not_matched = DoNothing, when_not_matched_by_source = Keep) and this is enforced at try_build(). This shape is exactly a pure-update upsert, the common streaming-merge case. Inserts under a fragment-scoped merge are a separate follow-up (see below).
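A sketch of what the try_build() check amounts to. The enums and the Params struct here are simplified stand-ins written for this example, not the real lance definitions, and validate_fragment_scope is a hypothetical name; the actual validation in the branch may be structured differently.

```rust
// Simplified stand-ins for the lance enums named above. The real types have
// more variants and payloads; these exist only to make the sketch compile.
#[derive(Debug, Clone, PartialEq)]
pub enum WhenMatched {
    UpdateAll,
    UpdateIf(String),
    Delete,
    DoNothing,
}

#[derive(Debug, Clone, PartialEq)]
pub enum WhenNotMatched {
    InsertAll,
    DoNothing,
}

#[derive(Debug, Clone, PartialEq)]
pub enum WhenNotMatchedBySource {
    Keep,
    Delete,
}

/// Hypothetical mirror of the fields the check reads.
pub struct Params {
    pub when_matched: WhenMatched,
    pub when_not_matched: WhenNotMatched,
    pub when_not_matched_by_source: WhenNotMatchedBySource,
    pub target_fragments: Option<Vec<u32>>,
}

/// Reject any shape other than matched-only when a fragment scope is set.
pub fn validate_fragment_scope(p: &Params) -> Result<(), String> {
    if p.target_fragments.is_none() {
        return Ok(()); // whole-dataset scan: every shape stays legal
    }
    let matched_ok = matches!(
        p.when_matched,
        WhenMatched::UpdateAll | WhenMatched::UpdateIf(_) | WhenMatched::Delete
    );
    if !matched_ok {
        return Err("target_fragments requires when_matched = UpdateAll | UpdateIf | Delete".into());
    }
    if p.when_not_matched != WhenNotMatched::DoNothing {
        return Err("target_fragments requires when_not_matched = DoNothing".into());
    }
    if p.when_not_matched_by_source != WhenNotMatchedBySource::Keep {
        return Err("target_fragments requires when_not_matched_by_source = Keep".into());
    }
    Ok(())
}
```

Failing at build time rather than at execution keeps a mis-shaped merge from ever launching a task.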

API

MergeInsertBuilder::try_new(dataset, vec!["id".into()])?
    .when_matched(WhenMatched::UpdateAll)
    .when_not_matched(WhenNotMatched::DoNothing)   // matched-only
    .try_target_fragments(vec![3, 7, 11])          // <-- new: scan only these fragments
    .try_build()?
    .execute_uncommitted(source)                   // combine + commit across tasks as usual
    .await?;

Implementation

Small, layered, and additive (None = whole-dataset scan = today's behavior, unchanged):

  1. MergeInsertParams.target_fragments: Option<Vec<u32>> + MergeInsertBuilder::try_target_fragments(...) builder + try_build() validation of the matched-only restriction.
  2. LanceTableProvider gains an optional fragments field (new_with_fragments) that calls the existing scan.with_fragments(...).
  3. SessionContextExt::read_lance_unordered_fragments(...) — fragment-scoped counterpart of read_lance_unordered.
  4. create_plan uses the fragment-scoped scan when target_fragments is set, resolving ids → Fragment metadata (erroring on unknown ids), else scans the whole dataset as before.

// create_plan (after)
let scan = match &self.params.target_fragments {
    Some(fragment_ids) => {
        let fragments = /* resolve ids -> Fragment, error on unknown */;
        session_ctx.read_lance_unordered_fragments(self.dataset.clone(), true, true, fragments)?
    }
    None => session_ctx.read_lance_unordered(self.dataset.clone(), true, true)?,
};
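The elided "resolve ids -> Fragment" step could be as simple as a lookup against the dataset's fragment list. The sketch below uses a stand-in Fragment struct with only an id field and a hypothetical resolve_fragments helper; the real code works on lance's fragment metadata and its own error type.

```rust
use std::collections::HashMap;

/// Stand-in for lance's fragment metadata; only the id matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct Fragment {
    pub id: u32,
}

/// Map requested ids to fragment metadata, failing on the first unknown id.
pub fn resolve_fragments(all: &[Fragment], ids: &[u32]) -> Result<Vec<Fragment>, String> {
    let by_id: HashMap<u32, &Fragment> = all.iter().map(|f| (f.id, f)).collect();
    ids.iter()
        .map(|id| {
            by_id
                .get(id)
                .map(|f| (*f).clone())
                .ok_or_else(|| format!("unknown fragment id {}", id))
        })
        // Collecting into Result short-circuits on the first Err.
        .collect()
}
```

Erroring on an unknown id, rather than skipping it, avoids silently merging against fewer fragments than the caller asked for.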

Tests

  • test_fragment_scoped_merge_equals_full_merge: builds a 2-fragment dataset, runs one matched-only merge per fragment slice against the whole source, combines + commits, and asserts the result is byte-identical to a single full matched-only merge. Also asserts each slice's transaction modified only its own fragment (proving the scan was actually scoped and the partitioning is disjoint).
  • test_fragment_scoped_merge_rejects_insert_shape: try_build() rejects target_fragments combined with the insert shape.

cargo test -p lance green; cargo fmt + cargo clippy clean.

Benchmark

rust/lance/benches/fragment_scoped_merge.rs — one round of 8 tasks running a matched-only merge against a 320K-row / 64-fragment target, single machine (isolates the target-scan cost, no cluster noise):

| mode | time / round |
|---|---|
| full_scan_per_task (each task scans the whole target, current behavior) | 16.03 ms |
| fragment_scoped_per_task (each task scans only its fragment slice, this change) | 3.19 ms |
| speedup | 5.03× |

The speedup scales with N (number of tasks) — it is the O(N × target) → O(target) collapse. At the 550M-row / 256-partition scale that OOM-killed every run, the full-scan path was doing ~256 full-target scans; this change reduces that to a single pass over the target.
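A back-of-envelope model of why the ratio grows with N. This is an assumption for illustration, not something the benchmark measures directly: let s be the time to scan the whole target once and c the remaining per-round cost that does not depend on how much target is scanned.

```latex
\begin{align*}
\text{speedup}(N) &\approx \frac{N s + c}{s + c} \;\longrightarrow\; N \quad \text{as } c/s \to 0 \\
\text{measured at } N = 8 &: \quad \frac{16.03\ \text{ms}}{3.19\ \text{ms}} \approx 5.03
\end{align*}
```

Under this model the measured 5.03× sits below the ceiling of 8 because c is not negligible on a 320K-row target; on a much larger target the scan term would be expected to dominate.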

Follow-ups (out of scope here)

  • Wire target_fragments through JNI → Java (MergeInsertParams) → lance-spark LanceNativeMergeExec, changing the exec from "repartition the source by key" to "enumerate target fragments on the driver and assign slices to tasks". Then re-run the 550M-row distributed merge, which is expected to run to completion and to be fast.
  • Inserts under fragment-scoped merge: run per-fragment passes matched-only, then perform inserts in one separate distributed anti-join (source minus full target). That anti-join scans the target's key column once, not N times.
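The insert follow-up can be sketched with in-memory key sets. This is only an illustration of the anti-join shape: rows_to_insert is a hypothetical name, and a real implementation would run as a distributed join over the key column rather than a HashSet on one machine.

```rust
use std::collections::HashSet;

/// For each source partition, keep only the keys that match nothing in the
/// target (source minus full target).
pub fn rows_to_insert(source_partitions: &[Vec<i64>], target_keys: &[i64]) -> Vec<Vec<i64>> {
    // One pass over the target's key column, shared by every source partition.
    let target: HashSet<i64> = target_keys.iter().copied().collect();
    source_partitions
        .iter()
        .map(|part| {
            part.iter()
                .copied()
                .filter(|k| !target.contains(k))
                .collect()
        })
        .collect()
}
```

Because the key set is built from the whole target, a key is only ever classified as an insert when it is absent everywhere, which is exactly what a single fragment slice cannot decide.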
