Skip to content

[flink] Add restore_as_latest procedure#8139

Open
zhuxiangyi wants to merge 7 commits into
apache:masterfrom
zhuxiangyi:feat/restore-as-latest-procedure
Open

[flink] Add restore_as_latest procedure#8139
zhuxiangyi wants to merge 7 commits into
apache:masterfrom
zhuxiangyi:feat/restore-as-latest-procedure

Conversation

@zhuxiangyi

Copy link
Copy Markdown

Purpose

This PR adds a non-destructive restore procedure for Flink:

CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3);
CALL sys.restore_as_latest(`table` => 'default.T', tag => 'tag-1');

Unlike rollback_to, this procedure restores the table to the state of a target snapshot or tag by creating a new latest snapshot. Later snapshots and tags are preserved.

What is changed

  • Add RestoreAsLatestProcedure and register it in the Flink procedure factory list.
  • Add commit support to create a new latest snapshot from the complete data manifests of the target snapshot.
  • Add IT coverage for restoring from snapshot and tag, preserving later snapshots, and writing after restore.
  • Document the procedure in Flink procedures and snapshot/tag maintenance docs.

Tests

mvn -pl paimon-flink/paimon-flink-common -am -Pfast-build -DfailIfNoTests=false -Dtest=RestoreAsLatestProcedureITCase test
git diff --check

Notes

This PR is not associated with an issue yet. If the community prefers following the discussion-first flow strictly, I can open or join an issue/discussion and adjust the design accordingly.

@zhuxiangyi zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch 2 times, most recently from ecd0935 to 0f821ed Compare June 5, 2026 23:37
@zhuxiangyi zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch from 0f821ed to 1492902 Compare June 6, 2026 01:03
@JingsongLi

Copy link
Copy Markdown
Contributor

I think restoreAsLatest can be invisible to streaming readers that handle overwrite snapshots.

The new snapshot writes the target snapshot's files into the base manifest list, but writes an empty delta manifest list and marks the commit as CommitKind.OVERWRITE (FileStoreCommitImpl.java:1174-1200). DataTableStreamScan first handles overwrite snapshots via the overwrite-change path, and if the returned plan is empty it advances past the snapshot. Since the restore snapshot has no delta, a streaming reader with streaming-read-overwrite=true can skip the restore entirely, missing both files/rows that should be removed from the current latest snapshot and files/rows that should be restored from the target snapshot.

Could restoreAsLatest produce an overwrite delta relative to the previous latest snapshot (DELETE previous-only files and ADD target-only files), or introduce a dedicated commit kind/streaming-scan handling for restore snapshots?

@zhuxiangyi

Copy link
Copy Markdown
Author

@JingsongLi
Thanks for pointing this out. I agree that the current restore snapshot is not sufficient for streaming readers with streaming-read-overwrite=true.

The new snapshot currently has the target snapshot's complete data manifests in baseManifestList, but its deltaManifestList is empty. This makes the final table state correct for batch/full scans, but the restore can be invisible to streaming overwrite readers.

I will update restoreAsLatest to generate an overwrite delta relative to the previous latest snapshot: DELETE files that exist only in the previous latest snapshot, and ADD files that exist only in the target snapshot. The baseManifestList will contain the previous latest snapshot's merged effective ADD files, while deltaManifestList will describe the previous-latest-to-target transition.

Ensure restore_as_latest writes an overwrite delta so streaming overwrite readers can observe restored file changes.
@zhuxiangyi zhuxiangyi force-pushed the feat/restore-as-latest-procedure branch from 439db01 to 1c523ac Compare June 8, 2026 02:17
@zhuxiangyi

Copy link
Copy Markdown
Author

@JingsongLi
Updated in the latest commit. restoreAsLatest now writes an overwrite delta from the previous latest snapshot to the target snapshot: DELETE files that exist only in the previous latest snapshot and ADD files that exist only in the target snapshot.

I also added IT coverage to verify both DELETE-only and ADD-only restore deltas.

Comment thread paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java Outdated
Comment thread paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java Outdated
Skip automatic expiration on the restore-as-latest path so it no longer
deletes the snapshots/tags it promises to keep (e.g. with
snapshot.num-retained.max=1), and keep nextRowId monotonic by taking the
max of the previous latest and target snapshot, preventing row id reuse
that breaks _ROW_ID global uniqueness on row-tracking tables.

Add IT cases covering both fixes.
@zhuxiangyi zhuxiangyi requested a review from JingsongLi June 21, 2026 16:33
targetSnapshot.properties(),
nextRowId);

return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This restore commit bypasses the normal post-commit callbacks. Regular commits call commitCallbacks with the committed snapshot and delta files after commitSnapshotImpl succeeds, and those callbacks keep external state in sync (for example Iceberg compatibility metadata uses context.snapshot/context.deltaFiles, and chain-table overwrite handling reacts to CommitKind.OVERWRITE). restoreAsLatest also changes the table state with an overwrite delta, but it returns immediately after writing the Paimon snapshot, so those external views can remain at the pre-restore state. Could we trigger the same commit callback path after a successful restore, using the restored base/delta/index files and the new snapshot context?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks. restoreAsLatest committed via commitSnapshotImpl directly and skipped the commit callbacks, so external views (Iceberg metadata, chain-table overwrite) could stay at the pre-restore state.

Fixed by notifying the callbacks after a successful restore, like a regular commit. The context uses the restore delta (DELETE previous-only + ADD target-only) and an index delta derived the same way from the previous-latest and target index manifests. Both callbacks are idempotent, so retries stay correct.

Added an IT case testRestoreTriggersCommitCallback asserting Iceberg metadata is generated for the restore snapshot.

restoreAsLatest committed the restore snapshot directly via
commitSnapshotImpl, bypassing the commit callbacks that a regular commit
runs. External views that depend on those callbacks (Iceberg
compatibility metadata, chain-table overwrite handling) could therefore
stay at the pre-restore state.

Notify the callbacks after a successful restore using the restored
base/delta/index files and the new snapshot. The index changes are
derived from the previous latest and target index manifests, mirroring
how the data delta files are computed.

Add an IT case asserting Iceberg metadata is generated for the restore
snapshot.
targetSnapshot.properties(),
nextRowId);

boolean success =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This restore path still bypasses the normal pre-commit callbacks. Regular commits call commitPreCallbacks before commitSnapshotImpl, and ChainTableCommitPreCallback uses that hook to reject unsafe pure-DELETE overwrite commits on the snapshot branch. restoreAsLatest also creates an OVERWRITE delta, so restoring a snapshot branch back to an older state can delete snapshot partitions that a normal overwrite would validate and potentially abort. Please run the same pre-callback path (with the restore base files, delta files, index changes, and new snapshot) before making the restore snapshot visible.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restoreAsLatest only ran the post-commit callbacks and skipped the pre-commit ones. ChainTableCommitPreCallback uses that hook to reject a pure-DELETE overwrite on a chain-table snapshot branch that would drop a snapshot partition still anchoring delta partitions, so restoring such a branch to an older state could silently break the chain.

Fixed by invoking commitPreCallbacks before commitSnapshotImpl (sharing the restore base/delta/index files with the post callbacks), so an unsafe restore is now aborted before the snapshot is created — same behavior as a regular overwrite.

Added an IT case asserting such a restore is rejected and the latest snapshot is left unchanged.

options.scanManifestParallelism());
targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);

List<ManifestEntry> deltaFiles = new ArrayList<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restore delta is built only from data-file identifier differences. For tables using deletion vectors or other index-manifest state, the logical contents can change while the data files stay the same; restoring across a DV-only delete/update would leave deltaFiles empty and deltaRecordCount as 0, even though targetSnapshot.indexManifest() and totalRecordCount differ. Streaming overwrite readers call readChanges() from the DELTA data manifests, and the streaming path does not load DV indexes, so such a restore can still be skipped by streaming readers. Please include the relevant index/DV changes in the restore transition (or otherwise make the overwrite read/delta counts handle index-only restores) instead of relying on the final indexManifest alone.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. A deletion-vector-only delete/update doesn't rewrite the data file, so the identifier-based diff left an empty data delta with deltaRecordCount 0, and streaming overwrite readers (which read the DELTA data manifests and don't load DV indexes) could skip the restore.

Fixed by comparing the deletion vectors between the previous latest and the target (via the index-manifest dvRanges) and re-emitting each data file whose DV changed as DELETE(latest) + ADD(target). This makes the restore transition visible to streaming overwrite readers. The physical row count is unchanged, so the pair nets to zero in the delta record count, matching the unchanged totalRecordCount.

Note this makes restore behave like a regular overwrite for DV tables; applying the DV itself during streaming-read-overwrite is a separate, pre-existing limitation of that read path and is out of scope here.

Added a FileStoreCommitTest covering a DV-only restore transition.

restoreAsLatest only ran the post-commit callbacks; it skipped the
pre-commit callbacks that a regular commit runs before the snapshot
becomes visible. ChainTableCommitPreCallback uses that hook to reject a
pure-DELETE overwrite on a chain-table snapshot branch that would drop a
snapshot partition still anchoring delta partitions. Since restore also
creates an OVERWRITE delta, restoring such a branch to an older state
could silently break the chain.

Invoke commitPreCallbacks before commitSnapshotImpl, sharing the restore
base/delta/index files with the post callbacks, so an unsafe restore is
aborted before the snapshot is created.

Add an IT case asserting the dangerous restore is rejected and the latest
snapshot is left unchanged.
The restore delta was built only from data-file identifier differences.
A deletion-vector-only delete/update does not rewrite the data file, so
such a change was missed, leaving an empty data delta with
deltaRecordCount 0. Streaming overwrite readers read the DELTA data
manifests and do not load DV indexes, so they could skip the restore
entirely.

Compare the deletion vectors (via index-manifest dvRanges) between the
previous latest and the target, and re-emit each data file whose DV
changed as DELETE(latest) + ADD(target). The physical row count is
unchanged, so the pair nets to zero in the delta record count, matching
the unchanged totalRecordCount.

Add a FileStoreCommitTest covering a DV-only restore transition.
@zhuxiangyi zhuxiangyi requested a review from JingsongLi June 25, 2026 09:33
// DELETE(latest) + ADD(target) makes the restore transition visible. The physical row count
// is unchanged, so the pair nets to zero in the delta record count, consistent with the
// unchanged totalRecordCount.
addDeletionVectorOnlyChanges(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still does not make DV-only restores correct for streaming readers. readChanges() always calls toIncrementalPlan(true, ...), and SnapshotReaderImpl only scans DV indexes when !isStreaming, so the IncrementalSplit built for this DELETE+ADD pair has both beforeDeletionFiles and afterDeletionFiles as null. IncrementalChangelogReadProvider then reads the same physical data file on both sides without applying either snapshot's DV: restoring from a DV snapshot back to a no-DV snapshot emits a full-file retract plus a full-file add instead of only adding the previously deleted rows (and the opposite direction similarly fails to delete only the DV rows). The new test only checks that the delta manifest is non-empty, so it does not catch this consumer-path behavior. Please either make the restore streaming delta carry the snapshot-specific DV files, or cover this with a streaming changelog/diff read test that proves the logical rows produced by a DV-only restore are correct.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants