[python] Reuse base entries on commit retry instead of full re-scan #8358
XiaoHongbo-Hope wants to merge 7 commits into
Port of apache#4286 to pypaimon.

On an OVERWRITE (or any conflict-detected) commit, every retry re-scanned the full changed partitions to rebuild the base entries for conflict detection. Under concurrent writers this made each retry as expensive as the first attempt, so a slow commit kept losing the snapshot CAS race and could exhaust commit.max-retries and drop the whole batch.

Carry the base entries computed by a failed attempt on RetryResult and, on the next attempt, reuse them plus only the incremental delta committed since (CommitScanner.read_incremental_changes), mirroring Java FileStoreCommitImpl.

Add a deterministic regression test that forces K conflicts and asserts the full conflict scan runs once (not K+1) and the incremental read runs once per retry.
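The retry flow described above can be sketched roughly as follows. This is a minimal illustration, not the pypaimon code: the `commit_with_retry` function, its callback parameters, the fields on this `RetryResult`, and the string entries are all hypothetical stand-ins chosen to keep the example self-contained.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RetryResult:
    """Hypothetical carrier for what a failed attempt hands to the next one."""
    latest_snapshot_id: Optional[int] = None
    base_entries: Optional[List[str]] = None


def commit_with_retry(
    latest_id: Callable[[], int],
    full_scan: Callable[[int], List[str]],
    read_delta: Callable[[int, int], List[str]],
    try_cas: Callable[[int], bool],
    max_retries: int = 10,
) -> int:
    """Return the number of attempts used; raise if retries are exhausted."""
    retry = RetryResult()
    for attempt in range(1, max_retries + 2):
        latest = latest_id()
        if retry.base_entries is None:
            # First attempt: pay for the full scan of the changed partitions.
            base = full_scan(latest)
        else:
            # Retry: reuse the previous base plus only what was committed since.
            base = retry.base_entries + read_delta(retry.latest_snapshot_id, latest)
        # Conflict detection against `base` would run here.
        if try_cas(latest + 1):
            return attempt
        retry = RetryResult(latest, base)
    raise RuntimeError("commit retries exhausted")


# Simulate K lost CAS races, each caused by a concurrent writer's new snapshot.
K = 3
calls = {"full": 0, "delta": 0, "fails": 0}
state = {"latest": 5}


def _latest() -> int:
    return state["latest"]


def _full(snapshot_id: int) -> List[str]:
    calls["full"] += 1
    return [f"file@{snapshot_id}"]


def _delta(from_id: int, to_id: int) -> List[str]:
    calls["delta"] += 1
    return [f"file@{i}" for i in range(from_id + 1, to_id + 1)]


def _cas(new_id: int) -> bool:
    if calls["fails"] < K:
        calls["fails"] += 1
        state["latest"] += 1  # the concurrent writer wins this round
        return False
    state["latest"] = new_id
    return True


attempts = commit_with_retry(_latest, _full, _delta, _cas)
```

With K forced conflicts the sketch runs the full scan once and the incremental read once per retry, which is the property the regression test in this PR is described as asserting.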
entries = []
for snapshot_id in range(from_snapshot.id + 1, to_snapshot.id + 1):
    snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
    if snapshot is None:
This is slightly weaker than the Java implementation. Java CommitScanner#readIncrementalChanges scans every snapshot id in (from, to] via scan.withSnapshot(i), so a missing/expired intermediate snapshot is not treated as an empty delta. Here, skipping None while reusing base_data_files can make the reconstructed base incomplete and allow conflict detection to run on a state that is not equivalent to latest_snapshot. Could we either fall back to read_all_entries_from_changed_partitions(to_snapshot, commit_entries) when any intermediate snapshot is missing, or fail conservatively so the retry does a full scan?
Fixed
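One way to picture the conservative behavior requested in this thread is the sketch below. It assumes a plain dict standing in for the snapshot manager and string entries standing in for manifest entries; `build_base` and its parameters are hypothetical names, and only the idea (return None on a missing intermediate snapshot, then fall back to a full scan) comes from the discussion and commit notes.

```python
from typing import Callable, Dict, List, Optional


def read_incremental_changes(
    snapshots: Dict[int, List[str]],
    from_id: int,
    to_id: int,
) -> Optional[List[str]]:
    """Collect the deltas of snapshots in (from_id, to_id].

    Returns None when any snapshot in the range is missing, so the caller
    never treats an expired snapshot as an empty delta.
    """
    entries: List[str] = []
    for snapshot_id in range(from_id + 1, to_id + 1):
        delta = snapshots.get(snapshot_id)
        if delta is None:
            return None
        entries.extend(delta)
    return entries


def build_base(
    snapshots: Dict[int, List[str]],
    reused_base: List[str],
    from_id: int,
    to_id: int,
    full_scan: Callable[[int], List[str]],
) -> List[str]:
    """Reuse the previous base when the delta is complete, else re-scan."""
    delta = read_incremental_changes(snapshots, from_id, to_id)
    if delta is None:
        return full_scan(to_id)
    return reused_base + delta


full_scan_calls: List[int] = []


def _full_scan(snapshot_id: int) -> List[str]:
    full_scan_calls.append(snapshot_id)
    return ["rescanned"]


complete = {6: ["a"], 7: ["b"]}
with_gap = {6: ["a"], 8: ["c"]}  # snapshot 7 has expired

reused = build_base(complete, ["base"], 5, 7, _full_scan)
fallback = build_base(with_gap, ["base"], 5, 8, _full_scan)
```

Returning None rather than an empty list keeps "nothing changed" and "cannot tell what changed" distinguishable at the call site.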
def patched_cas(snapshot, statistics):
    if snapshot.commit_kind == "OVERWRITE" and cas['fails'] < K:
        cas['fails'] += 1
        self._append(pd.DataFrame({'f0': [99], 'f1': [f'x{cas["fails"]}']}))
The retry logic matches the Java approach, but this test does not exercise the non-empty incremental merge path: the overwrite targets f0=1, while the simulated concurrent writer appends to f0=99, so read_incremental_changes is called but its partition filter returns no entries. To make the Java-port behavior safer to maintain, could we also cover a same-partition concurrent append/update and assert that the incremental entries are merged into the reused base correctly?
Added.
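The coverage gap raised in this thread can be illustrated with a toy model. The `(f0, file_name)` tuple shape and the `filter_by_partitions` helper are assumptions made for this sketch only and do not reflect pypaimon's manifest entry type or its partition filter.

```python
from typing import List, Set, Tuple

# Hypothetical entry shape: (partition value f0, file name).
Entry = Tuple[int, str]


def filter_by_partitions(entries: List[Entry], changed: Set[int]) -> List[Entry]:
    """Keep only the entries that live in one of the changed partitions."""
    return [entry for entry in entries if entry[0] in changed]


changed_partitions = {1}                 # the overwrite targets f0=1
base: List[Entry] = [(1, "data-0")]      # base reused from the failed attempt

other_partition_append: List[Entry] = [(99, "data-1")]
same_partition_append: List[Entry] = [(1, "data-2")]

# A concurrent append to f0=99 is filtered out, so the merge path sees nothing.
empty_delta = filter_by_partitions(other_partition_append, changed_partitions)

# A concurrent append to f0=1 survives the filter and must be merged.
non_empty_delta = filter_by_partitions(same_partition_append, changed_partitions)
merged_base = base + non_empty_delta

# What a fresh full scan of the changed partitions would see at the latest snapshot.
fresh_full_scan = filter_by_partitions(
    base + other_partition_append + same_partition_append, changed_partitions
)
```

Only the same-partition case produces a delta to merge, which is why a test limited to the f0=99 append never reaches the merge logic.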
- read_incremental_changes: raise on a missing snapshot in the range instead of silently skipping (a skip drops concurrent changes -> wrong conflict base), aligning with Java.
- read_incremental_changes: build the partition filter once and reuse it across the snapshot range instead of rebuilding per snapshot.
- On rollback, return a RetryResult carrying neither snapshot nor base so the duplicate check re-scans from the start (matches Java RollbackRetryResult).
- Make the regression test self-contained (only the conflict-detection scan).
- read_incremental_changes returns None on a missing/expired intermediate snapshot; the caller then falls back to a full scan instead of raising, so the conflict base stays equivalent to the latest snapshot.
- Add a test where the concurrent append hits the overwrite target partition, so the incremental read is non-empty and merged into the reused base; assert the merged base equals a fresh full scan.
- read_incremental_changes annotated -> Optional[List[ManifestEntry]] since it returns None when an intermediate snapshot is missing.
- Add a test that makes an intermediate snapshot appear missing and asserts read_incremental_changes returns None and the retry falls back to a full scan.
Prove the incremental conflict base stays correct when a non-APPEND (OVERWRITE) snapshot lands between retries: its delta carries the DELETE entries and merge_entries reconciles them, so the merged base equals a fresh full scan (mirrors Java apache#4286, which merges deltas regardless of commit kind).
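A simplified sketch of why merging a delta that carries DELETE entries still yields the same base as a full scan is given below. The `(kind, partition, file_name)` tuple and this `merge_entries` are illustrative assumptions; the real function may treat unmatched DELETE entries differently, and here they are simply dropped.

```python
from typing import Dict, List, Tuple

# Hypothetical entry shape: (kind, partition, file_name), kind is "ADD" or "DELETE".
Entry = Tuple[str, int, str]


def merge_entries(entries: List[Entry]) -> List[Entry]:
    """Replay entries in order and return the files that remain live.

    An ADD registers a file; a later DELETE for the same (partition, file)
    cancels it. Simplification for this sketch: a DELETE with no matching
    ADD is ignored.
    """
    live: Dict[Tuple[int, str], Entry] = {}
    for kind, partition, name in entries:
        key = (partition, name)
        if kind == "ADD":
            live[key] = (kind, partition, name)
        elif kind == "DELETE":
            live.pop(key, None)
        else:
            raise ValueError(f"unknown entry kind: {kind}")
    return list(live.values())


# Base reused from the failed attempt.
reused_base: List[Entry] = [("ADD", 1, "a"), ("ADD", 1, "b")]

# Delta of a concurrent OVERWRITE of partition 1: delete old files, add a new one.
overwrite_delta: List[Entry] = [
    ("DELETE", 1, "a"),
    ("DELETE", 1, "b"),
    ("ADD", 1, "c"),
]

merged_base = merge_entries(reused_base + overwrite_delta)

# What a fresh full scan of partition 1 would return after the overwrite.
fresh_full_scan: List[Entry] = [("ADD", 1, "c")]
```

The same replay handles a compaction-style delta, since that is also a DELETE of the old files followed by an ADD of the rewritten one.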
- Cover a COMPACT-kind intermediate snapshot (synthesized, as pypaimon has no compact API): its DELETE+ADD delta is merged correctly, merged base == full scan.
- Share the non-APPEND/COMPACT assertion via a helper; trim verbose comments.
Purpose
Tests