Skip to content

[python] Reuse base entries on commit retry instead of full re-scan#8358

Open
XiaoHongbo-Hope wants to merge 7 commits into
apache:masterfrom
XiaoHongbo-Hope:overwrite_commit_fix
Open

[python] Reuse base entries on commit retry instead of full re-scan#8358
XiaoHongbo-Hope wants to merge 7 commits into
apache:masterfrom
XiaoHongbo-Hope:overwrite_commit_fix

Conversation

@XiaoHongbo-Hope

Copy link
Copy Markdown
Contributor

Purpose

Tests

Port of apache#4286 to pypaimon. On an OVERWRITE (or any conflict-detected)
commit, every retry re-scanned the full changed partitions to rebuild the
base entries for conflict detection. Under concurrent writers this made
each retry as expensive as the first attempt, so a slow commit kept losing
the snapshot CAS race and could exhaust commit.max-retries and drop the
whole batch.

Carry the base entries computed by a failed attempt on RetryResult and, on
the next attempt, reuse them plus only the incremental delta committed since
(CommitScanner.read_incremental_changes), mirroring Java FileStoreCommitImpl.

Add a deterministic regression test that forces K conflicts and asserts the
full conflict scan runs once (not K+1) and the incremental read runs once
per retry.
entries = []
for snapshot_id in range(from_snapshot.id + 1, to_snapshot.id + 1):
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
if snapshot is None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly weaker than the Java implementation. Java CommitScanner#readIncrementalChanges scans every snapshot id in (from, to] via scan.withSnapshot(i), so a missing/expired intermediate snapshot is not treated as an empty delta. Here, skipping None while reusing base_data_files can make the reconstructed base incomplete and allow conflict detection to run on a state that is not equivalent to latest_snapshot. Could we either fall back to read_all_entries_from_changed_partitions(to_snapshot, commit_entries) when any intermediate snapshot is missing, or fail conservatively so the retry does a full scan?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly weaker than the Java implementation. Java CommitScanner#readIncrementalChanges scans every snapshot id in (from, to] via scan.withSnapshot(i), so a missing/expired intermediate snapshot is not treated as an empty delta. Here, skipping None while reusing base_data_files can make the reconstructed base incomplete and allow conflict detection to run on a state that is not equivalent to latest_snapshot. Could we either fall back to read_all_entries_from_changed_partitions(to_snapshot, commit_entries) when any intermediate snapshot is missing, or fail conservatively so the retry does a full scan?

Fixed

def patched_cas(snapshot, statistics):
if snapshot.commit_kind == "OVERWRITE" and cas['fails'] < K:
cas['fails'] += 1
self._append(pd.DataFrame({'f0': [99], 'f1': [f'x{cas["fails"]}']}))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry logic matches the Java approach, but this test does not exercise the non-empty incremental merge path: the overwrite targets f0=1, while the simulated concurrent writer appends to f0=99, so read_incremental_changes is called but its partition filter returns no entries. To make the Java-port behavior safer to maintain, could we also cover a same-partition concurrent append/update and assert that the incremental entries are merged into the reused base correctly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry logic matches the Java approach, but this test does not exercise the non-empty incremental merge path: the overwrite targets f0=1, while the simulated concurrent writer appends to f0=99, so read_incremental_changes is called but its partition filter returns no entries. To make the Java-port behavior safer to maintain, could we also cover a same-partition concurrent append/update and assert that the incremental entries are merged into the reused base correctly?

Added.

- read_incremental_changes: raise on a missing snapshot in the range instead
  of silently skipping (a skip drops concurrent changes -> wrong conflict
  base), aligning with Java.
- read_incremental_changes: build the partition filter once and reuse it
  across the snapshot range instead of rebuilding per snapshot.
- On rollback, return a RetryResult carrying neither snapshot nor base so the
  duplicate check re-scans from the start (matches Java RollbackRetryResult).
- Make the regression test self-contained (only the conflict-detection scan).
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review June 25, 2026 14:30
- read_incremental_changes returns None on a missing/expired intermediate
  snapshot; the caller then falls back to a full scan instead of raising, so the
  conflict base stays equivalent to the latest snapshot.
- Add a test where the concurrent append hits the overwrite target partition, so
  the incremental read is non-empty and merged into the reused base; assert the
  merged base equals a fresh full scan.
- read_incremental_changes annotated -> Optional[List[ManifestEntry]] since it
  returns None when an intermediate snapshot is missing.
- Add a test that makes an intermediate snapshot appear missing and asserts
  read_incremental_changes returns None and the retry falls back to a full scan.
Prove the incremental conflict base stays correct when a non-APPEND (OVERWRITE)
snapshot lands between retries: its delta carries the DELETE entries and
merge_entries reconciles them, so the merged base equals a fresh full scan
(mirrors Java apache#4286, which merges deltas regardless of commit kind).
- Cover a COMPACT-kind intermediate snapshot (synthesized, as pypaimon has no
  compact API): its DELETE+ADD delta is merged correctly, merged base == full
  scan.
- Share the non-APPEND/COMPACT assertion via a helper; trim verbose comments.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants