[flink] Add restore_as_latest procedure#8139
Conversation
ecd0935 to
0f821ed
Compare
0f821ed to
1492902
Compare
|
I think The new snapshot writes the target snapshot's files into the base manifest list, but writes an empty delta manifest list and marks the commit as Could |
|
@JingsongLi The new snapshot currently has the target snapshot's complete data manifests in baseManifestList, but its deltaManifestList is empty. This makes the final table state correct for batch/full scans, but the restore can be invisible to streaming overwrite readers. I will update restoreAsLatest to generate an overwrite delta relative to the previous latest snapshot: DELETE files that exist only in the previous latest snapshot, and ADD files that exist only in the target snapshot. The baseManifestList will contain the previous latest snapshot's merged effective ADD files, while deltaManifestList will describe the previous-latest-to-target transition. |
Ensure restore_as_latest writes an overwrite delta so streaming overwrite readers can observe restored file changes.
439db01 to
1c523ac
Compare
|
@JingsongLi I also added IT coverage to verify both DELETE-only and ADD-only restore deltas. |
Skip automatic expiration on the restore-as-latest path so it no longer deletes the snapshots/tags it promises to keep (e.g. with snapshot.num-retained.max=1), and keep nextRowId monotonic by taking the max of the previous latest and target snapshot, preventing row id reuse that breaks _ROW_ID global uniqueness on row-tracking tables. Add IT cases covering both fixes.
| targetSnapshot.properties(), | ||
| nextRowId); | ||
|
|
||
| return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles))); |
There was a problem hiding this comment.
This restore commit bypasses the normal post-commit callbacks. Regular commits call commitCallbacks with the committed snapshot and delta files after commitSnapshotImpl succeeds, and those callbacks keep external state in sync (for example Iceberg compatibility metadata uses context.snapshot/context.deltaFiles, and chain-table overwrite handling reacts to CommitKind.OVERWRITE). restoreAsLatest also changes the table state with an overwrite delta, but it returns immediately after writing the Paimon snapshot, so those external views can remain at the pre-restore state. Could we trigger the same commit callback path after a successful restore, using the restored base/delta/index files and the new snapshot context?
There was a problem hiding this comment.
Good catch, thanks. restoreAsLatest committed via commitSnapshotImpl directly and skipped the commit callbacks, so external views (Iceberg metadata, chain-table overwrite) could stay at the pre-restore state.
Fixed by notifying the callbacks after a successful restore, like a regular commit. The context uses the restore delta (DELETE previous-only + ADD target-only) and an index delta derived the same way from the previous-latest and target index manifests. Both callbacks are idempotent, so retries stay correct.
Added an IT case testRestoreTriggersCommitCallback asserting Iceberg metadata is generated for the restore snapshot.
restoreAsLatest committed the restore snapshot directly via commitSnapshotImpl, bypassing the commit callbacks that a regular commit runs. External views that depend on those callbacks (Iceberg compatibility metadata, chain-table overwrite handling) could therefore stay at the pre-restore state. Notify the callbacks after a successful restore using the restored base/delta/index files and the new snapshot. The index changes are derived from the previous latest and target index manifests, mirroring how the data delta files are computed. Add an IT case asserting Iceberg metadata is generated for the restore snapshot.
| targetSnapshot.properties(), | ||
| nextRowId); | ||
|
|
||
| boolean success = |
There was a problem hiding this comment.
This restore path still bypasses the normal pre-commit callbacks. Regular commits call commitPreCallbacks before commitSnapshotImpl, and ChainTableCommitPreCallback uses that hook to reject unsafe pure-DELETE overwrite commits on the snapshot branch. restoreAsLatest also creates an OVERWRITE delta, so restoring a snapshot branch back to an older state can delete snapshot partitions that a normal overwrite would validate and potentially abort. Please run the same pre-callback path (with the restore base files, delta files, index changes, and new snapshot) before making the restore snapshot visible.
There was a problem hiding this comment.
restoreAsLatest only ran the post-commit callbacks and skipped the pre-commit ones. ChainTableCommitPreCallback uses that hook to reject a pure-DELETE overwrite on a chain-table snapshot branch that would drop a snapshot partition still anchoring delta partitions, so restoring such a branch to an older state could silently break the chain.
Fixed by invoking commitPreCallbacks before commitSnapshotImpl (sharing the restore base/delta/index files with the post callbacks), so an unsafe restore is now aborted before the snapshot is created — same behavior as a regular overwrite.
Added an IT case asserting such a restore is rejected and the latest snapshot is left unchanged.
| options.scanManifestParallelism()); | ||
| targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD); | ||
|
|
||
| List<ManifestEntry> deltaFiles = new ArrayList<>(); |
There was a problem hiding this comment.
The restore delta is built only from data-file identifier differences. For tables using deletion vectors or other index-manifest state, the logical contents can change while the data files stay the same; restoring across a DV-only delete/update would leave deltaFiles empty and deltaRecordCount as 0, even though targetSnapshot.indexManifest() and totalRecordCount differ. Streaming overwrite readers call readChanges() from the DELTA data manifests, and the streaming path does not load DV indexes, so such a restore can still be skipped by streaming readers. Please include the relevant index/DV changes in the restore transition (or otherwise make the overwrite read/delta counts handle index-only restores) instead of relying on the final indexManifest alone.
There was a problem hiding this comment.
You're right. A deletion-vector-only delete/update doesn't rewrite the data file, so the identifier-based diff left an empty data delta with deltaRecordCount 0, and streaming overwrite readers (which read the DELTA data manifests and don't load DV indexes) could skip the restore.
Fixed by comparing the deletion vectors between the previous latest and the target (via the index-manifest dvRanges) and re-emitting each data file whose DV changed as DELETE(latest) + ADD(target). This makes the restore transition visible to streaming overwrite readers. The physical row count is unchanged, so the pair nets to zero in the delta record count, matching the unchanged totalRecordCount.
Note this makes restore behave like a regular overwrite for DV tables; applying the DV itself during streaming-read-overwrite is a separate, pre-existing limitation of that read path and is out of scope here.
Added a FileStoreCommitTest covering a DV-only restore transition.
restoreAsLatest only ran the post-commit callbacks; it skipped the pre-commit callbacks that a regular commit runs before the snapshot becomes visible. ChainTableCommitPreCallback uses that hook to reject a pure-DELETE overwrite on a chain-table snapshot branch that would drop a snapshot partition still anchoring delta partitions. Since restore also creates an OVERWRITE delta, restoring such a branch to an older state could silently break the chain. Invoke commitPreCallbacks before commitSnapshotImpl, sharing the restore base/delta/index files with the post callbacks, so an unsafe restore is aborted before the snapshot is created. Add an IT case asserting the dangerous restore is rejected and the latest snapshot is left unchanged.
The restore delta was built only from data-file identifier differences. A deletion-vector-only delete/update does not rewrite the data file, so such a change was missed, leaving an empty data delta with deltaRecordCount 0. Streaming overwrite readers read the DELTA data manifests and do not load DV indexes, so they could skip the restore entirely. Compare the deletion vectors (via index-manifest dvRanges) between the previous latest and the target, and re-emit each data file whose DV changed as DELETE(latest) + ADD(target). The physical row count is unchanged, so the pair nets to zero in the delta record count, matching the unchanged totalRecordCount. Add a FileStoreCommitTest covering a DV-only restore transition.
| // DELETE(latest) + ADD(target) makes the restore transition visible. The physical row count | ||
| // is unchanged, so the pair nets to zero in the delta record count, consistent with the | ||
| // unchanged totalRecordCount. | ||
| addDeletionVectorOnlyChanges( |
There was a problem hiding this comment.
This still does not make DV-only restores correct for streaming readers. readChanges() always calls toIncrementalPlan(true, ...), and SnapshotReaderImpl only scans DV indexes when !isStreaming, so the IncrementalSplit built for this DELETE+ADD pair has both beforeDeletionFiles and afterDeletionFiles as null. IncrementalChangelogReadProvider then reads the same physical data file on both sides without applying either snapshot's DV: restoring from a DV snapshot back to a no-DV snapshot emits a full-file retract plus a full-file add instead of only adding the previously deleted rows (and the opposite direction similarly fails to delete only the DV rows). The new test only checks that the delta manifest is non-empty, so it does not catch this consumer-path behavior. Please either make the restore streaming delta carry the snapshot-specific DV files, or cover this with a streaming changelog/diff read test that proves the logical rows produced by a DV-only restore are correct.
Purpose
This PR adds a non-destructive restore procedure for Flink:
Unlike
rollback_to, this procedure restores the table to the state of a target snapshot or tag by creating a new latest snapshot. Later snapshots and tags are preserved.What is changed
RestoreAsLatestProcedureand register it in the Flink procedure factory list.Tests
mvn -pl paimon-flink/paimon-flink-common -am -Pfast-build -DfailIfNoTests=false -Dtest=RestoreAsLatestProcedureITCase test git diff --checkNotes
This PR is not associated with an issue yet. If the community prefers following the discussion-first flow strictly, I can open or join an issue/discussion and adjust the design accordingly.