[FLINK-37627][BugFix][Connectors/Kinesis] Restarting from a checkpoint/savepoint which coincides with shard split causes data loss #198

Open: 1 commit to be merged into base: main

@arunlakshman arunlakshman commented Apr 11, 2025

Purpose of the change

[FLINK-37627][BugFix][Connectors/Kinesis] Restarting from a checkpoint/savepoint which coincides with shard split causes data loss

Flink does not currently guarantee distributed consistency for events sent from a subtask (Task Manager) to the coordinator (Job Manager); see https://issues.apache.org/jira/browse/FLINK-28639. As a result, a race condition can cause a shard and its child shards to stop being processed after a job restart. The sequence is:

  • A checkpoint starts.
  • The enumerator takes its checkpoint, in which the shard is recorded as assigned.
  • The enumerator sends the checkpoint event to the reader.
  • Before the reader takes its checkpoint, a SplitFinishedEvent occurs in the reader.
  • The reader takes its checkpoint.
  • Immediately after the checkpoint completes, the job restarts.
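
The timeline above can be reproduced with a toy model. This is only a sketch: `ShardSplitRaceSketch`, the `Status` enum, and the plain maps and sets standing in for enumerator and reader state are all hypothetical names chosen for illustration, not the connector's classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the race (hypothetical names, not the connector's real classes). */
public class ShardSplitRaceSketch {
    enum Status { ASSIGNED, FINISHED }

    /**
     * Replays the timeline and returns the shards that are ASSIGNED in the
     * restored enumerator state but are not owned by the restored reader.
     */
    static Set<String> orphanedShardsAfterRestore() {
        // Live state before the checkpoint: shard-0 is assigned to the reader.
        Map<String, Status> enumeratorLive = new HashMap<>();
        Set<String> readerLive = new HashSet<>();
        enumeratorLive.put("shard-0", Status.ASSIGNED);
        readerLive.add("shard-0");

        // The enumerator snapshots first; shard-0 is recorded as ASSIGNED.
        Map<String, Status> enumeratorSnapshot = new HashMap<>(enumeratorLive);

        // Before the reader snapshots, it finishes shard-0 and drops it.
        readerLive.remove("shard-0");
        // The finished event updates only the live enumerator state,
        // because the enumerator snapshot has already been taken.
        enumeratorLive.put("shard-0", Status.FINISHED);

        // The reader snapshots; shard-0 is no longer part of its state.
        Set<String> readerSnapshot = new HashSet<>(readerLive);

        // The job restarts and both sides restore from their snapshots.
        Set<String> orphaned = new HashSet<>();
        for (Map.Entry<String, Status> entry : enumeratorSnapshot.entrySet()) {
            boolean ownedByReader = readerSnapshot.contains(entry.getKey());
            if (entry.getValue() == Status.ASSIGNED && !ownedByReader) {
                orphaned.add(entry.getKey());
            }
        }
        return orphaned;
    }

    public static void main(String[] args) {
        System.out.println("Orphaned after restore: " + orphanedShardsAfterRestore());
    }
}
```

In this model the restored enumerator still believes shard-0 is assigned, while no restored reader holds it, so nothing ever reports it as finished again.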

This can cause a shard lineage to be lost: the shard is in the ASSIGNED state in the enumerator but is not part of any Task Manager's state.
This PR changes the behaviour by also checkpointing the finished-split events received between two checkpoints and replaying those events on restore.
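
A minimal sketch of the checkpoint-and-replay idea follows. It is not the code in this PR: `FinishedSplitReplaySketch`, `Enumerator`, `Reader`, `ReaderSnapshot`, and their methods are hypothetical stand-ins, and the sketch assumes the enumerator handles a finished event idempotently and leaves out pruning of already-acknowledged events.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of replaying finished-split events on restore (hypothetical names). */
public class FinishedSplitReplaySketch {
    enum Status { ASSIGNED, FINISHED }

    static final class Enumerator {
        final Map<String, Status> shards = new HashMap<>();

        void assign(String shardId) { shards.put(shardId, Status.ASSIGNED); }

        // Assumed idempotent, so replaying an already-applied event is harmless.
        void onSplitFinished(String shardId) { shards.put(shardId, Status.FINISHED); }

        Map<String, Status> snapshot() { return new HashMap<>(shards); }

        static Enumerator restore(Map<String, Status> snapshot) {
            Enumerator enumerator = new Enumerator();
            enumerator.shards.putAll(snapshot);
            return enumerator;
        }
    }

    /** Reader state: active splits plus the splits finished since the last checkpoint. */
    static final class ReaderSnapshot {
        final Set<String> activeSplits;
        final Set<String> finishedSplits;

        ReaderSnapshot(Set<String> activeSplits, Set<String> finishedSplits) {
            this.activeSplits = new HashSet<>(activeSplits);
            this.finishedSplits = new HashSet<>(finishedSplits);
        }
    }

    static final class Reader {
        final Set<String> activeSplits = new HashSet<>();
        final Set<String> finishedSinceLastCheckpoint = new HashSet<>();

        void finishSplit(String shardId, Enumerator enumerator) {
            activeSplits.remove(shardId);
            finishedSinceLastCheckpoint.add(shardId); // remembered for the next snapshot
            enumerator.onSplitFinished(shardId);
        }

        ReaderSnapshot snapshot() {
            return new ReaderSnapshot(activeSplits, finishedSinceLastCheckpoint);
        }

        static Reader restore(ReaderSnapshot snapshot, Enumerator enumerator) {
            Reader reader = new Reader();
            reader.activeSplits.addAll(snapshot.activeSplits);
            // Replay the finished events the restored enumerator may have missed.
            for (String shardId : snapshot.finishedSplits) {
                enumerator.onSplitFinished(shardId);
            }
            return reader;
        }
    }

    /** Runs the race timeline with replay enabled and returns any orphaned shards. */
    static Set<String> orphanedShardsAfterRestore() {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader();
        enumerator.assign("shard-0");
        reader.activeSplits.add("shard-0");

        Map<String, Status> enumeratorSnapshot = enumerator.snapshot(); // shard-0 is ASSIGNED
        reader.finishSplit("shard-0", enumerator);                      // arrives after that snapshot
        ReaderSnapshot readerSnapshot = reader.snapshot();              // carries the finished event

        Enumerator restoredEnumerator = Enumerator.restore(enumeratorSnapshot);
        Reader restoredReader = Reader.restore(readerSnapshot, restoredEnumerator);

        Set<String> orphaned = new HashSet<>();
        for (Map.Entry<String, Status> entry : restoredEnumerator.shards.entrySet()) {
            boolean ownedByReader = restoredReader.activeSplits.contains(entry.getKey());
            if (entry.getValue() == Status.ASSIGNED && !ownedByReader) {
                orphaned.add(entry.getKey());
            }
        }
        return orphaned;
    }

    public static void main(String[] args) {
        System.out.println("Orphaned after restore: " + orphanedShardsAfterRestore());
    }
}
```

Because the reader snapshot now carries the finished split, the restored enumerator learns that shard-0 is finished even though its own snapshot predates the event. A real implementation would also need to drop recorded events once a later enumerator checkpoint is known to include them.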

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

boring-cyborg bot commented Apr 11, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@arunlakshman arunlakshman marked this pull request as ready for review April 15, 2025 21:27
…t/savepoint which coincides with shard split causes data loss
3 participants