Skip to content

[FLINK-37627][BugFix][Connectors/Kinesis] Restarting from a checkpoint/savepoint which coincides with shard split causes data loss #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
}

private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) {
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
splitAssignment
.get(subtask)
.removeIf(
split ->
splitsFinishedEvent
.getFinishedSplitIds()
.contains(split.splitId()));
Set<KinesisShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
// during recovery, splitAssignment may return null since there might be no split assigned
// to the subtask, but there might be SplitsFinishedEvent from that subtask.
// We will not do child shard assignment if that is the case since that might lead to child
// shards trying to get assigned before there being any readers.
if (splitsAssignment == null) {
LOG.info(
"handleFinishedSplits called for subtask: {} which doesn't have any "
+ "assigned splits right now. This might happen due to job restarts. "
+ "Child shard discovery might be delayed until we have enough readers."
+ "Finished split ids: {}",
subtaskId,
splitsFinishedEvent.getFinishedSplitIds());
return;
}

splitsAssignment.removeIf(
split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
assignSplits();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class UniformShardAssigner implements KinesisShardAssigner {
public int assign(KinesisShardSplit split, Context context) {
Preconditions.checkArgument(
!context.getRegisteredReaders().isEmpty(),
"Expected at least one registered reader. Unable to assign split.");
"Expected at least one registered reader. Unable to assign split with id: %s.",
split.splitId());
BigInteger hashKeyStart = new BigInteger(split.getStartingHashKey());
BigInteger hashKeyEnd = new BigInteger(split.getEndingHashKey());
BigInteger hashKeyMid = hashKeyStart.add(hashKeyEnd).divide(TWO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) {
return allParentsFinished;
}

/**
 * Reports whether the split with the given id has already finished.
 *
 * <p>A split is considered finished once it is no longer present in {@code knownSplits};
 * finished splits are evicted from that map.
 *
 * @param splitId id of the split to check
 * @return {@code true} if the split has finished, {@code false} otherwise
 */
private boolean isFinished(String splitId) {
    boolean stillTracked = knownSplits.containsKey(splitId);
    return !stillTracked;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/**
* Coordinates the reading from assigned splits. Runs on the TaskManager.
Expand All @@ -49,6 +54,8 @@ public class KinesisStreamsSourceReader<T>

private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
private final NavigableMap<Long, Set<KinesisShardSplit>> splitFinishedEvents;
private long currentCheckpointId;

public KinesisStreamsSourceReader(
SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
Expand All @@ -58,15 +65,67 @@ public KinesisStreamsSourceReader(
Map<String, KinesisShardMetrics> shardMetricGroupMap) {
super(splitFetcherManager, recordEmitter, config, context);
this.shardMetricGroupMap = shardMetricGroupMap;
this.splitFinishedEvents = new TreeMap<>();
this.currentCheckpointId = Long.MIN_VALUE;
}

/**
 * Buffers the finished splits under the current checkpoint id (so {@link #snapshotState(long)}
 * can persist them for replay after a restore), notifies the coordinator that the splits are
 * finished, and unregisters their shard metric groups.
 *
 * @param finishedSplitIds map of finished split id to its last known split state
 */
@Override
protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) {
    if (finishedSplitIds.isEmpty()) {
        return;
    }
    // computeIfAbsent already returns the (possibly new) bucket; reuse it instead of
    // re-looking it up via get() for every finished split as the original code did.
    Set<KinesisShardSplit> finishedSplits =
            splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
    for (KinesisShardSplitState splitState : finishedSplitIds.values()) {
        KinesisShardSplit originalSplit = splitState.getKinesisShardSplit();
        // Re-create the split flagged as finished so a restored reader can replay the
        // SplitsFinishedEvent instead of re-reading the shard.
        finishedSplits.add(
                new KinesisShardSplit(
                        splitState.getStreamArn(),
                        splitState.getShardId(),
                        splitState.getNextStartingPosition(),
                        originalSplit.getParentShardIds(),
                        originalSplit.getStartingHashKey(),
                        originalSplit.getEndingHashKey(),
                        true));
    }

    context.sendSourceEventToCoordinator(
            new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
    finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
}

/**
 * Snapshots the reader state for the given checkpoint. In addition to the splits currently
 * being read (from the superclass), the snapshot also carries the finished splits that are
 * still pending acknowledgement via {@code SplitsFinishedEvent}. Without them, a restore
 * taken between sending the event and the enumerator processing it could lose the event
 * entirely and cause data loss after a restart.
 *
 * @param checkpointId the checkpoint id
 * @return the active splits plus all buffered finished splits
 */
@Override
public List<KinesisShardSplit> snapshotState(long checkpointId) {
    this.currentCheckpointId = checkpointId;
    final List<KinesisShardSplit> snapshot =
            new ArrayList<>(super.snapshotState(checkpointId));
    // Include every finished split still awaiting checkpoint completion.
    for (Set<KinesisShardSplit> pendingFinishedSplits : splitFinishedEvents.values()) {
        snapshot.addAll(pendingFinishedSplits);
    }
    return snapshot;
}

/**
 * Once a checkpoint completes, finished-split state buffered for that checkpoint (or any
 * earlier one) is durable in the checkpoint and no longer needed for replay, so it is
 * dropped here.
 *
 * @param checkpointId the id of the completed checkpoint
 */
@Override
public void notifyCheckpointComplete(long checkpointId) {
    // Equivalent to headMap(checkpointId, true).clear(): remove all entries <= checkpointId.
    splitFinishedEvents.keySet().removeIf(bufferedId -> bufferedId <= checkpointId);
}

@Override
protected KinesisShardSplitState initializedState(KinesisShardSplit split) {
return new KinesisShardSplitState(split);
Expand All @@ -79,8 +138,17 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s

/**
 * Adds splits to this reader. Splits restored from state that are already marked finished are
 * not re-read; instead their ids are replayed to the coordinator as a single batched
 * {@link SplitsFinishedEvent} so the enumerator can complete child-shard assignment even if
 * the original event was lost before the restore. Only unfinished splits are registered for
 * metrics and handed to the superclass for reading.
 *
 * @param splits splits to add (active and/or restored-finished)
 */
@Override
public void addSplits(List<KinesisShardSplit> splits) {
    List<KinesisShardSplit> activeSplits = new ArrayList<>();
    Set<String> restoredFinishedSplitIds = new HashSet<>();
    for (KinesisShardSplit split : splits) {
        if (split.isFinished()) {
            restoredFinishedSplitIds.add(split.splitId());
        } else {
            activeSplits.add(split);
        }
    }
    // Batch all finished ids into one event instead of one event per split: the coordinator
    // consumes a set of ids, so this is equivalent but avoids N coordinator round-trips and
    // N redundant re-assignment passes.
    if (!restoredFinishedSplitIds.isEmpty()) {
        context.sendSourceEventToCoordinator(
                new SplitsFinishedEvent(restoredFinishedSplitIds));
    }
    activeSplits.forEach(this::registerShardMetricGroup);
    super.addSplits(activeSplits);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class KinesisShardSplit implements SourceSplit {
private final Set<String> parentShardIds;
private final String startingHashKey;
private final String endingHashKey;
private final boolean finished;

public KinesisShardSplit(
String streamArn,
Expand All @@ -52,6 +53,24 @@ public KinesisShardSplit(
Set<String> parentShardIds,
String startingHashKey,
String endingHashKey) {
this(
streamArn,
shardId,
startingPosition,
parentShardIds,
startingHashKey,
endingHashKey,
false);
}

public KinesisShardSplit(
String streamArn,
String shardId,
StartingPosition startingPosition,
Set<String> parentShardIds,
String startingHashKey,
String endingHashKey,
boolean finished) {
checkNotNull(streamArn, "streamArn cannot be null");
checkNotNull(shardId, "shardId cannot be null");
checkNotNull(startingPosition, "startingPosition cannot be null");
Expand All @@ -65,6 +84,11 @@ public KinesisShardSplit(
this.parentShardIds = new HashSet<>(parentShardIds);
this.startingHashKey = startingHashKey;
this.endingHashKey = endingHashKey;
this.finished = finished;
}

/**
 * Whether this split was already fully consumed when it was snapshotted. Finished splits are
 * carried through checkpoints only so the reader can replay their {@code SplitsFinishedEvent}
 * after a restore; they are not read again.
 *
 * @return {@code true} if the split is finished, {@code false} otherwise
 */
public boolean isFinished() {
return finished;
}

@Override
Expand Down Expand Up @@ -116,6 +140,8 @@ public String toString() {
+ ", endingHashKey='"
+ endingHashKey
+ '\''
+ ", finished="
+ finished
+ '}';
}

Expand All @@ -133,7 +159,8 @@ public boolean equals(Object o) {
&& Objects.equals(startingPosition, that.startingPosition)
&& Objects.equals(parentShardIds, that.parentShardIds)
&& Objects.equals(startingHashKey, that.startingHashKey)
&& Objects.equals(endingHashKey, that.endingHashKey);
&& Objects.equals(endingHashKey, that.endingHashKey)
&& Objects.equals(finished, that.finished);
}

@Override
Expand All @@ -144,6 +171,7 @@ public int hashCode() {
startingPosition,
parentShardIds,
startingHashKey,
endingHashKey);
endingHashKey,
finished);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
@Internal
public class KinesisShardSplitSerializer implements SimpleVersionedSerializer<KinesisShardSplit> {

private static final int CURRENT_VERSION = 1;
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
private static final int CURRENT_VERSION = 2;
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));

@Override
public int getVersion() {
Expand Down Expand Up @@ -78,6 +78,7 @@ public byte[] serialize(KinesisShardSplit split) throws IOException {
}
out.writeUTF(split.getStartingHashKey());
out.writeUTF(split.getEndingHashKey());
out.writeBoolean(split.isFinished());

out.flush();
return baos.toByteArray();
Expand Down Expand Up @@ -112,6 +113,41 @@ byte[] serializeV0(KinesisShardSplit split) throws IOException {
}
}

/**
 * Serializes a split using the legacy V1 wire format. This method is used only to test
 * backwards compatibility of the deserialization logic; production serialization uses the
 * current version.
 *
 * <p>V1 layout, in write order: streamArn (UTF), shardId (UTF), shard iterator type (UTF),
 * starting-marker presence flag (boolean) followed — when present — by an
 * is-Instant flag (+ epoch millis) and an is-String flag (+ UTF string), then the parent
 * shard id count (int) and each parent id (UTF), and finally startingHashKey and
 * endingHashKey (UTF). Note that V1 does NOT write the {@code finished} flag added in V2.
 * Do not reorder these writes: the order IS the format.
 *
 * @param split the split to serialize
 * @return the V1-encoded bytes
 * @throws IOException if writing to the in-memory stream fails
 */
@VisibleForTesting
byte[] serializeV1(KinesisShardSplit split) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {

out.writeUTF(split.getStreamArn());
out.writeUTF(split.getShardId());
out.writeUTF(split.getStartingPosition().getShardIteratorType().toString());
// Starting marker is optional and dynamically typed (Instant or String); a presence
// flag is written first, then one type flag per supported type.
if (split.getStartingPosition().getStartingMarker() == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Object startingMarker = split.getStartingPosition().getStartingMarker();
out.writeBoolean(startingMarker instanceof Instant);
if (startingMarker instanceof Instant) {
out.writeLong(((Instant) startingMarker).toEpochMilli());
}
out.writeBoolean(startingMarker instanceof String);
if (startingMarker instanceof String) {
out.writeUTF((String) startingMarker);
}
}
// Parent shard ids: count-prefixed list.
out.writeInt(split.getParentShardIds().size());
for (String parentShardId : split.getParentShardIds()) {
out.writeUTF(parentShardId);
}
out.writeUTF(split.getStartingHashKey());
out.writeUTF(split.getEndingHashKey());

out.flush();
return baos.toByteArray();
}
}

@Override
public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
Expand Down Expand Up @@ -140,7 +176,8 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
}

Set<String> parentShardIds = new HashSet<>();
if (version == CURRENT_VERSION) {
// parentShardIds was added in V1
if (version >= 1) {
int parentShardCount = in.readInt();
for (int i = 0; i < parentShardCount; i++) {
parentShardIds.add(in.readUTF());
Expand All @@ -149,21 +186,29 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx

String startingHashKey;
String endingHashKey;
if (version == CURRENT_VERSION) {
// startingHashKey and endingHashKey were added in V1
if (version >= 1) {
startingHashKey = in.readUTF();
endingHashKey = in.readUTF();
} else {
startingHashKey = "-1";
endingHashKey = "0";
}

boolean finished = false;
// isFinished was added in V2
if (version >= 2) {
finished = in.readBoolean();
}

return new KinesisShardSplit(
streamArn,
shardId,
new StartingPosition(shardIteratorType, startingMarker),
parentShardIds,
startingHashKey,
endingHashKey);
endingHashKey,
finished);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ void testNoRegisteredReaders() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> assigner.assign(split, assignerContext))
.withMessageContaining(
"Expected at least one registered reader. Unable to assign split.");
String.format(
"Expected at least one registered reader. Unable to assign split with id: %s.",
split.splitId()));
}

private void createReaderWithAssignedSplits(
Expand Down
Loading