diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 310c0d62005..f337c5bae12 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4641,9 +4641,10 @@ def commit_batch( file metadata cache. transactions: Iterable[Transaction] The transactions to apply to the dataset. These will be merged into - a single transaction and applied to the dataset. Note: Only append - transactions are currently supported. Other transaction types will be - supported in the future. + a single transaction and applied to the dataset. All transactions + must be the same kind of operation: either all appends, or all + deletes (the driver side of a distributed / parallel delete). Other + transaction types are not yet supported. commit_lock : CommitLock, optional A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details. diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 36ea5facc29..f7a60bf2a37 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -191,6 +191,10 @@ harness = false name = "merge_insert" harness = false +[[bench]] +name = "fragment_scoped_delete" +harness = false + [[bench]] name = "scan" harness = false diff --git a/rust/lance/benches/fragment_scoped_delete.rs b/rust/lance/benches/fragment_scoped_delete.rs new file mode 100644 index 00000000000..3ac3878981f --- /dev/null +++ b/rust/lance/benches/fragment_scoped_delete.rs @@ -0,0 +1,186 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +// +//! Benchmark validating the fragment-scoped delete optimization. +//! +//! A distributed / parallel delete splits the target's fragments across N tasks +//! and each task runs an uncommitted delete of a shared predicate. Without +//! scoping, every task would scan the WHOLE target to find matching rows, so the +//! 
aggregate target I/O is `O(N * target)` — the pattern that OOMs and stalls a +//! large distributed delete. With `DeleteBuilder::with_target_fragments`, each +//! task scans only its own slice of the target's fragments, so the aggregate +//! target I/O is `O(target)` — the target is read once in total instead of N +//! times. +//! +//! This bench measures ONE full round (all N tasks) of the same predicate delete +//! in two modes, on a single machine so the difference isolates the target-scan +//! cost (no Spark / cluster noise): +//! +//! - `full_scan_per_task`: each task deletes over the whole target (the naive +//! distributed shape). Aggregate scan cost grows with N. +//! - `fragment_scoped_per_task`: each task deletes only within its assigned +//! fragment slice (the optimization). Aggregate scan cost is ~constant in N. +//! +//! The per-task transactions are NOT committed here — we measure the scan + +//! deletion-application work, which is where the `O(N * target)` blowup lives. +//! +//! Caveat: a single-node delete already parallelizes its own scan across cores +//! (DataFusion `target_partitions = num_cpus`), so the honest single-node story +//! is the aggregate work fanned out across N tasks plus the distributed +//! `O(N * target) -> O(target)` collapse, not a raw per-scan speedup. Framing the +//! bench as "aggregate work across N tasks" matches the distributed model this +//! optimization targets. +//! +//! Run with `cargo bench --bench fragment_scoped_delete`. 
+
+use std::sync::Arc;
+
+use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
+use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
+use futures::future::try_join_all;
+use lance::dataset::write::DeleteBuilder;
+use lance::dataset::{Dataset, WriteMode, WriteParams};
+use lance_core::utils::tempfile::TempStrDir;
+#[cfg(target_os = "linux")]
+use lance_testing::pprof::{Output, PProfProfiler};
+
+// A wide-ish target split into many fragments so the per-task scan cost is
+// meaningful and the N-way fan-out is visible. ROWS_PER_FRAG * NUM_FRAGS rows.
+const ROWS_PER_FRAG: u64 = 5_000;
+const NUM_FRAGS: u64 = 64;
+// Number of distributed tasks the fragments are split across (== whole-target
+// scans in the full-scan mode; == fragment slices in the scoped mode).
+const NUM_TASKS: usize = 8;
+
+/// Schema shared by every generated batch: two non-nullable Int64 columns,
+/// `id` and `value`.
+fn schema() -> Arc<ArrowSchema> {
+    Arc::new(ArrowSchema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("value", DataType::Int64, false),
+    ]))
+}
+
+/// Build one batch of `n` rows with ids `start_id..start_id + n` and
+/// `value = id * 10`.
+fn make_batch(start_id: i64, n: usize) -> RecordBatch {
+    let ids = Int64Array::from_iter_values(start_id..start_id + n as i64);
+    let vals = Int64Array::from_iter_values((start_id..start_id + n as i64).map(|v| v * 10));
+    RecordBatch::try_new(schema(), vec![Arc::new(ids), Arc::new(vals)]).unwrap()
+}
+
+/// Split `total_rows` consecutive ids into batches of at most `ROWS_PER_FRAG`
+/// rows each (the last batch may be shorter).
+fn make_batches(total_rows: u64) -> Vec<RecordBatch> {
+    let mut out = Vec::new();
+    let mut remaining = total_rows;
+    let mut next_start = 0i64;
+    while remaining > 0 {
+        let n = remaining.min(ROWS_PER_FRAG) as usize;
+        out.push(make_batch(next_start, n));
+        next_start += n as i64;
+        remaining -= n as u64;
+    }
+    out
+}
+
+/// Write the `ROWS_PER_FRAG * NUM_FRAGS`-row target to `path`, capping each
+/// file and row group at `ROWS_PER_FRAG` rows, then reopen it from storage.
+async fn build_base(path: &str) -> Dataset {
+    let total = ROWS_PER_FRAG * NUM_FRAGS;
+    let params = WriteParams {
+        max_rows_per_file: ROWS_PER_FRAG as usize,
+        max_rows_per_group: ROWS_PER_FRAG as usize,
+        mode: WriteMode::Create,
+        ..Default::default()
+    };
+    let reader = RecordBatchIterator::new(make_batches(total).into_iter().map(Ok), schema());
+    Dataset::write(reader, path, Some(params)).await.unwrap();
+    Dataset::open(path).await.unwrap()
+}
+
+// A predicate that matches ~10% of rows in every fragment, so both modes do
+// real deletion work and every fragment contributes.
+const PREDICATE: &str = "id % 10 = 0";
+
+/// One task's uncommitted delete scanning the WHOLE target.
+async fn full_scan_task(ds: Arc<Dataset>) {
+    let _ = DeleteBuilder::new(ds, PREDICATE)
+        .execute_uncommitted()
+        .await
+        .unwrap();
+}
+
+/// One task's uncommitted delete scanning ONLY `fragment_ids`.
+async fn fragment_scoped_task(ds: Arc<Dataset>, fragment_ids: Vec<u32>) {
+    let _ = DeleteBuilder::new(ds, PREDICATE)
+        .with_target_fragments(fragment_ids)
+        .execute_uncommitted()
+        .await
+        .unwrap();
+}
+
+/// Assign the target's fragments round-robin to NUM_TASKS disjoint slices.
+fn fragment_slices(ds: &Dataset) -> Vec<Vec<u32>> {
+    let mut slices = vec![Vec::new(); NUM_TASKS];
+    for (i, frag) in ds.get_fragments().iter().enumerate() {
+        slices[i % NUM_TASKS].push(frag.id() as u32);
+    }
+    slices
+}
+
+/// Build a fresh target in its own temp dir. Returned as the (untimed) per-
+/// iteration setup so each timed round starts from a clean store: an
+/// uncommitted delete still WRITES per-fragment deletion files to storage, and
+/// reusing one dataset would let those files accumulate across iterations and
+/// bias the full-scan mode (which writes NUM_TASKS× as many) more than the
+/// scoped mode. The TempStrDir is returned alongside the dataset so it stays
+/// alive during the timed round and is dropped (cleaning the files) after it.
+fn fresh_target(rt: &tokio::runtime::Runtime) -> (TempStrDir, Arc<Dataset>, Vec<Vec<u32>>) {
+    let dir = TempStrDir::default();
+    let ds = Arc::new(rt.block_on(build_base(dir.as_str())));
+    let slices = fragment_slices(&ds);
+    (dir, ds, slices)
+}
+
+/// Time one full round (all `NUM_TASKS` tasks) of the same predicate delete in
+/// the full-scan and the fragment-scoped modes.
+///
+/// Both benchmarks use `iter_batched_ref`, not `iter_batched`: the routine only
+/// borrows the `(TempStrDir, Arc<Dataset>, slices)` input, so Criterion drops it
+/// after the measurement ends. With `iter_batched` the input is moved into the
+/// routine and the temp-dir cleanup (deleting every data and deletion file)
+/// would be charged to the timed region, penalising whichever mode wrote more
+/// files.
+fn bench_fragment_scoped_delete(c: &mut Criterion) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    // Baseline: every task scans the whole target → O(NUM_TASKS * target).
+    c.bench_function("fragment_scoped_delete/full_scan_per_task", |b| {
+        b.iter_batched_ref(
+            || fresh_target(&rt),
+            |target| {
+                let ds = &target.1;
+                rt.block_on(async {
+                    let futs = (0..NUM_TASKS).map(|_| full_scan_task(ds.clone()));
+                    try_join_all(futs.map(tokio::spawn)).await.unwrap();
+                })
+            },
+            BatchSize::PerIteration,
+        )
+    });
+
+    // Optimization: each task scans only its fragment slice → O(target) total.
+    c.bench_function("fragment_scoped_delete/fragment_scoped_per_task", |b| {
+        b.iter_batched_ref(
+            || fresh_target(&rt),
+            |target| {
+                let ds = &target.1;
+                let slices = &target.2;
+                rt.block_on(async {
+                    let futs = slices
+                        .iter()
+                        .map(|slice| fragment_scoped_task(ds.clone(), slice.clone()));
+                    try_join_all(futs.map(tokio::spawn)).await.unwrap();
+                })
+            },
+            BatchSize::PerIteration,
+        )
+    });
+}
+
+#[cfg(target_os = "linux")]
+criterion_group!(
+    name = benches;
+    config = Criterion::default().significance_level(0.1).sample_size(10)
+        .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
+    targets = bench_fragment_scoped_delete);
+
+#[cfg(not(target_os = "linux"))]
+criterion_group!(
+    name = benches;
+    config = Criterion::default().significance_level(0.1).sample_size(10);
+    targets = bench_fragment_scoped_delete);
+
+criterion_main!(benches);
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index a0474320d5c..48192143ef7 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -64,7 +64,9 @@ pub mod update; pub use super::progress::{WriteProgressFn, WriteStats}; pub use commit::{CommitBuilder, DEFAULT_COMMIT_TIMEOUT};
-pub use delete::{DeleteBuilder, DeleteResult, UncommittedDelete}; +pub use delete::{ + CombinedDelete, DeleteBuilder, DeleteResult, UncommittedDelete, combine_delete_transactions, +}; pub use insert::InsertBuilder; /// The destination to write data to. diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 0461386e7d7..f0c66e69471 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -25,6 +25,7 @@ use crate::{ session::Session, }; +use super::delete::combine_delete_transactions; use super::{WriteDestination, resolve_commit_handler}; use crate::dataset::branch_location::BranchLocation; use crate::dataset::transaction::validate_operation; @@ -469,25 +470,82 @@ impl<'a> CommitBuilder<'a> { /// Commit a set of transactions as a single new version. /// - ///
- /// Only works for append transactions right now. Other kinds of transactions - /// will be supported in the future. - ///
+ /// All transactions must be the same kind of operation. Two kinds are + /// currently supported: + /// + /// - **Append**: the new fragments are concatenated (trivially + /// conflict-free). + /// - **Delete**: the per-transaction [`Operation::Delete`]s are combined via + /// [`combine_delete_transactions`] into one delete. This is the driver + /// side of a distributed / parallel delete, where each task produced a + /// fragment-scoped delete over a disjoint slice + /// (see [`super::DeleteBuilder::with_target_fragments`]). The combined + /// delete carries reconstructed `affected_rows`, so it rebases against a + /// concurrent writer at row granularity just like a single delete. + /// + /// A batch mixing operation kinds, or containing any other kind, is + /// rejected. + /// + /// The configured [`Self::with_timeout`] bounds the entire call, including + /// the delete path's dataset resolution and deletion-file reads. pub async fn execute_batch(self, transactions: Vec) -> Result { if transactions.is_empty() { return Err(Error::invalid_input_source( "No transactions to commit".into(), )); } + let timeout = self.timeout; + if let Some(t) = timeout + && t.is_zero() + { + return Err(Error::invalid_input( + "CommitBuilder timeout must be non-zero; pass `None` to disable", + )); + } + // Box so wrapping in `tokio::time::Timeout` does not deepen the future + // type (see `execute`). The timeout covers the whole batch, unlike a + // per-`execute` timeout that would miss the delete path's pre-commit I/O. + let fut = Box::pin(self.execute_batch_inner(transactions)); + match timeout { + Some(t) => match tokio::time::timeout(t, fut).await { + Ok(res) => res, + Err(_) => Err(Error::timeout(format!( + "Commit timed out after {:?}. 
Increase the timeout via \ + CommitBuilder::with_timeout or pass `None` to disable.", + t + ))), + }, + None => fut.await, + } + } + + async fn execute_batch_inner( + self, + transactions: Vec, + ) -> Result { if transactions .iter() - .any(|t| !matches!(t.operation, Operation::Append { .. })) + .all(|t| matches!(t.operation, Operation::Append { .. })) { - return Err(Error::not_supported_source( - "Only append transactions are supported in batch commits".into(), - )); + self.execute_batch_append(transactions).await + } else if transactions + .iter() + .all(|t| matches!(t.operation, Operation::Delete { .. })) + { + self.execute_batch_delete(transactions).await + } else { + Err(Error::not_supported_source( + "Only batches of all-append or all-delete transactions are supported in batch \ + commits" + .into(), + )) } + } + async fn execute_batch_append( + self, + transactions: Vec, + ) -> Result { let read_version = transactions.iter().map(|t| t.read_version).min().unwrap(); let merged = Transaction { @@ -506,9 +564,58 @@ impl<'a> CommitBuilder<'a> { //TODO: handle batch transaction merges in the future transaction_properties: None, }; - let dataset = self.execute(merged.clone()).await?; + // execute_inner (not execute): the timeout is applied once by + // execute_batch around the whole call. + let dataset = self.execute_inner(merged.clone()).await?; Ok(BatchCommitResult { dataset, merged }) } + + async fn execute_batch_delete( + mut self, + transactions: Vec, + ) -> Result { + // combine_delete_transactions reads deletion files, so it needs a + // concrete dataset handle. Resolve one from the destination (opening the + // URI if necessary). + let dataset = self.resolve_dataset().await?; + let combined = combine_delete_transactions(dataset.as_ref(), transactions).await?; + let merged = combined.transaction; + // Carry the reconstructed affected rows so a concurrent writer is rebased + // at row granularity, matching a single DeleteBuilder::execute. 
+ self.affected_rows = Some(combined.affected_rows); + // execute_inner (not execute): the timeout is applied once by + // execute_batch around the whole call. + let committed = self.execute_inner(merged.clone()).await?; + Ok(BatchCommitResult { + dataset: committed, + merged, + }) + } + + /// Resolve the destination to a concrete [`Dataset`], opening it from a URI + /// when the builder was constructed from a path rather than a dataset. + async fn resolve_dataset(&self) -> Result> { + match &self.dest { + WriteDestination::Dataset(dataset) => Ok(dataset.clone()), + WriteDestination::Uri(uri) => { + let session = self + .session + .clone() + .or_else(|| self.dest.dataset().map(|ds| ds.session.clone())) + .unwrap_or_default(); + let dataset = DatasetBuilder::from_uri(uri) + .with_read_params(ReadParams { + store_options: self.store_params.clone(), + commit_handler: self.commit_handler.clone(), + ..Default::default() + }) + .with_session(session) + .load() + .await?; + Ok(Arc::new(dataset)) + } + } + } } pub struct BatchCommitResult { @@ -880,12 +987,35 @@ mod tests { .await .unwrap(); - let res = CommitBuilder::new(Arc::new(dataset)) + let dataset = Arc::new(dataset); + let res = CommitBuilder::new(dataset.clone()) .with_timeout(Some(Duration::from_millis(50))) .execute_batch(vec![sample_transaction(1)]) .await; let Err(err) = res else { - panic!("commit should time out"); + panic!("append batch commit should time out"); + }; + assert!(matches!(&err, Error::Timeout { .. }), "got {err:?}"); + + // The delete path resolves a dataset and reads deletion files before + // committing; the timeout must cover that pre-commit work too, not just + // execute(). A delete transaction against the throttled store should + // still surface a Timeout rather than hanging. 
+ let delete_txn = Transaction::new( + dataset.manifest.version, + Operation::Delete { + updated_fragments: Vec::new(), + deleted_fragment_ids: Vec::new(), + predicate: "i < 5".to_string(), + }, + None, + ); + let res = CommitBuilder::new(dataset) + .with_timeout(Some(Duration::from_millis(50))) + .execute_batch(vec![delete_txn]) + .await; + let Err(err) = res else { + panic!("delete batch commit should time out"); }; assert!(matches!(&err, Error::Timeout { .. }), "got {err:?}"); } diff --git a/rust/lance/src/dataset/write/delete.rs b/rust/lance/src/dataset/write/delete.rs index a063d28ad7b..a8b2b71b101 100644 --- a/rust/lance/src/dataset/write/delete.rs +++ b/rust/lance/src/dataset/write/delete.rs @@ -15,7 +15,7 @@ use lance_core::{Error, ROW_ID, Result}; use lance_select::RowAddrTreeMap; use lance_table::format::Fragment; use roaring::RoaringTreemap; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use std::time::Duration; @@ -123,6 +123,7 @@ pub struct DeleteBuilder { filter: ExprFilter, conflict_retries: u32, retry_timeout: Duration, + target_fragments: Option>, } impl DeleteBuilder { @@ -133,6 +134,7 @@ impl DeleteBuilder { filter: ExprFilter::Sql(predicate.into()), conflict_retries: 10, retry_timeout: Duration::from_secs(30), + target_fragments: None, } } @@ -143,6 +145,7 @@ impl DeleteBuilder { filter: ExprFilter::Datafusion(expr), conflict_retries: 10, retry_timeout: Duration::from_secs(30), + target_fragments: None, } } @@ -158,11 +161,48 @@ impl DeleteBuilder { self } + /// Restrict the delete to a subset of the dataset's fragments. + /// + /// The predicate is evaluated only against rows living in the given + /// fragments, and the resulting transaction only tombstones rows in those + /// fragments. 
This is the producer side of a distributed / parallel delete: + /// partition the target's fragments into disjoint slices, run one + /// [`Self::execute_uncommitted`] per slice (in parallel), then commit all of + /// the resulting transactions together with + /// [`CommitBuilder::execute_batch`]. Because the slices are disjoint, no two + /// tasks can tombstone the same physical row, so the batch commit is + /// conflict-free and equivalent to a single full delete with the same + /// predicate over the whole dataset. + /// + /// Unknown fragment ids are rejected when the delete executes. + /// + /// # Example + /// + /// ```rust + /// use lance::dataset::DeleteBuilder; + /// + /// # use std::sync::Arc; + /// # use lance::Result; + /// # use lance::dataset::Dataset; + /// # async fn example(dataset: Arc) -> Result<()> { + /// let staged_delete = DeleteBuilder::new(dataset, "age > 65") + /// .with_target_fragments(vec![0, 1]) + /// .execute_uncommitted() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn with_target_fragments(mut self, fragment_ids: Vec) -> Self { + self.target_fragments = Some(fragment_ids); + self + } + /// Execute the delete operation pub async fn execute(self) -> Result { let job = DeleteJob { dataset: self.dataset.clone(), filter: self.filter, + target_fragments: self.target_fragments, }; let config = RetryConfig { @@ -203,6 +243,7 @@ impl DeleteBuilder { let job = DeleteJob { dataset: self.dataset, filter: self.filter, + target_fragments: self.target_fragments, }; let data = job.execute_impl().await?; let DeleteData { @@ -229,6 +270,8 @@ impl DeleteBuilder { struct DeleteJob { dataset: Arc, filter: ExprFilter, + /// When set, restrict the delete to this subset of the dataset's fragment ids. 
+ target_fragments: Option>, } /// Data returned by delete operation @@ -240,6 +283,35 @@ struct DeleteData { } impl DeleteJob { + /// Resolve `target_fragments` ids into the dataset's [`Fragment`] metadata, + /// erroring on any id that does not exist or is repeated. Returns `None` for + /// an unscoped (whole-dataset) delete. + fn resolve_target_fragments(&self) -> Result>> { + let Some(fragment_ids) = &self.target_fragments else { + return Ok(None); + }; + let by_id: BTreeMap = + self.dataset.fragments().iter().map(|f| (f.id, f)).collect(); + let mut seen: BTreeSet = BTreeSet::new(); + let fragments = fragment_ids + .iter() + .map(|id| { + if !seen.insert(*id) { + return Err(Error::invalid_input(format!( + "target_fragments contains duplicate fragment id {id}" + ))); + } + by_id.get(&(*id as u64)).map(|f| (*f).clone()).ok_or_else(|| { + Error::invalid_input(format!( + "target_fragments references fragment id {} which does not exist in the dataset", + id + )) + }) + }) + .collect::>>()?; + Ok(Some(fragments)) + } + fn build_transaction( &self, dataset: &Dataset, @@ -267,9 +339,18 @@ impl RetryExecutor for DeleteJob { type Result = DeleteResult; async fn execute_impl(&self) -> Result { - // Create a single scanner for the entire dataset + // Resolve the fragment slice up front (if any). A scoped delete only reads + // and tombstones rows in these fragments, so the resulting transaction is + // disjoint from any other slice's — enabling a distributed delete to be + // batch-committed conflict-free (see `combine_delete_transactions`). + let target_fragments = self.resolve_target_fragments()?; + + // Create a scanner over the whole dataset, or just the target slice. 
let mut scanner = self.dataset.scan(); scanner.with_row_id().project(&[ROW_ID])?; + if let Some(fragments) = &target_fragments { + scanner.with_fragments(fragments.clone()); + } match &self.filter { ExprFilter::Sql(s) => { scanner.filter(s)?; @@ -295,13 +376,23 @@ impl RetryExecutor for DeleteJob { filter_expr, Expr::Literal(ScalarValue::Boolean(Some(true)), _) ) { - // Predicate evaluated to true - delete all fragments - let fragments = self.dataset.get_fragments(); + // Predicate evaluated to true - delete every targeted fragment. + // When scoped, only the slice's fragments are removed so the + // transaction stays disjoint from other slices. + let fragments = match &target_fragments { + Some(fragments) => fragments.clone(), + None => self + .dataset + .get_fragments() + .into_iter() + .map(|f| f.metadata) + .collect(), + }; let num_deleted_rows: u64 = fragments .iter() - .map(|f| f.metadata.num_rows().unwrap_or(0) as u64) + .map(|f| f.num_rows().unwrap_or(0) as u64) .sum(); - let deleted_fragment_ids = fragments.iter().map(|f| f.id() as u64).collect(); + let deleted_fragment_ids = fragments.iter().map(|f| f.id).collect(); // When deleting everything, we don't have specific row addresses, // so better not to emit affected rows. @@ -391,6 +482,223 @@ pub async fn delete(ds: &mut Dataset, predicate: &str) -> Result { Ok(result) } +/// A combined delete produced by [`combine_delete_transactions`], ready to +/// commit with [`CommitBuilder`]. +#[derive(Debug, Clone)] +pub struct CombinedDelete { + /// The single [`Operation::Delete`] transaction covering every slice. + pub transaction: Transaction, + /// The row addresses newly tombstoned across all slices, relative to the + /// shared `read_version`. Pass to [`CommitBuilder::with_affected_rows`] so a + /// concurrent writer can be rebased against at row granularity — matching a + /// single [`DeleteBuilder::execute`]. + pub affected_rows: RowAddrTreeMap, + /// Total number of rows deleted across all slices. 
+    pub num_deleted_rows: u64,
+}
+
+/// Combine several fragment-scoped [`Operation::Delete`] transactions into a
+/// single delete transaction that can be committed once.
+///
+/// This is the driver-side step of a distributed / parallel delete: each task
+/// produced an independent [`UncommittedDelete`] over a **disjoint** slice of
+/// the dataset's fragments (via [`DeleteBuilder::with_target_fragments`] +
+/// [`DeleteBuilder::execute_uncommitted`]). This function stitches their
+/// [`Operation::Delete`] metadata into one transaction — it never reads or
+/// rewrites data files, and never rewrites deletion files (each task already
+/// wrote its own fragments' deletion files):
+///
+/// - `updated_fragments` from every transaction are concatenated.
+/// - `deleted_fragment_ids` (fragments deleted in whole) are concatenated.
+/// - All input transactions must be [`Operation::Delete`], share the same
+///   `read_version`, and use the same `predicate`; otherwise an error is
+///   returned.
+///
+/// # Disjointness
+///
+/// The slices must be disjoint, so each fragment id is modified by at most one
+/// transaction and the combined result is exactly equal to a single full delete
+/// of the same predicate over the whole dataset. If any fragment id appears in
+/// more than one transaction (only possible if the caller passed overlapping
+/// slices), this errors rather than silently committing — covering both
+/// partially-updated fragments and whole-fragment deletions.
+///
+/// To rebase safely against concurrent writers, [`CombinedDelete::affected_rows`]
+/// is reconstructed by diffing each touched fragment's post-delete deletion
+/// vector against its state at `read_version`.
+///
+/// # Errors
+///
+/// Returns an invalid-input error when `transactions` is empty, when the
+/// transactions disagree on `read_version` or `predicate`, when any of them is
+/// not an [`Operation::Delete`], when a fragment id is touched by more than one
+/// transaction, or when a fragment id / physical row count does not fit the
+/// 32-bit row-address space. Errors from checking out `read_version` or reading
+/// deletion files are propagated.
+pub async fn combine_delete_transactions(
+    dataset: &Dataset,
+    transactions: Vec<Transaction>,
+) -> Result<CombinedDelete> {
+    // `first()` doubles as the emptiness check and avoids a panicking index.
+    let Some(first) = transactions.first() else {
+        return Err(Error::invalid_input(
+            "combine_delete_transactions requires at least one transaction".to_string(),
+        ));
+    };
+    let read_version = first.read_version;
+
+    let mut combined_predicate: Option<String> = None;
+    let mut updated_fragments: Vec<Fragment> = Vec::new();
+    let mut deleted_fragment_ids: Vec<u64> = Vec::new();
+    // Every fragment id seen so far, to enforce disjointness across slices. A
+    // fragment appearing twice (whether updated or wholly deleted) means the
+    // caller passed overlapping slices.
+    let mut seen_fragment_ids: BTreeSet<u64> = BTreeSet::new();
+
+    for txn in &transactions {
+        if txn.read_version != read_version {
+            return Err(Error::invalid_input(format!(
+                "all delete transactions must share read_version; expected {}, got {}",
+                read_version, txn.read_version
+            )));
+        }
+        let Operation::Delete {
+            updated_fragments: txn_updated,
+            deleted_fragment_ids: txn_deleted,
+            predicate,
+        } = &txn.operation
+        else {
+            return Err(Error::invalid_input(format!(
+                "combine_delete_transactions only supports Operation::Delete, got {}",
+                txn.operation.name()
+            )));
+        };
+
+        match &combined_predicate {
+            Some(existing) if existing != predicate => {
+                return Err(Error::invalid_input(format!(
+                    "all delete transactions must share the same predicate; expected {:?}, got {:?}",
+                    existing, predicate
+                )));
+            }
+            Some(_) => {}
+            None => combined_predicate = Some(predicate.clone()),
+        }
+
+        for id in txn_updated
+            .iter()
+            .map(|f| f.id)
+            .chain(txn_deleted.iter().copied())
+        {
+            // `insert` returns false when the id was already present.
+            if !seen_fragment_ids.insert(id) {
+                return Err(Error::invalid_input(format!(
+                    "two delete transactions both modified fragment {id} — the fragment slices \
+                     passed to with_target_fragments are not disjoint"
+                )));
+            }
+        }
+
+        updated_fragments.extend(txn_updated.iter().cloned());
+        deleted_fragment_ids.extend(txn_deleted.iter().copied());
+    }
+
+    // Deterministic ordering for the committed transaction.
+    updated_fragments.sort_unstable_by_key(|f| f.id);
+    deleted_fragment_ids.sort_unstable();
+
+    // Reconstruct the newly-deleted row addresses (delta vs read_version) so the
+    // batch commit can rebase against a concurrent writer at row granularity,
+    // just as a single delete does. This reads deletion files only; data files
+    // are untouched. `predicate` is guaranteed Some (non-empty input checked
+    // above); a missing predicate is a programming error.
+    let predicate = combined_predicate
+        .ok_or_else(|| Error::internal("combined delete produced no predicate".to_string()))?;
+    let (affected_rows, num_deleted_rows) = delete_delta_affected_rows(
+        dataset,
+        read_version,
+        &updated_fragments,
+        &deleted_fragment_ids,
+    )
+    .await?;
+
+    let operation = Operation::Delete {
+        updated_fragments,
+        deleted_fragment_ids,
+        predicate,
+    };
+
+    Ok(CombinedDelete {
+        transaction: Transaction::new(read_version, operation, None),
+        affected_rows,
+        num_deleted_rows,
+    })
+}
+
+/// Compute the row addresses newly tombstoned by a combined delete, relative to
+/// `read_version`: for each updated fragment, the post-delete deletion vector
+/// minus the deletions that already existed at `read_version`; for each
+/// wholly-deleted fragment, all of its still-live rows at `read_version`.
+///
+/// Returns the per-fragment row set together with the total number of newly
+/// deleted rows. Fragment ids and physical row counts are converted to `u32`
+/// with a checked conversion, so an out-of-range value is reported as an error
+/// instead of being truncated onto a different row address.
+async fn delete_delta_affected_rows(
+    dataset: &Dataset,
+    read_version: u64,
+    updated_fragments: &[Fragment],
+    deleted_fragment_ids: &[u64],
+) -> Result<(RowAddrTreeMap, u64)> {
+    let base = dataset.checkout_version(read_version).await?;
+    let base = &base;
+    let base_by_id: BTreeMap<u64, &Fragment> =
+        base.fragments().iter().map(|f| (f.id, f)).collect();
+    let base_by_id = &base_by_id;
+    let io_parallelism = dataset.object_store.io_parallelism();
+
+    // Per-fragment newly-deleted bitmaps, computed concurrently (I/O bound).
+    let per_fragment: Vec<(u32, roaring::RoaringBitmap)> = futures::stream::iter(
+        updated_fragments
+            .iter()
+            .map(|f| (f.id, Some(f)))
+            .chain(deleted_fragment_ids.iter().map(|id| (*id, None))),
+    )
+    .map(|(frag_id, updated)| {
+        let base_fragment = base_by_id.get(&frag_id).copied();
+        async move {
+            let frag_id = u32::try_from(frag_id).map_err(|_| {
+                Error::invalid_input(format!(
+                    "fragment id {frag_id} does not fit in a 32-bit row address"
+                ))
+            })?;
+            let before = match base_fragment {
+                Some(bf) => read_fragment_deletions(base, bf).await?,
+                // Fragment not present at read_version: nothing was deleted before.
+                None => roaring::RoaringBitmap::new(),
+            };
+            let after = match updated {
+                // Partial delete: read the fragment's post-delete deletion vector.
+                Some(f) => read_fragment_deletions(dataset, f).await?,
+                // Whole-fragment delete: every physical row is now tombstoned.
+                None => match base_fragment.and_then(|bf| bf.physical_rows) {
+                    Some(rows) => {
+                        let rows = u32::try_from(rows).map_err(|_| {
+                            Error::invalid_input(format!(
+                                "fragment {frag_id} has {rows} physical rows, which does not \
+                                 fit in a 32-bit row offset"
+                            ))
+                        })?;
+                        (0..rows).collect()
+                    }
+                    // NOTE(review): with no recorded physical row count (or a
+                    // fragment unknown at read_version) this fragment adds no
+                    // rows to the affected set — confirm that row-level rebase
+                    // stays safe when a wholly-deleted fragment is absent here.
+                    None => roaring::RoaringBitmap::new(),
+                },
+            };
+            Ok::<_, Error>((frag_id, after - before))
+        }
+    })
+    .buffer_unordered(io_parallelism)
+    .try_collect()
+    .await?;
+
+    let mut affected_rows = RowAddrTreeMap::new();
+    let mut num_deleted_rows: u64 = 0;
+    for (frag_id, bitmap) in per_fragment {
+        num_deleted_rows += bitmap.len();
+        if !bitmap.is_empty() {
+            affected_rows.insert_bitmap(frag_id, bitmap);
+        }
+    }
+    Ok((affected_rows, num_deleted_rows))
+}
+
+/// Read a fragment's deletion vector as a `RoaringBitmap` (empty if none).
+async fn read_fragment_deletions( + dataset: &Dataset, + fragment: &Fragment, +) -> Result { + use crate::io::deletion::read_dataset_deletion_file; + match &fragment.deletion_file { + Some(df) => { + let dv = read_dataset_deletion_file(dataset, fragment.id, df).await?; + Ok(roaring::RoaringBitmap::from_iter( + dv.as_ref().to_sorted_iter(), + )) + } + None => Ok(roaring::RoaringBitmap::new()), + } +} + #[cfg(test)] mod tests { use super::*; @@ -406,6 +714,7 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_file::version::LanceFileVersion; use lance_index::{IndexType, scalar::ScalarIndexParams}; + use lance_select::mask::RowSetOps; use rstest::rstest; use std::collections::HashSet; use std::ops::Range; @@ -993,6 +1302,7 @@ mod tests { let delete_job = DeleteJob { dataset: dataset_arc.clone(), filter: ExprFilter::Sql("true".to_string()), + target_fragments: None, }; let delete_data = delete_job.execute_impl().await.unwrap(); @@ -1069,4 +1379,490 @@ mod tests { dataset.checkout_latest().await.unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 90); } + + // ------------------------------------------------------------------ + // Fragment-scoped / distributed delete + // ------------------------------------------------------------------ + + fn scoped_delete_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::UInt32, + false, + )])) + } + + fn scoped_delete_batch(range: Range) -> RecordBatch { + RecordBatch::try_new( + scoped_delete_schema(), + vec![Arc::new(UInt32Array::from_iter_values(range))], + ) + .unwrap() + } + + /// Build a dataset with one fragment per 100-row block: [0,100), [100,200), ... 
+ async fn make_multi_fragment_dataset(tmp_path: &str, num_fragments: u32) -> Dataset { + let batches: Vec = (0..num_fragments) + .map(|f| scoped_delete_batch((f * 100)..((f + 1) * 100))) + .collect(); + TestDatasetGenerator::new(batches, LanceFileVersion::Stable) + .make_hostile(tmp_path) + .await + } + + async fn surviving_ids(dataset: &Dataset) -> Vec { + let batch = dataset.scan().try_into_batch().await.unwrap(); + let mut ids: Vec = batch["i"].as_primitive::().values().to_vec(); + ids.sort_unstable(); + ids + } + + /// A fragment-scoped delete over disjoint slices, batch-committed, produces + /// exactly the same surviving rows as a single full delete of the same + /// predicate. + #[tokio::test] + async fn test_fragment_scoped_delete_equals_full_delete() { + let predicate = "i % 3 = 0"; + + // Baseline: full delete over the whole dataset. + let baseline_dir = TempStrDir::default(); + let mut baseline = make_multi_fragment_dataset(baseline_dir.as_str(), 4).await; + let baseline_deleted = baseline.delete(predicate).await.unwrap().num_deleted_rows; + let expected_ids = surviving_ids(&baseline).await; + + // Split: partition the 4 fragments into 2 disjoint slices, delete each + // slice uncommitted, then batch-commit the two transactions together. + let split_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(split_dir.as_str(), 4).await); + let all_ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + assert_eq!(all_ids.len(), 4); + let slices = [vec![all_ids[0], all_ids[1]], vec![all_ids[2], all_ids[3]]]; + + let mut transactions = Vec::new(); + for slice in &slices { + let staged = DeleteBuilder::new(dataset.clone(), predicate) + .with_target_fragments(slice.clone()) + .execute_uncommitted() + .await + .unwrap(); + + // Each slice's transaction only touches fragments in that slice. 
+ let touched = touched_fragment_ids(&staged.transaction); + for id in &touched { + assert!( + slice.contains(&(*id as u32)), + "slice {slice:?} produced out-of-slice fragment {id}" + ); + } + transactions.push(staged.transaction); + } + + // The combiner reconstructs the same total row count and affected-row + // set as the single full delete. + let combined = combine_delete_transactions(dataset.as_ref(), transactions.clone()) + .await + .unwrap(); + assert_eq!(combined.num_deleted_rows, baseline_deleted); + assert_eq!( + combined.affected_rows.len(), + Some(baseline_deleted), + "affected_rows should cover exactly the deleted rows" + ); + + let result = CommitBuilder::new(dataset.clone()) + .execute_batch(transactions) + .await + .unwrap(); + assert!(matches!(result.merged.operation, Operation::Delete { .. })); + + let committed = result.dataset; + assert_eq!(surviving_ids(&committed).await, expected_ids); + committed.validate().await.unwrap(); + } + + /// Collect every fragment id referenced by a delete transaction (both + /// updated and wholly-deleted fragments). + fn touched_fragment_ids(transaction: &Transaction) -> Vec { + match &transaction.operation { + Operation::Delete { + updated_fragments, + deleted_fragment_ids, + .. + } => updated_fragments + .iter() + .map(|f| f.id) + .chain(deleted_fragment_ids.iter().copied()) + .collect(), + other => panic!("expected delete transaction, got {other:?}"), + } + } + + /// A predicate that matches every row in a slice's fragment removes that + /// fragment entirely (it lands in `deleted_fragment_ids`, not + /// `updated_fragments`), and the batch commit still produces the correct + /// surviving rows. 
+ #[tokio::test] + async fn test_fragment_scoped_delete_deletes_whole_fragment() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 3).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + // A single predicate that matches ALL of fragment 0 (rows 0..100) but + // only a subset of fragments 1 and 2 (rows 150..250). Slice A (fragment + // 0) therefore deletes its fragment whole; slice B (fragments 1, 2) + // produces partial updates. + let predicate = "i < 100 OR (i >= 150 AND i < 250)"; + + let staged_a = DeleteBuilder::new(dataset.clone(), predicate) + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + match &staged_a.transaction.operation { + Operation::Delete { + updated_fragments, + deleted_fragment_ids, + .. + } => { + assert!( + updated_fragments.is_empty(), + "whole-fragment delete should not leave an updated fragment" + ); + assert_eq!(deleted_fragment_ids, &vec![ids[0] as u64]); + } + other => panic!("expected delete, got {other:?}"), + } + + let staged_b = DeleteBuilder::new(dataset.clone(), predicate) + .with_target_fragments(vec![ids[1], ids[2]]) + .execute_uncommitted() + .await + .unwrap(); + + let committed = CommitBuilder::new(dataset.clone()) + .execute_batch(vec![staged_a.transaction, staged_b.transaction]) + .await + .unwrap() + .dataset; + + // Fragment 0 gone entirely; 150..250 gone; everything else survives. + let expected: Vec = (100..150).chain(250..300).collect(); + assert_eq!(surviving_ids(&committed).await, expected); + committed.validate().await.unwrap(); + } + + /// A slice whose predicate matches nothing contributes an empty delete, and + /// combining it with a non-empty slice still yields the correct result. 
+ #[tokio::test] + async fn test_fragment_scoped_delete_empty_slice() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + // Slice for fragment 0: predicate "i >= 100" matches nothing in [0,100). + let staged_empty = DeleteBuilder::new(dataset.clone(), "i >= 100") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + assert_eq!(staged_empty.num_deleted_rows, 0); + assert!(touched_fragment_ids(&staged_empty.transaction).is_empty()); + + // Slice for fragment 1: "i >= 100" deletes all of 100..200. + let staged_full = DeleteBuilder::new(dataset.clone(), "i >= 100") + .with_target_fragments(vec![ids[1]]) + .execute_uncommitted() + .await + .unwrap(); + assert_eq!(staged_full.num_deleted_rows, 100); + + let committed = CommitBuilder::new(dataset.clone()) + .execute_batch(vec![staged_empty.transaction, staged_full.transaction]) + .await + .unwrap() + .dataset; + + let expected: Vec = (0..100).collect(); + assert_eq!(surviving_ids(&committed).await, expected); + committed.validate().await.unwrap(); + } + + /// with_target_fragments rejects an id that does not exist in the dataset. + #[tokio::test] + async fn test_fragment_scoped_delete_unknown_fragment_errors() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + + let result = DeleteBuilder::new(dataset, "i < 10") + .with_target_fragments(vec![999]) + .execute_uncommitted() + .await; + assert!( + matches!(&result, Err(Error::InvalidInput { .. })), + "expected InvalidInput for unknown fragment id, got {result:?}" + ); + } + + /// Assert an error is `InvalidInput` and its message contains `needle`, so + /// the test distinguishes *which* invariant fired (all guards here map to + /// the same `InvalidInput` variant).
+ fn assert_invalid_input_contains(err: &Result, needle: &str) { + match err { + Err(Error::InvalidInput { source, .. }) => { + let msg = source.to_string(); + assert!( + msg.contains(needle), + "expected InvalidInput message containing {needle:?}, got {msg:?}" + ); + } + other => panic!("expected InvalidInput containing {needle:?}, got {other:?}"), + } + } + + /// combine_delete_transactions rejects transactions whose fragment slices + /// overlap (two tasks both modified the same fragment). + #[tokio::test] + async fn test_combine_delete_rejects_overlapping_slices() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + // Both slices target the SAME fragment 0 (overlapping slices), which the + // combiner must reject before ever touching deletion vectors. + let staged_a = DeleteBuilder::new(dataset.clone(), "i < 10") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + let staged_b = DeleteBuilder::new(dataset.clone(), "i < 10") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + + let err = combine_delete_transactions( + dataset.as_ref(), + vec![staged_a.transaction, staged_b.transaction], + ) + .await; + assert_invalid_input_contains(&err, "not disjoint"); + } + + /// combine_delete_transactions rejects overlapping WHOLE-fragment deletions + /// (same fragment id in two transactions' deleted_fragment_ids), not just + /// overlapping partial updates. 
+ #[tokio::test] + async fn test_combine_delete_rejects_overlapping_whole_fragment() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + // "i < 100" deletes ALL of fragment 0, so both staged deletes put + // fragment 0 in deleted_fragment_ids (empty updated_fragments). + let staged_a = DeleteBuilder::new(dataset.clone(), "i < 100") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + let staged_b = DeleteBuilder::new(dataset.clone(), "i < 100") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + assert!(matches!( + &staged_a.transaction.operation, + Operation::Delete { updated_fragments, deleted_fragment_ids, .. } + if updated_fragments.is_empty() && deleted_fragment_ids == &[ids[0] as u64] + )); + + let err = combine_delete_transactions( + dataset.as_ref(), + vec![staged_a.transaction, staged_b.transaction], + ) + .await; + assert_invalid_input_contains(&err, "not disjoint"); + } + + /// combine_delete_transactions rejects a mix of predicates. 
+ #[tokio::test] + async fn test_combine_delete_rejects_mismatched_predicate() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + let staged_a = DeleteBuilder::new(dataset.clone(), "i < 10") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + let staged_b = DeleteBuilder::new(dataset.clone(), "i >= 150") + .with_target_fragments(vec![ids[1]]) + .execute_uncommitted() + .await + .unwrap(); + + let err = combine_delete_transactions( + dataset.as_ref(), + vec![staged_a.transaction, staged_b.transaction], + ) + .await; + assert_invalid_input_contains(&err, "same predicate"); + } + + /// with_target_fragments rejects a repeated fragment id. + #[tokio::test] + async fn test_fragment_scoped_delete_duplicate_id_errors() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + let err = DeleteBuilder::new(dataset, "i < 10") + .with_target_fragments(vec![ids[0], ids[0]]) + .execute_uncommitted() + .await; + assert_invalid_input_contains(&err, "duplicate fragment id"); + } + + /// execute_batch rejects a batch that mixes operation kinds. 
+ #[tokio::test] + async fn test_execute_batch_rejects_mixed_operations() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 2).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + let staged_delete = DeleteBuilder::new(dataset.clone(), "i < 10") + .with_target_fragments(vec![ids[0]]) + .execute_uncommitted() + .await + .unwrap(); + let append = Transaction::new( + dataset.manifest.version, + Operation::Append { + fragments: Vec::new(), + }, + None, + ); + + let err = CommitBuilder::new(dataset.clone()) + .execute_batch(vec![staged_delete.transaction, append]) + .await + .err(); + assert!( + matches!(&err, Some(Error::NotSupported { .. })), + "expected NotSupported for mixed batch, got {err:?}" + ); + } + + /// A fragment-scoped distributed delete keeps a scalar index consistent: + /// the surviving rows match a full delete and post-delete queries are + /// correct. + #[tokio::test] + async fn test_fragment_scoped_delete_with_scalar_index() { + let predicate = "i % 2 = 0"; + + let tmp_dir = TempStrDir::default(); + let mut dataset = make_multi_fragment_dataset(tmp_dir.as_str(), 4).await; + dataset + .create_index( + &["i"], + IndexType::Scalar, + Some("scalar_index".to_string()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + let slices = [vec![ids[0], ids[1]], vec![ids[2], ids[3]]]; + let mut transactions = Vec::new(); + for slice in &slices { + let staged = DeleteBuilder::new(dataset.clone(), predicate) + .with_target_fragments(slice.clone()) + .execute_uncommitted() + .await + .unwrap(); + transactions.push(staged.transaction); + } + + let committed = CommitBuilder::new(dataset.clone()) + .execute_batch(transactions) + .await + .unwrap() + .dataset; + committed.validate().await.unwrap(); + + // Only odd i survive. 
+ let expected: Vec = (0..400).filter(|v| v % 2 == 1).collect(); + assert_eq!(surviving_ids(&committed).await, expected); + + // The scalar index survives the distributed delete and still covers the + // dataset: statistics report the post-delete live-row count with nothing + // left unindexed. (A filtered count alone would pass even if the index + // were dropped, since the scanner falls back to a full scan.) + let stats: serde_json::Value = + serde_json::from_str(&committed.index_statistics("scalar_index").await.unwrap()) + .unwrap(); + assert_eq!(stats["num_indexed_rows"].as_u64(), Some(200)); + assert_eq!(stats["num_unindexed_rows"].as_u64(), Some(0)); + + // An indexed lookup of a deleted value returns nothing; a surviving one + // returns exactly one row. + let mut deleted_scan = committed.scan(); + deleted_scan.filter("i = 2").unwrap(); + assert_eq!(deleted_scan.count_rows().await.unwrap(), 0); + + let mut surviving_scan = committed.scan(); + surviving_scan.filter("i = 3").unwrap(); + assert_eq!(surviving_scan.count_rows().await.unwrap(), 1); + } + + /// A batched distributed delete rebases against a concurrent delete that + /// touched a different fragment, thanks to the reconstructed affected_rows — + /// matching a single delete's row-level conflict handling. + #[tokio::test] + async fn test_batch_delete_rebases_against_concurrent_delete() { + let tmp_dir = TempStrDir::default(); + let dataset = Arc::new(make_multi_fragment_dataset(tmp_dir.as_str(), 3).await); + let ids: Vec = dataset.fragments().iter().map(|f| f.id as u32).collect(); + + // Stage a distributed delete over fragments 0 and 1 at the current + // version. 
+ let mut transactions = Vec::new(); + for id in [ids[0], ids[1]] { + let staged = DeleteBuilder::new(dataset.clone(), "i % 2 = 0") + .with_target_fragments(vec![id]) + .execute_uncommitted() + .await + .unwrap(); + transactions.push(staged.transaction); + } + + // A concurrent delete commits first, tombstoning rows in fragment 2 (a + // fragment the staged batch does NOT touch). This advances the version, + // so the batch commit must rebase rather than fail. + DeleteBuilder::new(dataset.clone(), "i >= 250 AND i < 260") + .execute() + .await + .unwrap(); + + // The batch commit carries affected_rows, so it rebases onto the new + // version and succeeds (a None affected_rows would hard-fail here). + let committed = CommitBuilder::new(dataset.clone()) + .execute_batch(transactions) + .await + .unwrap() + .dataset; + committed.validate().await.unwrap(); + + // Both deletes applied: even i in [0,200) gone (distributed), 250..260 + // gone (concurrent). + let expected: Vec = (0..200) + .filter(|v| v % 2 == 1) + .chain(200..250) + .chain(260..300) + .collect(); + assert_eq!(surviving_ids(&committed).await, expected); + } }