From 85e05e796d40534bc9b6f8d821531349a4a30a4a Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Tue, 9 Jun 2026 16:17:37 +0800 Subject: [PATCH 1/6] [flink] Support stream read Chain Table --- docs/docs/primary-key-table/chain-table.md | 53 +- .../paimon/table/ChainGroupReadTable.java | 7 +- .../table/ChainTableFileStoreTable.java | 133 ++ .../paimon/table/ChainTableStreamScan.java | 427 +++++ .../paimon/table/FileStoreTableFactory.java | 2 +- .../paimon/table/source/ChainSplit.java | 18 + .../source/ContinuousFileSplitEnumerator.java | 32 + .../flink/source/FlinkSourceBuilder.java | 6 + .../paimon/flink/FlinkChainTableITCase.java | 1442 +++++++++++++++++ 9 files changed, 2116 insertions(+), 4 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java diff --git a/docs/docs/primary-key-table/chain-table.md b/docs/docs/primary-key-table/chain-table.md index 0cb2984b4cf5..b018c3f5130b 100644 --- a/docs/docs/primary-key-table/chain-table.md +++ b/docs/docs/primary-key-table/chain-table.md @@ -129,7 +129,6 @@ ALTER TABLE `default`.`t$branch_delta` SET ( Notice that: - Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table. - Chain table should ensure that the schema of each branch is consistent. -- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported. - Deletion vector is not supported for chain table. After creating a chain table, you can read and write data in the following ways. @@ -216,6 +215,58 @@ you will get the following result: +---+----+-----+ ``` +## Streaming Read + +Chain tables support Flink streaming read. A streaming read job operates in two phases: + +1. **Full load phase**: Produces a full result by reading the latest snapshot partition (per group) + and delta partitions that come after it. For each partition group, only the most recent snapshot + partition is included — older snapshot partitions are considered outdated and excluded. +2. **Incremental phase**: Continuously reads new commits from the delta branch as they arrive. + +### Write-Side Requirements + +Streaming read assumes the chain table follows the standard write pattern described at the top of +this page: + +- **Snapshot branch** receives periodic full data (e.g., a daily ODS binlog dump job writes via + `INSERT OVERWRITE t$branch_snapshot`). Each snapshot partition represents a complete view of the + data at that point in time. +- **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the + current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition + contains only the changes for that period. The delta branch must have a + [changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`) + for streaming read to work. + +The streaming read relies on this pattern to produce correct results. After the full load phase, +only new delta branch commits are picked up — writes to the snapshot branch do not trigger +streaming output. To incorporate a new snapshot, restart the streaming job. + +### Usage + +```sql +SET 'execution.runtime-mode' = 'streaming'; + +INSERT INTO downstream_sink SELECT * FROM default.t; +``` + +### Limitations + +- The incremental phase only monitors the **delta branch**. Writes to the snapshot branch are + not detected until the streaming job is restarted. +- The chain-table-aware streaming scan only supports the default startup mode (`latest-full`). + When the user specifies an explicit starting position — such as `scan.snapshot-id`, + `scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress — + an `UnsupportedOperationException` is thrown. To use standard streaming read without chain + table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main + table. +- Partition filters are not supported in chain table streaming reads. Specifying a partition + filter — either via a `WHERE` clause on partition columns or the `scan.partitions` table + option — throws an `UnsupportedOperationException`. This is because the chain table streaming + scan determines which partitions to read based on the chain-merge logic across snapshot and + delta branches, and applying a partition filter would interfere with this logic. To read a + specific partition, use batch mode instead. + ## Group Partition In real-world scenarios, a table often has multiple partition dimensions. For example, data may be diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 95a591ebb4d6..c65c287aecfb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -535,8 +535,11 @@ public TableRead withIOManager(IOManager ioManager) { @Override public RecordReader createReader(Split split) throws IOException { - checkArgument(split instanceof ChainSplit); - return fallbackRead.createReader(split); + if (split instanceof ChainSplit || split instanceof DataSplit) { + return fallbackRead.createReader(split); + } + throw new IllegalArgumentException( + "Unsupported split type for chain table read: " + split.getClass().getName()); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java new file mode 100644 index 000000000000..350247949faf --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.StartupMode; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.StreamDataTableScan; + +import java.util.Map; + +/** + * Chain-table-aware extension of {@link FallbackReadFileStoreTable}. Inherits the batch read + * behavior (partition-level fallback between the current branch and {@link ChainGroupReadTable}), + * and additionally overrides {@link #newStreamScan()} to return a chain-aware {@link + * ChainTableStreamScan} that performs a partition-level full load followed by incremental + * delta-only streaming. + */ +public class ChainTableFileStoreTable extends FallbackReadFileStoreTable { + + public ChainTableFileStoreTable(FileStoreTable wrapped, FileStoreTable other) { + super(wrapped, other, true); + } + + @Override + public StreamDataTableScan newStreamScan() { + CoreOptions coreOptions = wrapped.coreOptions(); + + StartupMode effectiveMode = coreOptions.startupMode(); + boolean hasConsumerProgress = + coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress(); + if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) { + String reason = + describeUnsupportedMode(coreOptions, effectiveMode, hasConsumerProgress); + throw new UnsupportedOperationException( + "Chain table streaming read does not support startup mode '" + + reason + + "'. " + + "Chain table streaming only supports the default 'latest-full' mode, which first " + + "produces a partition-level full result and then continuously reads incremental " + + "data from the delta branch.\n" + + "Suggestions:\n" + + " - To use chain table streaming: remove the explicit scan mode/position settings " + + "so that the default 'latest-full' mode is used.\n" + + " - To use standard streaming read without chain table logic: read from a " + + "specific branch table (e.g., 't$branch_delta') instead of the main table."); + } + + // Inherited other() returns the ChainGroupReadTable directly. + ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) other(); + + return new ChainTableStreamScan(chainGroupReadTable); + } + + private static String describeUnsupportedMode( + CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumerProgress) { + if (hasConsumerProgress) { + return "consumer-id with existing progress"; + } + switch (effectiveMode) { + case LATEST: + return "scan.mode=latest"; + case FROM_SNAPSHOT: + if (coreOptions.scanSnapshotId() != null) { + return "scan.snapshot-id=" + coreOptions.scanSnapshotId(); + } + if (coreOptions.scanTagName() != null) { + return "scan.tag-name=" + coreOptions.scanTagName(); + } + if (coreOptions.scanWatermark() != null) { + return "scan.watermark=" + coreOptions.scanWatermark(); + } + return "from-snapshot"; + case FROM_TIMESTAMP: + if (coreOptions.scanTimestampMills() != null) { + return "scan.timestamp-millis=" + coreOptions.scanTimestampMills(); + } + if (coreOptions.scanTimestamp() != null) { + return "scan.timestamp=" + coreOptions.scanTimestamp(); + } + return "from-timestamp"; + default: + return effectiveMode.name().toLowerCase().replace('_', '-'); + } + } + + @Override + public FileStoreTable copy(Map dynamicOptions) { + return new ChainTableFileStoreTable( + wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new ChainTableFileStoreTable( + wrapped.copy(newTableSchema), + other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options())))); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new ChainTableFileStoreTable( + wrapped.copyWithoutTimeTravel(dynamicOptions), + other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new ChainTableFileStoreTable( + wrapped.copyWithLatestSchema(), other().copyWithLatestSchema()); + } + + @Override + public FileStoreTable switchToBranch(String branchName) { + return new ChainTableFileStoreTable(switchWrappedToBranch(branchName), other()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java new file mode 100644 index 000000000000..7378744cf5a4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.PartitionPredicateVisitor; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.DataFilePlan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.DataTableStreamScan; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.SnapshotNotExistPlan; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.table.source.snapshot.StartingContext; +import org.apache.paimon.utils.ChainPartitionProjector; +import org.apache.paimon.utils.ChainTableUtils; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Streaming scan for chain tables with a two-phase design: + * + *
    + *
  • Phase 1 (Starting): Outputs the latest snapshot partition (per group) and delta + * partitions that come after it. Older snapshot partitions are excluded as they are + * considered outdated. Each primary key appears exactly once under its natural partition. + * Unlike batch full scan, anchor-based chain merging is intentionally skipped to keep Phase 1 + * lightweight — this avoids split explosion in long-running jobs with many partitions. + *
  • Phase 2 (Incremental): Stream new snapshots from the delta branch only, picking up + * from where Phase 1 left off. + *
+ * + *

Checkpoint state is a single {@code Long} — the delta branch's next snapshot id. On stateful + * restart, Phase 1 is skipped and incremental streaming resumes from the checkpointed position. On + * stateless restart (null state), a fresh starting scan is performed. + */ +public class ChainTableStreamScan implements StreamDataTableScan { + + private static final Logger LOG = LoggerFactory.getLogger(ChainTableStreamScan.class); + + private final ChainGroupReadTable chainGroupReadTable; + + /** Phase 1: batch scan used to access snapshot branch data via {@code mainScan}. */ + private final ChainGroupReadTable.ChainTableBatchScan batchScan; + + /** Phase 2: delta-only stream scan. */ + private final DataTableStreamScan deltaStreamScan; + + /** Projector for splitting full partition into group and chain parts. */ + private final ChainPartitionProjector partitionProjector; + + /** Comparator for chain partition keys only. */ + private final RecordComparator chainPartitionComparator; + + /** Partition keys of the table, used to reject partition filters in streaming mode. */ + private final List partitionKeys; + + /** + * Checkpoint state: the next delta snapshot id to read. Null before Phase 1 completes; non-null + * once Phase 1 is done or after a stateful restore. + */ + @Nullable private Long nextDeltaSnapshotId; + + /** Whether the starting plan (Phase 1) has been completed. */ + private boolean startingDone = false; + + /** Predicates and shard for applying to local scans created in {@link #planStarting()}. */ + private final List predicates = new ArrayList<>(); + + private int shardIndex = -1; + + private int shardCount = -1; + + public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) { + this.chainGroupReadTable = chainGroupReadTable; + this.batchScan = + new ChainGroupReadTable.ChainTableBatchScan( + chainGroupReadTable.schema(), chainGroupReadTable); + this.deltaStreamScan = (DataTableStreamScan) chainGroupReadTable.other().newStreamScan(); + + // Initialize partition projector and chain comparator using the established pattern + // from ChainTableBatchScan. + List chainKeys = + ChainTableUtils.chainPartitionKeys( + chainGroupReadTable.coreOptions(), + chainGroupReadTable.schema().partitionKeys()); + this.partitionProjector = + new ChainPartitionProjector( + chainGroupReadTable.schema().logicalPartitionType(), chainKeys.size()); + this.chainPartitionComparator = + CodeGenUtils.newRecordComparator( + partitionProjector.chainPartitionType().getFieldTypes()); + this.partitionKeys = chainGroupReadTable.schema().partitionKeys(); + } + + @Override + public StartingContext startingContext() { + if (!startingDone) { + return StartingContext.EMPTY; + } + return deltaStreamScan.startingContext(); + } + + @Override + public TableScan.Plan plan() { + if (!startingDone) { + return planStarting(); + } + TableScan.Plan plan = deltaStreamScan.plan(); + // Never return SnapshotNotExistPlan — it would cause the Flink enumerator to + // set stopTriggerScan=true and permanently stop polling for new data. + if (plan instanceof SnapshotNotExistPlan) { + return new DataFilePlan<>(Collections.emptyList()); + } + return plan; + } + + /** + * Starting plan: outputs the latest snapshot partition (per group) and delta partitions that + * come after it. Older snapshot partitions are excluded. Each primary key appears exactly once + * under its natural partition. + * + *

Unlike batch full scan, anchor-based chain merging is not performed. This keeps Phase 1 + * lightweight for long-running jobs. + */ + private TableScan.Plan planStarting() { + FileStoreTable deltaTable = chainGroupReadTable.other(); + String deltaBranch = deltaTable.coreOptions().branch(); + String snapshotBranch = chainGroupReadTable.wrapped.coreOptions().branch(); + + Long latestId = captureDeltaPosition(deltaTable); + + // 1. Read delta branch data at the pinned snapshot, grouped by partition. + Map> deltaSplitsByPartition; + if (latestId != null) { + FileStoreTable pinnedDelta = + deltaTable.copy( + Collections.singletonMap("scan.snapshot-id", String.valueOf(latestId))); + DataTableScan pinnedDeltaScan = pinnedDelta.newScan(); + applyPredicatesAndShard(pinnedDeltaScan); + deltaSplitsByPartition = groupByPartition(pinnedDeltaScan); + } else { + deltaSplitsByPartition = Collections.emptyMap(); + } + + // 2. List snapshot partitions (lightweight — partition metadata only, no file I/O). + // Find the latest chain partition per group, then scan only those partitions for files. + // This avoids reading file manifests for hundreds of historical partitions that will be + // discarded (only the latest per group is kept). + Map latestChainPartitionPerGroup = new HashMap<>(); + if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { + DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); + for (BinaryRow partition : partitionListingScan.listPartitions()) { + Object groupKey = toGroupKey(partition); + BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); + if (existingLatest == null + || chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(existingLatest)) + > 0) { + latestChainPartitionPerGroup.put(groupKey, partition); + } + } + } + + // 3. Scan file splits for latest snapshot partitions only. + List latestPartitions = new ArrayList<>(latestChainPartitionPerGroup.values()); + Map> snapshotSplitsByPartition; + if (!latestPartitions.isEmpty()) { + DataTableScan snapshotScan = chainGroupReadTable.wrapped.newScan(); + snapshotScan.withPartitionFilter(latestPartitions); + applyPredicatesAndShard(snapshotScan); + snapshotSplitsByPartition = groupByPartition(snapshotScan); + } else { + snapshotSplitsByPartition = Collections.emptyMap(); + } + + // 4. Build ChainSplits: + // - Snapshot partitions are already filtered to latest per group. + // - Delta partitions: include if (a) chain key > latest for that group, or + // (b) no snapshot exists for that group. + List allSplits = new ArrayList<>(); + + for (Map.Entry> entry : snapshotSplitsByPartition.entrySet()) { + for (DataSplit ds : entry.getValue()) { + allSplits.add(ChainSplit.from(ds, snapshotBranch)); + } + } + + for (Map.Entry> entry : deltaSplitsByPartition.entrySet()) { + BinaryRow partition = entry.getKey(); + Object groupKey = toGroupKey(partition); + BinaryRow latestPartition = latestChainPartitionPerGroup.get(groupKey); + // Include delta partition if: + // - No snapshot exists for this group, OR + // - Chain key > latest snapshot chain key + if (latestPartition == null + || chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(latestPartition)) + > 0) { + for (DataSplit ds : entry.getValue()) { + allSplits.add(ChainSplit.from(ds, deltaBranch)); + } + } + } + + LOG.info( + "ChainTableStreamScan.planStarting [snapshot={}, delta={}]: " + + "{} delta partitions, {} snapshot partitions, " + + "{} latest snapshot groups, {} total splits", + snapshotBranch, + deltaBranch, + deltaSplitsByPartition.size(), + snapshotSplitsByPartition.size(), + latestChainPartitionPerGroup.size(), + allSplits.size()); + + startingDone = true; + return new DataFilePlan<>(allSplits); + } + + /** + * Captures the delta branch's latest snapshot id and positions the Phase 2 stream scan to start + * from the next snapshot. This makes the Phase 1 / Phase 2 boundary deterministic: Phase 1 + * reads delta data pinned at the returned snapshot id, Phase 2 starts from the snapshot after. + * + * @return the latest delta snapshot id, or {@code null} if the delta branch has no snapshots + */ + @Nullable + private Long captureDeltaPosition(FileStoreTable deltaTable) { + SnapshotManager deltaSnapshotManager = deltaTable.snapshotManager(); + Long latestId = deltaSnapshotManager.latestSnapshotId(); + nextDeltaSnapshotId = latestId != null ? latestId + 1 : Snapshot.FIRST_SNAPSHOT_ID; + LOG.info( + "ChainTableStreamScan: pinned delta branch '{}' at snapshot {}, " + + "nextDeltaSnapshotId={}", + deltaTable.coreOptions().branch(), + latestId, + nextDeltaSnapshotId); + deltaStreamScan.restore(nextDeltaSnapshotId); + return latestId; + } + + /** Plans a scan and groups the resulting splits by partition. */ + private static Map> groupByPartition(DataTableScan scan) { + Map> grouped = new LinkedHashMap<>(); + for (Split s : scan.plan().splits()) { + DataSplit ds = (DataSplit) s; + grouped.computeIfAbsent(ds.partition(), k -> new ArrayList<>()).add(ds); + } + return grouped; + } + + /** + * Extracts a stable group key from a full partition row. When there is no group partition (all + * fields are chain keys), returns a shared singleton to avoid zero-field {@link BinaryRow} + * instances that may have inconsistent {@code hashCode}/{@code equals} across different + * partitions. + */ + private Object toGroupKey(BinaryRow fullPartition) { + if (!partitionProjector.hasGroupPartition()) { + return Collections.emptyList(); + } + return partitionProjector.extractGroupPartition(fullPartition); + } + + @Override + public InnerTableScan withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + if (!partitionKeys.isEmpty() + && predicate.visit(new PartitionPredicateVisitor(partitionKeys))) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read. " + + "The chain table streaming scan determines which partitions to read " + + "based on the chain-merge logic across snapshot and delta branches. " + + "Applying a partition filter would interfere with this logic. " + + "If you need to read a specific partition, use batch mode instead."); + } + predicates.add(predicate); + batchScan.withFilter(predicate); + deltaStreamScan.withFilter(predicate); + return this; + } + + @Override + public InnerTableScan withPartitionFilter(Map partitionSpec) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + + @Override + public InnerTableScan withPartitionFilter(List partitions) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + + @Override + public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) { + if (partitionPredicate != null) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + return this; + } + + @Override + public InnerTableScan withPartitionFilter(Predicate predicate) { + if (predicate != null) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + return this; + } + + @Override + public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { + shardIndex = indexOfThisSubtask; + shardCount = numberOfParallelSubtasks; + batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + deltaStreamScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + return this; + } + + /** + * Applies all previously set predicates and shard to a newly created scan. Used for the pinned + * delta scan in {@link #planStarting()}. + */ + private void applyPredicatesAndShard(DataTableScan scan) { + for (Predicate p : predicates) { + scan.withFilter(p); + } + if (shardIndex >= 0) { + scan.withShard(shardIndex, shardCount); + } + } + + @Nullable + @Override + public Long checkpoint() { + if (startingDone) { + return deltaStreamScan.checkpoint(); + } + return nextDeltaSnapshotId; + } + + @Nullable + @Override + public Long watermark() { + if (!startingDone) { + return null; + } + return deltaStreamScan.watermark(); + } + + @Override + public void restore(@Nullable Long nextSnapshotId) { + this.nextDeltaSnapshotId = nextSnapshotId; + if (nextSnapshotId != null) { + startingDone = true; + deltaStreamScan.restore(nextSnapshotId); + } else { + startingDone = false; + } + } + + @Override + public void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot) { + if (scanAllSnapshot) { + startingDone = false; + this.nextDeltaSnapshotId = nextSnapshotId; + // No need to call deltaStreamScan.restore() here — Phase 1 will re-run and + // captureDeltaPosition() will re-position the delta stream scan. + } else { + restore(nextSnapshotId); + } + } + + @Override + public void notifyCheckpointComplete(@Nullable Long nextSnapshot) { + deltaStreamScan.notifyCheckpointComplete(nextSnapshot); + } + + @Override + public List listPartitionEntries() { + throw new UnsupportedOperationException( + "List Partition Entries is not supported in Chain Table Stream Scan."); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index e4a865679910..53be2803ce70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -181,7 +181,7 @@ public static FileStoreTable createChainTable( catalogEnvironment); FileStoreTable chainGroupFileStoreTable = new ChainGroupReadTable(snapshotTable, deltaTable); - return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable, true); + return new ChainTableFileStoreTable(table, chainGroupFileStoreTable); } private static FileStoreTable createOtherBranchTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java index dfa364f96a94..6b3512c615df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java @@ -79,6 +79,24 @@ public Map fileBucketPathMapping() { return fileBucketPathMapping; } + /** + * Creates a {@link ChainSplit} from a {@link DataSplit} where all data files belong to the same + * branch. + */ + public static ChainSplit from(DataSplit dataSplit, String branch) { + HashMap fileBranchMapping = new HashMap<>(); + HashMap fileBucketPathMapping = new HashMap<>(); + for (DataFileMeta file : dataSplit.dataFiles()) { + fileBranchMapping.put(file.fileName(), branch); + fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath()); + } + return new ChainSplit( + dataSplit.partition(), + dataSplit.dataFiles(), + fileBranchMapping, + fileBucketPathMapping); + } + @Override public long rowCount() { long sum = 0; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 77aeeb85081d..a8c77aab7fd6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -25,6 +25,7 @@ import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; +import org.apache.paimon.table.source.ChainSplit; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.IncrementalSplit; @@ -328,6 +329,8 @@ protected synchronized void assignSplits() { protected int assignSuggestedTask(FileStoreSourceSplit split) { if (split.split() instanceof DataSplit) { return assignSuggestedTask((DataSplit) split.split()); + } else if (split.split() instanceof ChainSplit) { + return assignSuggestedTask((ChainSplit) split.split()); } else { return assignSuggestedTask((IncrementalSplit) split.split()); } @@ -364,6 +367,35 @@ protected int assignSuggestedTask(IncrementalSplit split) { } } + protected int assignSuggestedTask(ChainSplit split) { + int parallelism = context.currentParallelism(); + // Extract bucket id from the bucket path stored in fileBucketPathMapping. + // The bucket path ends with "bucket-{id}". + int bucketId = 0; + if (!split.fileBucketPathMapping().isEmpty()) { + String bucketPath = split.fileBucketPathMapping().values().iterator().next(); + int lastSlash = bucketPath.lastIndexOf('/'); + if (lastSlash >= 0) { + String bucketDir = bucketPath.substring(lastSlash + 1); + if (bucketDir.startsWith("bucket-")) { + try { + bucketId = Integer.parseInt(bucketDir.substring("bucket-".length())); + } catch (NumberFormatException e) { + LOG.warn( + "Failed to parse bucket id from path '{}', falling back to 0.", + bucketPath, + e); + } + } + } + } + if (shuffleBucketWithPartition) { + return ChannelComputer.select(split.logicalPartition(), bucketId, parallelism); + } else { + return ChannelComputer.select(bucketId, parallelism); + } + } + protected SplitAssigner createSplitAssigner(boolean unordered) { return unordered ? new FIFOSplitAssigner(Collections.emptyList()) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3e96dec1ea50..2e9c8ae494c2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -326,6 +326,12 @@ public DataStream build() { TableScanUtils.streamingReadingValidate(table); if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { + if (conf.get(CoreOptions.CHAIN_TABLE_ENABLED)) { + throw new UnsupportedOperationException( + "Chain table streaming is not compatible with checkpoint-align mode. " + + "Please disable 'source.checkpoint-align.enabled' when reading " + + "a chain table in streaming mode."); + } return buildAlignedContinuousFileSource(); } else if (conf.contains(CoreOptions.CONSUMER_ID) && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index 8ac39d14308a..ab7727a00eea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -18,14 +18,40 @@ package org.apache.paimon.flink; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.table.ChainTableStreamScan; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.utils.BlockingIterator; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableResult; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for chain table using Flink SQL. */ public class FlinkChainTableITCase extends CatalogITCaseBase { @@ -688,4 +714,1420 @@ public void testChainTableWithGroupPartition() throws Exception { .containsExactlyInAnyOrder( "+I[2, 2, 1-1, CN, 20250811]", "+I[4, 1, 1, CN, 20250811]"); } + + /** Write Row data (with RowKind) to a specific branch using DataStream API. */ + private void writeChangelogToBranch(String db, String tableName, String branch, Row... rows) + throws Exception { + FileStoreTable table = paimonTable(tableName + "$branch_" + branch); + + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .parallelism(1) + .build(); + + DataStream stream = env.fromCollection(Arrays.asList(rows)); + + new FlinkSinkBuilder(table) + .forRow( + stream, + DataTypes.ROW( + DataTypes.FIELD("k", DataTypes.BIGINT()), + DataTypes.FIELD("seq", DataTypes.BIGINT()), + DataTypes.FIELD("v", DataTypes.STRING()), + DataTypes.FIELD("dt", DataTypes.STRING()))) + .build(); + env.execute(); + } + + /** + * Collect n rows from a streaming iterator with a timeout. If no data arrives within + * timeoutSeconds, the iterator is closed and an AssertionError is thrown. This is necessary + * because it.next() blocks indefinitely when no data is available, and JUnit @Timeout cannot + * interrupt it. + */ + /** + * Collects {@code n} rows from a streaming iterator using the project-standard {@link + * BlockingIterator}. + */ + private List collectRows(CloseableIterator it, int n) throws Exception { + return BlockingIterator.of(it).collect(n, 30, TimeUnit.SECONDS).stream() + .map(Row::toString) + .collect(Collectors.toList()); + } + + /** + * Polls the given table until it contains at least {@code minRows} rows. Used instead of + * fixed-duration Thread.sleep to avoid flaky tests on slow CI. + */ + private void waitForRowCount(String tableName, int minRows) throws Exception { + long deadline = System.currentTimeMillis() + 60_000; + int count = 0; + while (System.currentTimeMillis() < deadline) { + List rows = sql("SELECT * FROM " + tableName); + count = rows.size(); + if (count >= minRows) { + return; + } + Thread.sleep(1000); + } + throw new AssertionError( + "Timed out waiting for " + minRows + " rows in " + tableName + ", got " + count); + } + + /** + * Tests the streaming read lifecycle for a chain table with changelog-producer=input. + * + *

Verifies: initial full read from delta-only → delta incremental visible with changelog + * records (-U/+U) → snapshot OVERWRITE has no effect → more delta visible → stateless restart + * reads chain-merged state. + */ + @Test + @Timeout(120) + public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exception { + // Create chain table with changelog-producer=input + sql( + "CREATE TABLE chain_life_cl (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_life_cl', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_life_cl', 'delta')", db); + for (String tbl : + new String[] { + "chain_life_cl", "chain_life_cl$branch_snapshot", "chain_life_cl$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // === Phase 1: Delta-only initial data (all inserts) === + sql( + "INSERT INTO `chain_life_cl$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')," + + " (4, 1, 'base_4'), (5, 1, 'base_5')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_life_cl").collect(); + + List phase1 = collectRows(it, 5); + assertThat(phase1) + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+I[4, 1, base_4, 20250808]", + "+I[5, 1, base_5, 20250808]"); + + // === Phase 2: Write changelog data (with -U/+U for update) via DataStream API === + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.UPDATE_BEFORE, 3L, 1L, "base_3", "20250809"), + Row.ofKind(RowKind.UPDATE_AFTER, 3L, 2L, "upd_3", "20250809"), + Row.ofKind(RowKind.INSERT, 6L, 1L, "new_6", "20250809"), + Row.ofKind(RowKind.INSERT, 7L, 1L, "new_7", "20250809")); + + List phase2 = collectRows(it, 4); + // changelog-producer=input: explicit -U/+U for updates + assertThat(phase2) + .containsExactlyInAnyOrder( + "-U[3, 1, base_3, 20250809]", + "+U[3, 2, upd_3, 20250809]", + "+I[6, 1, new_6, 20250809]", + "+I[7, 1, new_7, 20250809]"); + + // === Phase 3: Snapshot OVERWRITE should have NO effect === + sql( + "INSERT OVERWRITE `chain_life_cl$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')," + + " (4, 1, 'base_4'), (5, 1, 'base_5')"); + + // Write delta AFTER snapshot — this proves snapshot writes don't trigger output. + // If snapshot writes were detected, we'd see duplicate or unexpected rows. + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 100L, 1L, "phase3_probe", "20250810")); + + List phase3 = collectRows(it, 1); + assertThat(phase3) + .as("Only delta write should produce output, snapshot OVERWRITE should be ignored") + .containsExactlyInAnyOrder("+I[100, 1, phase3_probe, 20250810]"); + + // === Phase 4: Write more delta via DataStream API === + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 8L, 1L, "new_8", "20250810"), + Row.ofKind(RowKind.INSERT, 9L, 1L, "new_9", "20250810")); + + List phase4 = collectRows(it, 2); + assertThat(phase4) + .containsExactlyInAnyOrder( + "+I[8, 1, new_8, 20250810]", "+I[9, 1, new_9, 20250810]"); + + // Terminate first streaming job + it.close(); + + // === Phase 5: Stateless restart === + CloseableIterator it2 = sEnv.executeSql("SELECT * FROM chain_life_cl").collect(); + + // Phase 5 starting (matching batch semantics): + // - snapshot@20250808: k=1-5 (snapshot wins, delta@20250808 skipped since same partition + // exists in snapshot; same values here since OVERWRITE wrote identical base data) + // - delta@20250809: changelog records (+U for update, +I for inserts) + // - delta@20250810: k=8,9,100 (delta-only, no snapshot for this partition) + // Total: 11 unique rows (PK=(dt,k) makes each (dt,k) pair distinct). + List restart = collectRows(it2, 11); + assertThat(restart) + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+U[3, 2, upd_3, 20250809]", + "+I[4, 1, base_4, 20250808]", + "+I[5, 1, base_5, 20250808]", + "+I[6, 1, new_6, 20250809]", + "+I[7, 1, new_7, 20250809]", + "+I[8, 1, new_8, 20250810]", + "+I[9, 1, new_9, 20250810]", + "+I[100, 1, phase3_probe, 20250810]"); + + // Continue writing delta + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 10L, 1L, "new_10", "20250811"), + Row.ofKind(RowKind.INSERT, 11L, 1L, "new_11", "20250811")); + + List phase5b = collectRows(it2, 2); + assertThat(phase5b) + .containsExactlyInAnyOrder( + "+I[10, 1, new_10, 20250811]", "+I[11, 1, new_11, 20250811]"); + + it2.close(); + } + + /** + * Tests stateful restart of a chain table streaming read job using Flink checkpoint/restore. + * + *

Phase 1: Write initial delta data, start streaming job. Phase 2: Write incremental delta, + * let Phase 2 consume it, then checkpoint and cancel. Phase 3: Write new delta data while the + * job is down. Phase 4: Restart from checkpoint — the restored scan must NOT re-read the + * already-consumed delta (verifies checkpoint() returns the advanced cursor, not the stale + * Phase 1 boundary). Phase 5: Verify incremental streaming continues after restore. + */ + @Test + @Timeout(180) + public void testStreamingReadChainTableStatefulRestart() throws Exception { + // Create chain table (source) + sql( + "CREATE TABLE chain_restart (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + // Create a Paimon PK sink table. The Paimon sink supports upsert + // (primary key), so the planner won't need ChangelogNormalize. + // Paimon sink does NOT implement CheckpointedFunction (it uses operator + // state for in-flight files, committed during checkpoint complete), so no + // buffer leakage on checkpoint recovery — unlike CollectSinkFunction. + sql( + "CREATE TABLE chain_restart_sink (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING," + + " PRIMARY KEY (dt, k) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '2'," + + " 'merge-engine' = 'deduplicate'," + + " 'sequence.field' = 'seq'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restart', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restart', 'delta')", db); + for (String tbl : + new String[] { + "chain_restart", "chain_restart$branch_snapshot", "chain_restart$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Configure checkpoint for stateful restart + org.apache.flink.configuration.Configuration config = sEnv.getConfig().getConfiguration(); + config.setString("state.checkpoints.dir", "file://" + path + "/checkpoints"); + config.set( + CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); + // Enable auto-checkpointing (1s) so Phase 2 data is committed before we take the + // savepoint. This ensures the enumerator's delta cursor has advanced past + // delta@20250809, which is the scenario the checkpoint() regression would break. + config.setString("execution.checkpointing.interval", "1000"); + + // Same SQL for both phases → operator graph matches → state recovery works + String streamSql = "INSERT INTO chain_restart_sink SELECT * FROM chain_restart"; + + // T4: Write snapshot data BEFORE starting streaming, so the starting phase + // exercises the snapshot+delta merge path (not just delta-only). + sql( + "INSERT INTO `chain_restart$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (10, 1, 'snap_10'), (11, 1, 'snap_11')"); + + // === Phase 1: Write initial delta data and start streaming INSERT INTO === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')"); + + TableResult tableResult = sEnv.executeSql(streamSql); + //noinspection OptionalGetWithoutIsPresent + JobClient jobClient = tableResult.getJobClient().get(); + + // === Phase 2: Write incremental delta, let Phase 2 consume it, THEN checkpoint === + // This exercises the checkpoint() regression: if checkpoint() returns the stale + // Phase 1 boundary instead of the advanced delta cursor, restore would re-read + // delta@20250809 and produce duplicates. + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (4, 1, 'new_4'), (5, 1, 'new_5')"); + + // Wait for auto-checkpoint to commit Phase 2 data to the sink. This proves the + // enumerator's scan has consumed delta@20250809 and its checkpoint() returned the + // advanced cursor — the exact scenario the regression would break. + waitForRowCount("chain_restart_sink", 7); + + // Create a savepoint and stop the job atomically. + // stopWithSavepoint guarantees the savepoint is fully committed to disk + // before returning. + String savepointDir = path + "/savepoints"; + new java.io.File(savepointDir).mkdirs(); + String checkpointPath = + jobClient + .stopWithSavepoint( + false, + savepointDir, + org.apache.flink.core.execution.SavepointFormatType.CANONICAL) + .get(); + + // Verify Phase 1+2 data (committed by checkpoint). + List phase1and2 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(phase1and2) + .as("Phase 1+2: sink has snapshot, delta@20250808, and delta@20250809") + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+I[10, 1, snap_10, 20250807]", + "+I[11, 1, snap_11, 20250807]", + "+I[4, 1, new_4, 20250809]", + "+I[5, 1, new_5, 20250809]"); + + // === Phase 3: Write new delta data while job is stopped === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (6, 1, 'new_6'), (7, 1, 'new_7')"); + + // === Phase 4: Restart from savepoint === + // The restored scan should NOT re-read delta@20250809 (already consumed before + // savepoint). If checkpoint() returned the stale Phase 1 boundary, delta@20250809 + // would be re-read and produce duplicates. + sEnv.getConfig() + .getConfiguration() + .setString("execution.state-recovery.path", checkpointPath); + + TableResult tableResult2 = sEnv.executeSql(streamSql); + //noinspection OptionalGetWithoutIsPresent + JobClient jobClient2 = tableResult2.getJobClient().get(); + + // Auto-checkpointing (1s interval) is still enabled, so data is committed + // to the sink automatically. waitForRowCount polls until data appears. + waitForRowCount("chain_restart_sink", 9); + + List phase4 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(Collectors.toList()); + + assertThat(phase4) + .as("Stateful restart: sink should contain new delta data") + .contains("+I[6, 1, new_6, 20250810]", "+I[7, 1, new_7, 20250810]"); + + assertThat(phase4.size()) + .as("Should have exactly 9 records (no duplicates from state recovery)") + .isEqualTo(9); + + // === Phase 5: Verify incremental streaming continues after restore === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250811')" + + " VALUES (8, 1, 'new_8')"); + + waitForRowCount("chain_restart_sink", 10); + + List phase5 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(phase5) + .as("Incremental streaming should continue after restore") + .contains("+I[8, 1, new_8, 20250811]"); + + jobClient2.cancel().get(); + + // Clean up state-recovery config for other tests + sEnv.getConfig().getConfiguration().removeKey("execution.state-recovery.path"); + } + + /** + * T1: Tests streaming read with snapshot+delta overlap in the starting phase. Verifies that + * doFullLoad() correctly merges snapshot-only, delta-only, and overlapping partitions. + */ + @Test + @Timeout(120) + public void testStreamingReadWithSnapshotDeltaOverlap() throws Exception { + sql( + "CREATE TABLE chain_overlap (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_overlap', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_overlap', 'delta')", db); + for (String tbl : + new String[] { + "chain_overlap", "chain_overlap$branch_snapshot", "chain_overlap$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write snapshot data: dt=20250807 (snapshot-only) and dt=20250808 (overlapping) + sql( + "INSERT INTO `chain_overlap$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (6, 1, 'snap_6'), (7, 1, 'snap_7'), (8, 1, 'snap_8')"); + sql( + "INSERT INTO `chain_overlap$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'snap_1'), (2, 1, 'snap_2'), (3, 1, 'snap_3')"); + + // Write delta data: dt=20250808 (overlapping) and dt=20250809 (delta-only) + sql( + "INSERT INTO `chain_overlap$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'delta_1'), (2, 2, 'delta_2')," + + " (4, 1, 'delta_4'), (5, 1, 'delta_5')"); + sql( + "INSERT INTO `chain_overlap$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (10, 1, 'new_10'), (11, 1, 'new_11')"); + + // Start streaming read + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_overlap").collect(); + + // Starting behavior (new: only output latest snapshot partition and partitions after it): + // - Latest snapshot partition: dt=20250808 (k=1,2,3 from snapshot, delta at this partition + // is skipped because snapshot wins for overlapping) + // - Delta-only partition after latest snapshot: dt=20250809 (k=10,11) + // - dt=20250807 is NOT output because it's before the latest snapshot partition + List startingRows = collectRows(it, 5); + assertThat(startingRows) + .as("Starting: latest snapshot partition and delta partitions after it") + .containsExactlyInAnyOrder( + "+I[1, 1, snap_1, 20250808]", + "+I[2, 1, snap_2, 20250808]", + "+I[3, 1, snap_3, 20250808]", + "+I[10, 1, new_10, 20250809]", + "+I[11, 1, new_11, 20250809]"); + + // Incremental: write new delta and verify it streams through + writeChangelogToBranch( + db, + "chain_overlap", + "delta", + Row.ofKind(RowKind.INSERT, 20L, 1L, "incr_20", "20250810")); + + List incr = collectRows(it, 1); + assertThat(incr) + .as("Incremental: new delta data should stream through") + .containsExactlyInAnyOrder("+I[20, 1, incr_20, 20250810]"); + + it.close(); + } + + /** + * T2: Tests that non-default startup modes throw an error for chain table streaming read. When + * scan.mode=latest is specified, an {@link UnsupportedOperationException} is thrown with a + * helpful message. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsNonDefaultStartup() throws Exception { + sql( + "CREATE TABLE chain_bypass (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_bypass', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_bypass', 'delta')", db); + for (String tbl : + new String[] { + "chain_bypass", "chain_bypass$branch_snapshot", "chain_bypass$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write data to main table (so snapshots exist for copy() to resolve) + sql( + "INSERT INTO `chain_bypass$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + // Default mode: chain-table-aware scan (ChainTableStreamScan) + FileStoreTable table = paimonTable("chain_bypass"); + assertThat(table.newStreamScan()) + .as("Default startup mode should use ChainTableStreamScan") + .isInstanceOf(ChainTableStreamScan.class); + + // scan.mode=latest: should throw UnsupportedOperationException + FileStoreTable tableLatest = + table.copy(java.util.Collections.singletonMap("scan.mode", "latest")); + assertThatThrownBy(tableLatest::newStreamScan) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("scan.mode=latest") + .hasMessageContaining("Chain table streaming read does not support") + .hasMessageContaining("t$branch_delta"); + } + + /** + * T3: Tests that streaming read works with changelog-producer=none (the default). + * + *

Without a changelog producer, the incremental phase reads data files (delta manifest) + * rather than changelog files, producing only +I records. This is the same behavior as standard + * {@code DataTableStreamScan} with {@code changelog-producer=none}. + */ + @Test + @Timeout(60) + public void testStreamingReadWithNoChangelogProducer() throws Exception { + // Create chain table WITHOUT changelog-producer (defaults to none) + sql( + "CREATE TABLE chain_no_cl (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_no_cl', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_no_cl', 'delta')", db); + for (String tbl : + new String[] { + "chain_no_cl", "chain_no_cl$branch_snapshot", "chain_no_cl$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Phase 1: Insert initial data into delta branch + sql( + "INSERT INTO `chain_no_cl$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + // Start streaming read and collect Phase 1 results + try (CloseableIterator it = + sEnv.executeSql("SELECT k, v, dt FROM chain_no_cl").collect()) { + List phase1 = collectRows(it, 2); + assertThat(phase1) + .containsExactlyInAnyOrder("+I[1, v1, 20250808]", "+I[2, v2, 20250808]"); + + // Phase 2: Insert more data into delta branch (incremental) + sql( + "INSERT INTO `chain_no_cl$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'v3')"); + + List phase2 = collectRows(it, 1); + assertThat(phase2).containsExactly("+I[3, v3, 20250809]"); + } + } + + /** + * T6: Tests streaming read with group partitions (chain-partition-keys). Verifies that + * streaming works correctly when the table has a group dimension (e.g., region) and each group + * maintains its own independent chain. + */ + @Test + @Timeout(120) + public void testStreamingReadWithGroupPartition() throws Exception { + sql( + "CREATE TABLE chain_stream_group (" + + " k BIGINT, seq BIGINT, v STRING, region STRING, dt STRING" + + ") PARTITIONED BY (region, dt) WITH (" + + " 'primary-key' = 'region,dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'chain-table.chain-partition-keys' = 'dt'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_stream_group', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_stream_group', 'delta')", db); + for (String tbl : + new String[] { + "chain_stream_group", + "chain_stream_group$branch_snapshot", + "chain_stream_group$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial delta data for two regions + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'CN', dt = '20250808')" + + " VALUES (1, 1, 'cn_1'), (2, 1, 'cn_2')"); + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'US', dt = '20250808')" + + " VALUES (11, 1, 'us_11'), (12, 1, 'us_12')"); + + // Start streaming read + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_stream_group").collect(); + + // Starting: both regions, delta-only + List startingRows = collectRows(it, 4); + assertThat(startingRows) + .as("Starting: delta-only data for both regions") + .containsExactlyInAnyOrder( + "+I[1, 1, cn_1, CN, 20250808]", + "+I[2, 1, cn_2, CN, 20250808]", + "+I[11, 1, us_11, US, 20250808]", + "+I[12, 1, us_12, US, 20250808]"); + + // Incremental: write new delta for CN only + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'CN', dt = '20250809')" + + " VALUES (3, 1, 'cn_3')"); + + List incr = collectRows(it, 1); + assertThat(incr) + .as("Incremental: new CN delta should stream through") + .containsExactlyInAnyOrder("+I[3, 1, cn_3, CN, 20250809]"); + + // Incremental: write new delta for US + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'US', dt = '20250809')" + + " VALUES (13, 1, 'us_13')"); + + List incr2 = collectRows(it, 1); + assertThat(incr2) + .as("Incremental: new US delta should stream through") + .containsExactlyInAnyOrder("+I[13, 1, us_13, US, 20250809]"); + + it.close(); + } + + // ========================================================================= + // Additional coverage tests + // ========================================================================= + + /** Tests restore(id, scanAll=true): resets starting state but preserves delta position. */ + @Test + @Timeout(60) + public void testRestoreScanAll() throws Exception { + sql( + "CREATE TABLE chain_restore_all (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_all', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_all', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_all", + "chain_restore_all$branch_snapshot", + "chain_restore_all$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_restore_all$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_restore_all"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: starting + TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); + Long checkpoint = scan.checkpoint(); + assertThat(checkpoint).as("Checkpoint should be non-null after Phase 1").isNotNull(); + + // Phase 2: no new data → empty plan + TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()).as("Phase 2 with no new data should be empty").isEmpty(); + + // restore(id, scanAll=true): should reset to starting, preserve delta position + scan.restore(checkpoint, true); + assertThat(scan.checkpoint()) + .as("Checkpoint should be preserved after restore(id, true)") + .isEqualTo(checkpoint); + + // Starting should run again + TableScan.Plan plan3 = scan.plan(); + assertThat(plan3.splits()) + .as("Starting should produce splits again after restore(id, true)") + .isNotEmpty(); + + // Delta position should be the same (no new commits) + assertThat(scan.checkpoint()).as("Checkpoint should remain the same").isEqualTo(checkpoint); + } + + /** Tests restore(null, true): resets to fresh starting with no delta position. */ + @Test + @Timeout(60) + public void testRestoreNullScanAll() throws Exception { + sql( + "CREATE TABLE chain_restore_null (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_null', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_null', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_null", + "chain_restore_null$branch_snapshot", + "chain_restore_null$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_restore_null$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_restore_null"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: starting + scan.plan(); + assertThat(scan.checkpoint()).isNotNull(); + + // restore(null, scanAll=true): fresh start, no delta position + scan.restore(null, true); + assertThat(scan.checkpoint()) + .as("Checkpoint should be null after restore(null, true)") + .isNull(); + + // Starting should run again + TableScan.Plan plan = scan.plan(); + assertThat(plan.splits()).as("Starting should produce splits").isNotEmpty(); + assertThat(scan.checkpoint()).as("Checkpoint should be set after new starting").isNotNull(); + } + + /** Tests starting when delta branch is empty (only snapshot data). */ + @Test + @Timeout(120) + public void testStreamingReadEmptyDelta() throws Exception { + sql( + "CREATE TABLE chain_empty_delta (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_empty_delta', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_empty_delta', 'delta')", db); + for (String tbl : + new String[] { + "chain_empty_delta", + "chain_empty_delta$branch_snapshot", + "chain_empty_delta$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write ONLY to snapshot branch, delta stays empty + sql( + "INSERT INTO `chain_empty_delta$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (1, 1, 'snap_1'), (2, 1, 'snap_2')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_empty_delta").collect(); + + List startingRows = collectRows(it, 2); + assertThat(startingRows) + .as("Starting with empty delta should return only snapshot data") + .containsExactlyInAnyOrder( + "+I[1, 1, snap_1, 20250807]", "+I[2, 1, snap_2, 20250807]"); + + // Incremental: write to delta and verify it streams through + writeChangelogToBranch( + db, + "chain_empty_delta", + "delta", + Row.ofKind(RowKind.INSERT, 3L, 1L, "new_3", "20250808")); + + List incr = collectRows(it, 1); + assertThat(incr) + .as("First delta write should stream through after snapshot-only starting") + .containsExactlyInAnyOrder("+I[3, 1, new_3, 20250808]"); + + it.close(); + } + + /** Tests starting when snapshot branch is empty (only delta data). */ + @Test + @Timeout(120) + public void testStreamingReadEmptySnapshot() throws Exception { + sql( + "CREATE TABLE chain_empty_snap (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_empty_snap', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_empty_snap', 'delta')", db); + for (String tbl : + new String[] { + "chain_empty_snap", + "chain_empty_snap$branch_snapshot", + "chain_empty_snap$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write ONLY to delta branch, snapshot stays empty + sql( + "INSERT INTO `chain_empty_snap$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'delta_1'), (2, 1, 'delta_2')"); + sql( + "INSERT INTO `chain_empty_snap$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'delta_3')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_empty_snap").collect(); + + List startingRows = collectRows(it, 3); + assertThat(startingRows) + .as("Starting with empty snapshot should return only delta data") + .containsExactlyInAnyOrder( + "+I[1, 1, delta_1, 20250808]", + "+I[2, 1, delta_2, 20250808]", + "+I[3, 1, delta_3, 20250809]"); + + it.close(); + } + + /** Tests that withShard() is correctly forwarded to both batch scan and delta stream scan. */ + @Test + @Timeout(60) + public void testWithShardForwarding() throws Exception { + sql( + "CREATE TABLE chain_shard (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_shard', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_shard', 'delta')", db); + for (String tbl : + new String[] { + "chain_shard", "chain_shard$branch_snapshot", "chain_shard$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_shard$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_shard"); + + // Shard 0 of 2: should get a subset of data + ChainTableStreamScan scan0 = (ChainTableStreamScan) table.newStreamScan(); + scan0.withShard(0, 2); + TableScan.Plan plan0 = scan0.plan(); + + // Shard 1 of 2: should get the other subset + ChainTableStreamScan scan1 = (ChainTableStreamScan) table.newStreamScan(); + scan1.withShard(1, 2); + TableScan.Plan plan1 = scan1.plan(); + + // Together both shards should produce non-empty results + // (exact split depends on bucket hashing, but total should cover all data) + int totalSplits = plan0.splits().size() + plan1.splits().size(); + assertThat(totalSplits).as("Both shards together should produce splits").isGreaterThan(0); + } + + /** Tests streaming read when both snapshot and delta branches are empty. */ + @Test + @Timeout(60) + public void testStreamingReadBothBranchesEmpty() throws Exception { + sql( + "CREATE TABLE chain_both_empty (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_both_empty', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_both_empty', 'delta')", db); + for (String tbl : + new String[] { + "chain_both_empty", + "chain_both_empty$branch_snapshot", + "chain_both_empty$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Both branches are empty — Phase 1 should produce no splits + FileStoreTable table = paimonTable("chain_both_empty"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 with both branches empty should be empty").isEmpty(); + + // Phase 2: write new delta data and verify it streams through + sql( + "INSERT INTO `chain_both_empty$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()).as("Phase 2 should pick up new delta data").isNotEmpty(); + } + + /** Tests that delta OVERWRITE in Phase 2 does not crash the scan. */ + @Test + @Timeout(60) + public void testStreamingReadDeltaOverwriteInPhase2() throws Exception { + sql( + "CREATE TABLE chain_overwrite_p2 (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_overwrite_p2', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_overwrite_p2', 'delta')", db); + for (String tbl : + new String[] { + "chain_overwrite_p2", + "chain_overwrite_p2$branch_snapshot", + "chain_overwrite_p2$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Initial delta data + sql( + "INSERT INTO `chain_overwrite_p2$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_overwrite_p2"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: read initial delta data + TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); + + // Phase 2: OVERWRITE the same partition on delta branch. + // This creates a snapshot with OVERWRITE kind. The scan should handle it + // gracefully — either producing data or empty plans, but never crashing. + sql( + "INSERT OVERWRITE `chain_overwrite_p2$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'new_v1'), (3, 1, 'v3')"); + + // Verify scan.plan() does not throw after OVERWRITE + for (int i = 0; i < 3; i++) { + TableScan.Plan planN = scan.plan(); + assertThat(planN).as("plan() should not return null after OVERWRITE").isNotNull(); + } + } + + /** Tests restore(null) re-runs Phase 1 with current data state. */ + @Test + @Timeout(60) + public void testStreamingReadRestoreAfterNewData() throws Exception { + sql( + "CREATE TABLE chain_restore_newdata (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_newdata', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_newdata', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_newdata", + "chain_restore_newdata$branch_snapshot", + "chain_restore_newdata$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial snapshot + delta data + sql( + "INSERT INTO `chain_restore_newdata$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (1, 1, 'snap_1')"); + sql( + "INSERT INTO `chain_restore_newdata$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (2, 1, 'delta_2')"); + + FileStoreTable table = paimonTable("chain_restore_newdata"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: snapshot at dt=20250807 (latest), delta at dt=20250808 + TableScan.Plan plan1 = scan.plan(); + int phase1Size = plan1.splits().size(); + assertThat(phase1Size).as("Phase 1 should produce splits").isGreaterThan(0); + + // Add more data to both branches + sql( + "INSERT INTO `chain_restore_newdata$branch_snapshot` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'snap_3')"); + sql( + "INSERT INTO `chain_restore_newdata$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (4, 1, 'delta_4')"); + + // restore(null) resets to fresh starting — Phase 1 should re-run with new data. + // After adding snapshot dt=20250809 and delta dt=20250810: + // - Latest snapshot: dt=20250809 (dt=20250807 excluded as older) + // - Delta dt=20250808 excluded (older than latest snapshot dt=20250809) + // - Delta dt=20250810 included (newer than dt=20250809) + scan.restore(null); + TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()) + .as("Restore(null) should re-run Phase 1 with current data") + .isNotEmpty(); + } + + /** + * T5: Tests that chain table streaming read rejects partition filters via {@code withFilter}. + * + *

Partition filters interfere with the chain table Phase 1 logic (which determines the + * latest snapshot partition per group). This test verifies that a partition-only predicate is + * rejected with an UnsupportedOperationException. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsPartitionFilter() throws Exception { + createChainTable("chain_pf_partition"); + setupChainTableBranches("chain_pf_partition"); + + sql( + "INSERT INTO `chain_pf_partition$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_pf_partition"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // dt is the 4th field (index 3) in the schema: t1(0), t2(1), t3(2), dt(3) + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + + // Partition-only filter should be rejected + Predicate partitionFilter = builder.equal(3, BinaryString.fromString("20250808")); + assertThatThrownBy(() -> scan.withFilter(partitionFilter)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + } + + /** + * T6: Tests that non-partition filters work end-to-end in chain table streaming reads. + * + *

Verifies: (1) {@code withFilter} on a data column is accepted at the scan API level, (2) + * streaming {@code SELECT ... WHERE v = 'hello'} filters out non-matching rows, (3) the filter + * continues to apply to incrementally written data. + */ + @Test + @Timeout(120) + public void testStreamingReadWithNonPartitionFilter() throws Exception { + sql( + "CREATE TABLE chain_data_filter (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_data_filter', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_data_filter', 'delta')", db); + for (String tbl : + new String[] { + "chain_data_filter", + "chain_data_filter$branch_snapshot", + "chain_data_filter$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial delta data with mixed values of v + sql( + "INSERT INTO `chain_data_filter$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'hello'), (2, 1, 'world'), (3, 1, 'hello'), (4, 1, 'foo')"); + + // Streaming read with WHERE on data column v — should only return v='hello' rows + CloseableIterator it = + sEnv.executeSql("SELECT * FROM chain_data_filter WHERE v = 'hello'").collect(); + + List startingRows = collectRows(it, 2); + assertThat(startingRows) + .as("Starting with WHERE v='hello' should only return matching rows") + .containsExactlyInAnyOrder( + "+I[1, 1, hello, 20250808]", "+I[3, 1, hello, 20250808]"); + + // Incremental: write more data with mixed v values + writeChangelogToBranch( + db, + "chain_data_filter", + "delta", + Row.ofKind(RowKind.INSERT, 5L, 1L, "hello", "20250809"), + Row.ofKind(RowKind.INSERT, 6L, 1L, "bar", "20250809")); + + List incrRows = collectRows(it, 1); + assertThat(incrRows) + .as("Incremental: only v='hello' row should stream through") + .containsExactlyInAnyOrder("+I[5, 1, hello, 20250809]"); + + it.close(); + } + + /** + * T7: Tests that chain table streaming read rejects partition filters via {@code + * withPartitionFilter}. + * + *

The {@code withPartitionFilter} API (used for {@code scan.partitions} table option) should + * also be rejected. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsWithPartitionFilter() throws Exception { + createChainTable("chain_pf_api"); + setupChainTableBranches("chain_pf_api"); + + sql( + "INSERT INTO `chain_pf_api$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_pf_api"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // withPartitionFilter(Map) should be rejected + assertThatThrownBy( + () -> + scan.withPartitionFilter( + java.util.Collections.singletonMap("dt", "20250808"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + + // withPartitionFilter(PartitionPredicate) should be rejected + PredicateBuilder ppBuilder = new PredicateBuilder(table.schema().logicalPartitionType()); + PartitionPredicate pp = + PartitionPredicate.fromPredicate( + table.schema().logicalPartitionType(), + ppBuilder.equal(0, BinaryString.fromString("20250808"))); + assertThatThrownBy(() -> scan.withPartitionFilter(pp)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + } + + /** + * Tests that chain table streaming rejects checkpoint-align mode at job construction time, not + * at runtime. ChainSplit has no snapshotId and cannot participate in snapshot-aligned + * checkpoint grouping. + */ + @Test + public void testStreamingReadRejectsCheckpointAlign() throws Exception { + createChainTable("chain_align"); + setupChainTableBranches("chain_align"); + + sql( + "INSERT INTO `chain_align$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + // Setting checkpoint-align.enabled on a chain table streaming read should throw + // at job construction time, not at runtime when ChainSplits are encountered. + assertThatThrownBy( + () -> + sEnv.executeSql( + "SELECT * FROM chain_align " + + "/*+ OPTIONS('source.checkpoint-align.enabled' = 'true') */")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "Chain table streaming is not compatible with checkpoint-align"); + } + + /** + * Tests that primary-key predicates do NOT affect partition discovery in chain table streaming + * Phase 1. This is the scenario from JingsongLi's review comment: + * + *

"if the latest snapshot partition no longer has k=1 but an older delta partition still + * does, SELECT ... WHERE k=1 can make this listing miss the latest snapshot partition and then + * include the old delta row, even though that partition should be considered outdated." + * + *

The test creates: snapshot@20250808 with t1=1,2; snapshot@20250809 with t1=3,4 (no t1=1); + * delta@20250808 with t1=1. Then filters on t1=1 (a primary key field). Partition discovery + * must still see both snapshot partitions so the chain boundary is correct. + */ + @Test + public void testStreamingReadPKFilterDoesNotAffectPartitionDiscovery() throws Exception { + createChainTable("chain_pk_filter"); + setupChainTableBranches("chain_pk_filter"); + + // Snapshot@20250808: has t1=1 and t1=2 + sql( + "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + // Snapshot@20250809: has t1=3 and t1=4 (NO t1=1) + sql( + "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'v3'), (4, 1, 'v4')"); + + // Delta@20250808: has t1=1 + sql( + "INSERT INTO `chain_pk_filter$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'delta_v1')"); + + // --- Part 1: Verify listPartitions() with a PK predicate --- + FileStoreTable mainTable = paimonTable("chain_pk_filter"); + FileStoreTable snapshotTable = + mainTable.copy( + java.util.Collections.singletonMap(CoreOptions.BRANCH.key(), "snapshot")); + + DataTableScan scan = snapshotTable.newScan(); + PredicateBuilder builder = new PredicateBuilder(snapshotTable.rowType()); + // t1 is field index 0, part of primary key (dt, t1). + // Only snapshot@20250808 has t1=1. + Predicate t1Equals1 = builder.equal(0, 1L); + scan.withFilter(t1Equals1); + + // listPartitions() must return BOTH snapshot partitions even though only + // 20250808 contains t1=1. If it returned only 20250808, the chain boundary + // would be wrong and stale data could be included. + List partitions = scan.listPartitions(); + assertThat(partitions) + .as( + "listPartitions() must return all snapshot partitions even with a PK filter. " + + "If only dt=20250808 is returned, the chain boundary is wrong.") + .hasSize(2); + + // --- Part 2: Verify filtered batch SELECT returns correct data --- + // Chain-merged batch view: snapshot@20250808(t1=1,2), snapshot@20250809(t1=3,4). + // WHERE t1 = 1 should return only the snapshot row (t1=1, dt=20250808). + List filtered = collectResult("SELECT * FROM chain_pk_filter WHERE t1 = 1"); + assertThat(filtered) + .as("WHERE t1=1 should find the snapshot row at dt=20250808") + .hasSize(1) + .containsExactly("+I[1, 1, v1, 20250808]"); + } } From 9098815422cece4b8eb23d0dfeace39ed718ad70 Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Wed, 24 Jun 2026 00:04:51 +0800 Subject: [PATCH 2/6] Fix comments --- .../table/ChainTableFileStoreTable.java | 84 ++++++++++ .../paimon/table/ChainTableStreamScan.java | 92 +++++++++-- .../paimon/flink/FlinkChainTableITCase.java | 156 ++++++++++++++++++ 3 files changed, 317 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java index 350247949faf..94817a4d6a93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java @@ -20,9 +20,20 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.StartupMode; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowType; +import java.io.IOException; import java.util.Map; /** @@ -100,6 +111,11 @@ private static String describeUnsupportedMode( } } + @Override + public InnerTableRead newRead() { + return new ChainTableRead(); + } + @Override public FileStoreTable copy(Map dynamicOptions) { return new ChainTableFileStoreTable( @@ -130,4 +146,72 @@ public FileStoreTable copyWithLatestSchema() { public FileStoreTable switchToBranch(String branchName) { return new ChainTableFileStoreTable(switchWrappedToBranch(branchName), other()); } + + /** + * Chain-aware read implementation that pairs with {@link ChainTableStreamScan}. Routes splits + * based on type: + * + *

    + *
  • ChainSplit / DataSplit: Streaming read splits. Routed to {@link + * ChainGroupReadTable}'s read which uses {@link + * org.apache.paimon.io.ChainKeyValueFileReaderFactory} (both branch schemas for + * ChainSplit) or delta branch read (DataSplit with correct schema). + *
  • FallbackSplit: Batch read fallback splits. Routed to inherited {@link + * FallbackReadFileStoreTable} read for partition-level fallback logic. + *
+ */ + private class ChainTableRead implements InnerTableRead { + + private final InnerTableRead chainGroupRead; + private final InnerTableRead fallbackRead; + + private ChainTableRead() { + this.chainGroupRead = other().newRead(); + this.fallbackRead = ChainTableFileStoreTable.super.newRead(); + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + chainGroupRead.withFilter(predicate); + fallbackRead.withFilter(predicate); + return this; + } + + @Override + public InnerTableRead withReadType(RowType readType) { + chainGroupRead.withReadType(readType); + fallbackRead.withReadType(readType); + return this; + } + + @Override + public InnerTableRead forceKeepDelete() { + chainGroupRead.forceKeepDelete(); + fallbackRead.forceKeepDelete(); + return this; + } + + @Override + public TableRead executeFilter() { + chainGroupRead.executeFilter(); + fallbackRead.executeFilter(); + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + chainGroupRead.withIOManager(ioManager); + fallbackRead.withIOManager(ioManager); + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + if (split instanceof ChainSplit || split instanceof DataSplit) { + return chainGroupRead.createReader(split); + } + // FallbackSplit and other types: use inherited fallback read logic + return fallbackRead.createReader(split); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java index 7378744cf5a4..ee7c41281ec1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.RecordComparator; @@ -52,6 +53,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Streaming scan for chain tables with a two-phase design: @@ -107,6 +109,9 @@ public class ChainTableStreamScan implements StreamDataTableScan { private int shardCount = -1; + /** Maximum number of retries when race condition is detected during position capture. */ + private static final int MAX_RACE_RETRIES = 3; + public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) { this.chainGroupReadTable = chainGroupReadTable; this.batchScan = @@ -142,12 +147,19 @@ public TableScan.Plan plan() { if (!startingDone) { return planStarting(); } + TableScan.Plan plan = deltaStreamScan.plan(); // Never return SnapshotNotExistPlan — it would cause the Flink enumerator to // set stopTriggerScan=true and permanently stop polling for new data. if (plan instanceof SnapshotNotExistPlan) { return new DataFilePlan<>(Collections.emptyList()); } + + // Phase 2 reads from the delta branch only. The delta stream scan already produces + // DataSplits with the correct branch context and all required metadata (bucket, + // isStreaming, rawConvertible, deletionFiles). Wrapping them in ChainSplit would drop + // that metadata, which breaks changelog-producer=input streaming reads because the + // reader would fall back to LSM merging instead of streaming the changelog rows. return plan; } @@ -164,14 +176,57 @@ private TableScan.Plan planStarting() { String deltaBranch = deltaTable.coreOptions().branch(); String snapshotBranch = chainGroupReadTable.wrapped.coreOptions().branch(); - Long latestId = captureDeltaPosition(deltaTable); + // Capture both delta and snapshot positions with race detection. + // We capture snapshot, delta, snapshot again. If the first and third snapshot IDs + // differ, a race occurred and we retry. This prevents data loss from snapshot commits + // between captures. + Long snapshotLatestId; + Long deltaLatestId; + int attempt = 0; + while (true) { + Long snapshotId1 = chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId(); + deltaLatestId = captureDeltaPosition(deltaTable); + Long snapshotId2 = chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId(); + + if (Objects.equals(snapshotId1, snapshotId2)) { + // No race detected + snapshotLatestId = snapshotId1; + LOG.info( + "ChainTableStreamScan: captured positions (attempt {}): " + + "snapshot={}, delta={}", + attempt + 1, + snapshotId1, + deltaLatestId); + break; + } + + // Race detected: snapshot committed between captures + LOG.warn( + "ChainTableStreamScan: race condition detected (attempt {}): " + + "snapshot changed from {} to {}", + attempt + 1, + snapshotId1, + snapshotId2); + + attempt++; + if (attempt >= MAX_RACE_RETRIES) { + throw new IllegalStateException( + "ChainTableStreamScan: failed to capture consistent positions after " + + MAX_RACE_RETRIES + + " retries due to continuous snapshot commits. " + + "This indicates high snapshot commit frequency. " + + "The job will fail and rely on Flink failover mechanism to retry."); + } + } // 1. Read delta branch data at the pinned snapshot, grouped by partition. Map> deltaSplitsByPartition; - if (latestId != null) { + if (deltaLatestId != null) { FileStoreTable pinnedDelta = deltaTable.copy( - Collections.singletonMap("scan.snapshot-id", String.valueOf(latestId))); + Collections.singletonMap( + CoreOptions.SCAN_SNAPSHOT_ID.key(), + String.valueOf(deltaLatestId))); DataTableScan pinnedDeltaScan = pinnedDelta.newScan(); applyPredicatesAndShard(pinnedDeltaScan); deltaSplitsByPartition = groupByPartition(pinnedDeltaScan); @@ -179,13 +234,19 @@ private TableScan.Plan planStarting() { deltaSplitsByPartition = Collections.emptyMap(); } - // 2. List snapshot partitions (lightweight — partition metadata only, no file I/O). - // Find the latest chain partition per group, then scan only those partitions for files. - // This avoids reading file manifests for hundreds of historical partitions that will be - // discarded (only the latest per group is kept). + // 2. List snapshot partitions at the pinned snapshot (lightweight — partition metadata + // only, no file I/O). Find the latest chain partition per group, then scan only those + // partitions for files. This avoids reading file manifests for hundreds of historical + // partitions that will be discarded (only the latest per group is kept). Map latestChainPartitionPerGroup = new HashMap<>(); - if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { - DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); + FileStoreTable pinnedSnapshot = null; + if (snapshotLatestId != null) { + pinnedSnapshot = + chainGroupReadTable.wrapped.copy( + Collections.singletonMap( + CoreOptions.SCAN_SNAPSHOT_ID.key(), + String.valueOf(snapshotLatestId))); + DataTableScan partitionListingScan = pinnedSnapshot.newScan(); for (BinaryRow partition : partitionListingScan.listPartitions()) { Object groupKey = toGroupKey(partition); BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); @@ -199,11 +260,12 @@ private TableScan.Plan planStarting() { } } - // 3. Scan file splits for latest snapshot partitions only. + // 3. Scan file splits for latest snapshot partitions only, at the pinned snapshot. + // Reuse the pinnedSnapshot from step 2 to avoid redundant copy operations. List latestPartitions = new ArrayList<>(latestChainPartitionPerGroup.values()); Map> snapshotSplitsByPartition; - if (!latestPartitions.isEmpty()) { - DataTableScan snapshotScan = chainGroupReadTable.wrapped.newScan(); + if (!latestPartitions.isEmpty() && pinnedSnapshot != null) { + DataTableScan snapshotScan = pinnedSnapshot.newScan(); snapshotScan.withPartitionFilter(latestPartitions); applyPredicatesAndShard(snapshotScan); snapshotSplitsByPartition = groupByPartition(snapshotScan); @@ -212,9 +274,9 @@ private TableScan.Plan planStarting() { } // 4. Build ChainSplits: - // - Snapshot partitions are already filtered to latest per group. - // - Delta partitions: include if (a) chain key > latest for that group, or - // (b) no snapshot exists for that group. + // - Snapshot partitions are already filtered to latest per group at the pinned snapshot. + // - Delta partitions: include partitions with chain key > latest snapshot chain key for + // that group, or all partitions if no snapshot exists for that group. List allSplits = new ArrayList<>(); for (Map.Entry> entry : snapshotSplitsByPartition.entrySet()) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index ab7727a00eea..daa39b7c48ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -2130,4 +2130,160 @@ public void testStreamingReadPKFilterDoesNotAffectPartitionDiscovery() throws Ex .hasSize(1) .containsExactly("+I[1, 1, v1, 20250808]"); } + + /** + * Reproduces the snapshot branch race condition from PR comment: Phase 1 pins delta at latestId + * but snapshot branch is read from "whatever is latest". If a snapshot commit lands between + * capturing delta and listing partitions, Phase 1 excludes old delta data, and Phase 2 starts + * from latestId+1, so the old delta is never emitted. + * + *

This test simulates the race by: 1) writing delta data, 2) capturing delta position, 3) + * writing new snapshot data, 4) verifying the delta data is still emitted in Phase 1 or Phase + * 2. + */ + @Test + @Timeout(60) + public void testStreamingReadSnapshotBranchRaceCondition() throws Exception { + sql( + "CREATE TABLE chain_race (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_race', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_race', 'delta')", db); + for (String tbl : + new String[] { + "chain_race", "chain_race$branch_snapshot", "chain_race$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta'" + + ")", + tbl); + } + + // Step 1: Write delta data at dt=20250808 + sql( + "INSERT INTO `chain_race$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'delta_1'), (2, 1, 'delta_2')"); + + // Step 2: Start streaming job (captures delta position) + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_race").collect(); + + // Step 3: Collect Phase 1 output (should include delta@20250808) + List phase1 = collectRows(it, 2); + assertThat(phase1) + .as("Phase 1 should include delta@20250808 data") + .containsExactlyInAnyOrder( + "+I[1, 1, delta_1, 20250808]", "+I[2, 1, delta_2, 20250808]"); + + // Step 4: Write NEW snapshot data at dt=20250809 (simulates race condition) + sql( + "INSERT INTO `chain_race$branch_snapshot` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'snap_3')"); + + // Step 5: Write NEW delta data at dt=20250810 + sql( + "INSERT INTO `chain_race$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (4, 1, 'delta_4')"); + + // Step 6: Collect Phase 2 output (should include delta@20250810) + // BUG: If snapshot branch was not pinned, Phase 1 might have excluded delta@20250808 + // after seeing snapshot@20250809, and Phase 2 would miss it too. + List phase2 = collectRows(it, 1); + assertThat(phase2) + .as("Phase 2 should include new delta@20250810 data") + .containsExactlyInAnyOrder("+I[4, 1, delta_4, 20250810]"); + + it.close(); + } + + /** + * Reproduces the Phase 2 read bypass issue from PR comment: after Phase 1, the stream emits + * normal delta DataSplits from deltaStreamScan.plan(). But ChainTableFileStoreTable inherits + * FallbackReadFileStoreTable.newRead(), whose non-FallbackSplit path falls back to + * mainRead.createReader(split), bypassing the branch-aware ChainGroupReadTable.Read logic. + * + *

This test creates a chain table where delta branch has a different column default than + * snapshot branch, then verifies Phase 2 reads produce correct results with branch-aware schema + * lookup. + */ + @Test + @Timeout(60) + public void testStreamingReadPhase2BranchAwareRead() throws Exception { + sql( + "CREATE TABLE chain_phase2 (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_phase2', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_phase2', 'delta')", db); + for (String tbl : + new String[] { + "chain_phase2", "chain_phase2$branch_snapshot", "chain_phase2$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta'" + + ")", + tbl); + } + + // Write snapshot data at dt=20250808 + sql( + "INSERT INTO `chain_phase2$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'snap_1')"); + + // Write delta data at dt=20250809 (different partition, will be read in Phase 2) + sql( + "INSERT INTO `chain_phase2$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (2, 1, 'delta_2')"); + + // Start streaming and collect Phase 1 (snapshot@20250808 + delta@20250809) + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_phase2").collect(); + List phase1 = collectRows(it, 2); + assertThat(phase1) + .as("Phase 1 should include both snapshot and delta data") + .containsExactlyInAnyOrder( + "+I[1, 1, snap_1, 20250808]", "+I[2, 1, delta_2, 20250809]"); + + // Write NEW delta data at dt=20250810 (will be read in Phase 2) + sql( + "INSERT INTO `chain_phase2$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (3, 1, 'delta_3')"); + + // Collect Phase 2 output + // BUG: If Phase 2 read bypasses branch-aware logic, this might fail or produce + // wrong results when snapshot/delta schemas diverge. + List phase2 = collectRows(it, 1); + assertThat(phase2) + .as("Phase 2 should correctly read delta@20250810 with branch-aware logic") + .containsExactlyInAnyOrder("+I[3, 1, delta_3, 20250810]"); + + it.close(); + } } From 4d1287db43fd9ebe200ab50e9aaf5cb3aefe9254 Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Wed, 24 Jun 2026 16:01:16 +0800 Subject: [PATCH 3/6] Fix check for consumer mode --- .../table/ChainTableFileStoreTable.java | 14 ++-- .../paimon/flink/FlinkChainTableITCase.java | 79 +++++++++++++++++-- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java index 94817a4d6a93..9d0b756c7fe4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java @@ -54,11 +54,9 @@ public StreamDataTableScan newStreamScan() { CoreOptions coreOptions = wrapped.coreOptions(); StartupMode effectiveMode = coreOptions.startupMode(); - boolean hasConsumerProgress = - coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress(); - if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) { - String reason = - describeUnsupportedMode(coreOptions, effectiveMode, hasConsumerProgress); + boolean hasConsumer = coreOptions.consumerId() != null; + if (effectiveMode != StartupMode.LATEST_FULL || hasConsumer) { + String reason = describeUnsupportedMode(coreOptions, effectiveMode, hasConsumer); throw new UnsupportedOperationException( "Chain table streaming read does not support startup mode '" + reason @@ -80,9 +78,9 @@ public StreamDataTableScan newStreamScan() { } private static String describeUnsupportedMode( - CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumerProgress) { - if (hasConsumerProgress) { - return "consumer-id with existing progress"; + CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumer) { + if (hasConsumer) { + return "consumer mode (consumer-id='" + coreOptions.consumerId() + "')"; } switch (effectiveMode) { case LATEST: diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index daa39b7c48ea..036877a60c73 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -46,7 +46,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -1249,8 +1252,7 @@ public void testStreamingReadRejectsNonDefaultStartup() throws Exception { .isInstanceOf(ChainTableStreamScan.class); // scan.mode=latest: should throw UnsupportedOperationException - FileStoreTable tableLatest = - table.copy(java.util.Collections.singletonMap("scan.mode", "latest")); + FileStoreTable tableLatest = table.copy(Collections.singletonMap("scan.mode", "latest")); assertThatThrownBy(tableLatest::newStreamScan) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("scan.mode=latest") @@ -1258,6 +1260,72 @@ public void testStreamingReadRejectsNonDefaultStartup() throws Exception { .hasMessageContaining("t$branch_delta"); } + /** + * Tests that chain table streaming read rejects consumer mode. When consumer.id is configured, + * an {@link UnsupportedOperationException} is thrown because chain table streaming does not + * support consumer progress tracking. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsConsumerMode() throws Exception { + sql( + "CREATE TABLE chain_consumer (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_consumer', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_consumer', 'delta')", db); + for (String tbl : + new String[] { + "chain_consumer", + "chain_consumer$branch_snapshot", + "chain_consumer$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_consumer$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_consumer"); + + // consumer.id set: should throw UnsupportedOperationException + FileStoreTable tableWithConsumer = + table.copy(Collections.singletonMap("consumer-id", "my-consumer")); + assertThatThrownBy(tableWithConsumer::newStreamScan) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("consumer mode") + .hasMessageContaining("consumer-id='my-consumer'") + .hasMessageContaining("Chain table streaming read does not support"); + + // consumer-id with consumer.ignore-progress=true: still rejected + Map consumerIgnoreProgressOptions = new HashMap<>(); + consumerIgnoreProgressOptions.put("consumer-id", "my-consumer"); + consumerIgnoreProgressOptions.put("consumer.ignore-progress", "true"); + FileStoreTable tableWithConsumerIgnoreProgress = + table.copy(Collections.unmodifiableMap(consumerIgnoreProgressOptions)); + assertThatThrownBy(tableWithConsumerIgnoreProgress::newStreamScan) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("consumer mode"); + } + /** * T3: Tests that streaming read works with changelog-producer=none (the default). * @@ -2024,9 +2092,7 @@ public void testStreamingReadRejectsWithPartitionFilter() throws Exception { // withPartitionFilter(Map) should be rejected assertThatThrownBy( - () -> - scan.withPartitionFilter( - java.util.Collections.singletonMap("dt", "20250808"))) + () -> scan.withPartitionFilter(Collections.singletonMap("dt", "20250808"))) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Partition filter is not supported"); @@ -2101,8 +2167,7 @@ public void testStreamingReadPKFilterDoesNotAffectPartitionDiscovery() throws Ex // --- Part 1: Verify listPartitions() with a PK predicate --- FileStoreTable mainTable = paimonTable("chain_pk_filter"); FileStoreTable snapshotTable = - mainTable.copy( - java.util.Collections.singletonMap(CoreOptions.BRANCH.key(), "snapshot")); + mainTable.copy(Collections.singletonMap(CoreOptions.BRANCH.key(), "snapshot")); DataTableScan scan = snapshotTable.newScan(); PredicateBuilder builder = new PredicateBuilder(snapshotTable.rowType()); From 7ad4c30100d36542dce828d6750e42f4062f2f72 Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Thu, 25 Jun 2026 09:49:32 +0800 Subject: [PATCH 4/6] Fix leaking partition filter --- .../paimon/table/ChainTableStreamScan.java | 23 +++++++----- .../paimon/flink/FlinkChainTableITCase.java | 36 +++++++++++++++++++ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java index ee7c41281ec1..276114712a04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -25,8 +25,8 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.partition.PartitionPredicate; -import org.apache.paimon.predicate.PartitionPredicateVisitor; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.table.source.ChainSplit; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Streaming scan for chain tables with a two-phase design: @@ -368,14 +369,18 @@ public InnerTableScan withFilter(Predicate predicate) { if (predicate == null) { return this; } - if (!partitionKeys.isEmpty() - && predicate.visit(new PartitionPredicateVisitor(partitionKeys))) { - throw new UnsupportedOperationException( - "Partition filter is not supported in chain table streaming read. " - + "The chain table streaming scan determines which partitions to read " - + "based on the chain-merge logic across snapshot and delta branches. " - + "Applying a partition filter would interfere with this logic. " - + "If you need to read a specific partition, use batch mode instead."); + if (!partitionKeys.isEmpty()) { + Set referencedFields = PredicateVisitor.collectFieldNames(predicate); + boolean containsPartitionField = + referencedFields.stream().anyMatch(partitionKeys::contains); + if (containsPartitionField) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read. " + + "The chain table streaming scan determines which partitions to read " + + "based on the chain-merge logic across snapshot and delta branches. " + + "Applying a partition filter would interfere with this logic. " + + "If you need to read a specific partition, use batch mode instead."); + } } predicates.add(predicate); batchScan.withFilter(predicate); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index 036877a60c73..7d7a053a16cd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -1997,6 +1997,42 @@ public void testStreamingReadRejectsPartitionFilter() throws Exception { .hasMessageContaining("Partition filter is not supported"); } + /** + * Tests that chain table streaming read rejects mixed predicates that contain partition + * conjuncts. + * + *

A predicate like {@code dt = '20250808' AND v = 'hello'} combines a partition filter with + * a data filter. In Flink, such predicates are pushed down as a single AND expression. The + * chain table stream scan must reject any predicate that contains partition fields, because the + * partition conjunct would be extracted later and interfere with the chain boundary computation + * in Phase 1. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsMixedPredicateWithPartition() throws Exception { + createChainTable("chain_pf_mixed"); + setupChainTableBranches("chain_pf_mixed"); + + sql( + "INSERT INTO `chain_pf_mixed$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'hello')"); + + FileStoreTable table = paimonTable("chain_pf_mixed"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + // dt is the 4th field (index 3), v is the 3rd field (index 2) + Predicate dtEquals = builder.equal(3, BinaryString.fromString("20250808")); + Predicate vEquals = builder.equal(2, BinaryString.fromString("hello")); + // Mixed predicate: partition AND data field + Predicate mixedPredicate = builder.and(dtEquals, vEquals); + + // Should be rejected because it contains a partition conjunct + assertThatThrownBy(() -> scan.withFilter(mixedPredicate)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + } + /** * T6: Tests that non-partition filters work end-to-end in chain table streaming reads. * From 6140c4bd481e49adca37265df21c3028ce5ea88e Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Thu, 25 Jun 2026 23:26:47 +0800 Subject: [PATCH 5/6] Fix document for changelog and consumer --- docs/docs/primary-key-table/chain-table.md | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/docs/docs/primary-key-table/chain-table.md b/docs/docs/primary-key-table/chain-table.md index b018c3f5130b..e5b5bb218694 100644 --- a/docs/docs/primary-key-table/chain-table.md +++ b/docs/docs/primary-key-table/chain-table.md @@ -234,9 +234,7 @@ this page: data at that point in time. - **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition - contains only the changes for that period. The delta branch must have a - [changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`) - for streaming read to work. + contains only the changes for that period. The streaming read relies on this pattern to produce correct results. After the full load phase, only new delta branch commits are picked up — writes to the snapshot branch do not trigger @@ -256,10 +254,9 @@ INSERT INTO downstream_sink SELECT * FROM default.t; not detected until the streaming job is restarted. - The chain-table-aware streaming scan only supports the default startup mode (`latest-full`). When the user specifies an explicit starting position — such as `scan.snapshot-id`, - `scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress — - an `UnsupportedOperationException` is thrown. To use standard streaming read without chain - table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main - table. + `scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` — an + `UnsupportedOperationException` is thrown. To use standard streaming read without chain table + logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main table. - Partition filters are not supported in chain table streaming reads. Specifying a partition filter — either via a `WHERE` clause on partition columns or the `scan.partitions` table option — throws an `UnsupportedOperationException`. This is because the chain table streaming From 39d46f1a4712a5f02b7207d43dffd94fa58c50ba Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Sat, 27 Jun 2026 14:03:49 +0800 Subject: [PATCH 6/6] Fix handling strategy for fallback split --- .../org/apache/paimon/table/ChainTableFileStoreTable.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java index 9d0b756c7fe4..863cd969bc89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java @@ -205,10 +205,14 @@ public TableRead withIOManager(IOManager ioManager) { @Override public RecordReader createReader(Split split) throws IOException { + if (split instanceof FallbackSplit) { + // FallbackSplit (including FallbackDataSplit): use inherited fallback read logic + return fallbackRead.createReader(split); + } if (split instanceof ChainSplit || split instanceof DataSplit) { return chainGroupRead.createReader(split); } - // FallbackSplit and other types: use inherited fallback read logic + // Other split types: use inherited fallback read logic return fallbackRead.createReader(split); } }