Skip to content

Kafka Connect: Exclude zombie commitId from validThroughTs()#17080

Open
thswlsqls wants to merge 1 commit into
apache:mainfrom
thswlsqls:fix/kafka-connect-validthroughts-zombie-commitid
Open

Kafka Connect: Exclude zombie commitId from validThroughTs()#17080
thswlsqls wants to merge 1 commit into
apache:mainfrom
thswlsqls:fix/kafka-connect-validthroughts-zombie-commitid

Conversation

@thswlsqls

Copy link
Copy Markdown
Contributor

Closes #17077

Summary

  • CommitState.addReady() buffers every DATA_COMPLETE event in readyBuffer regardless of commitId, and validThroughTs() used that buffer unfiltered, so a zombie coordinator's stale event from a previous commit could corrupt or null out VALID_THROUGH_TS_SNAPSHOT_PROP.
  • validThroughTs() now filters readyBuffer by Objects.equals(currentCommitId, payload.commitId()), the same pattern addReady() already applies to receivedPartitionCount.
  • PR Kafka Connect: Make CommitState.isCommitReady() O(1) #16453 (merged) applied this filter to receivedPartitionCount and isCommitReady() but left validThroughTs() unfiltered — this closes that gap.

Testing done

  • Added TestCommitState#testGetValidThroughTsIgnoresZombieCoordinatorPayloads, mirroring testIsCommitReadyIgnoresZombieCoordinatorPayloads from PR Kafka Connect: Make CommitState.isCommitReady() O(1) #16453, covering a zombie coordinator's DATA_COMPLETE (different commitId, null timestamp) being excluded from the valid-through calculation.
  • Updated TestCommitState#testGetValidThroughTs to stub commitId() on its mocked DataComplete payloads so they match the new filter.
  • ./gradlew :iceberg-kafka-connect:iceberg-kafka-connect:check passes — TestCommitState 5/5, full module 123/123, 0 failures.

CommitState.addReady() buffers every DATA_COMPLETE event regardless of
commitId, and validThroughTs() used the buffer without filtering, so a
zombie coordinator's stale event (from a prior commit) could corrupt or
null out the valid-through timestamp written to
VALID_THROUGH_TS_SNAPSHOT_PROP. Filter readyBuffer by currentCommitId,
matching the pattern already used for receivedPartitionCount.

Generated-by: Claude Code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka Connect: validThroughTs() includes stale commitId entries from zombie coordinators

1 participant