diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
index 0a39450f2..c4302d8d7 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
@@ -137,16 +137,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         }
     }
 
-    private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) {
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
         splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
-        splitAssignment
-                .get(subtask)
-                .removeIf(
-                        split ->
-                                splitsFinishedEvent
-                                        .getFinishedSplitIds()
-                                        .contains(split.splitId()));
+        Set<KinesisShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
+        // During recovery, splitAssignment may return null since there might be no split
+        // assigned to the subtask, but there might still be a SplitsFinishedEvent from that
+        // subtask. We skip child shard assignment in that case, since it could otherwise be
+        // attempted before any readers are registered.
+        if (splitsAssignment == null) {
+            LOG.info(
+                    "handleFinishedSplits called for subtask: {} which doesn't have any "
+                            + "assigned splits right now. This might happen due to job restarts. "
+                            + "Child shard discovery might be delayed until we have enough readers. "
+                            + "Finished split ids: {}",
+                    subtaskId,
+                    splitsFinishedEvent.getFinishedSplitIds());
+            return;
+        }
+        splitsAssignment.removeIf(
+                split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
         assignSplits();
     }
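Note on the enumerator change: after a restart, a reader restored from state can report finished splits before the restored enumerator has assigned it anything, so `splitAssignment.get(subtaskId)` can legitimately return `null`. The following is a minimal, self-contained sketch of that guard pattern; the class and method names are hypothetical, and plain `String` ids stand in for `KinesisShardSplit`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the enumerator's null-tolerant finished-splits handling.
public class FinishedSplitsGuardSketch {

    // subtask id -> splits currently assigned to that subtask
    private final Map<Integer, Set<String>> splitAssignment = new HashMap<>();

    void handleFinishedSplits(int subtaskId, Set<String> finishedSplitIds) {
        Set<String> assigned = splitAssignment.get(subtaskId);
        if (assigned == null) {
            // Restart case: the event refers to splits assigned before the failure,
            // but the restored enumerator has not assigned this subtask anything yet.
            System.out.println("Deferring child shard assignment for subtask " + subtaskId);
            return;
        }
        assigned.removeIf(finishedSplitIds::contains);
    }

    public static void main(String[] args) {
        FinishedSplitsGuardSketch enumerator = new FinishedSplitsGuardSketch();
        // Event arrives before any assignment exists: early return instead of an NPE.
        enumerator.handleFinishedSplits(0, Collections.singleton("shardId-000000000001"));
    }
}
```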
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
index f5d45dfbb..d2de3ff9d 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
@@ -44,7 +44,8 @@ public class UniformShardAssigner implements KinesisShardAssigner {
     public int assign(KinesisShardSplit split, Context context) {
         Preconditions.checkArgument(
                 !context.getRegisteredReaders().isEmpty(),
-                "Expected at least one registered reader. Unable to assign split.");
+                "Expected at least one registered reader. Unable to assign split with id: %s.",
+                split.splitId());
         BigInteger hashKeyStart = new BigInteger(split.getStartingHashKey());
         BigInteger hashKeyEnd = new BigInteger(split.getEndingHashKey());
         BigInteger hashKeyMid = hashKeyStart.add(hashKeyEnd).divide(TWO);
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java
index dcd60fbd1..0d38bd517 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java
@@ -159,6 +159,12 @@ private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) {
         return allParentsFinished;
     }
 
+    /**
+     * Checks whether the split with the specified id is finished.
+     *
+     * @param splitId id of the split to check
+     * @return true if the split is finished, otherwise false
+     */
     private boolean isFinished(String splitId) {
         return !knownSplits.containsKey(splitId);
     }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
index 281d7fe21..ccadba91b 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
@@ -33,9 +33,14 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -49,6 +54,8 @@ public class KinesisStreamsSourceReader<T>
     private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
+    private final NavigableMap<Long, Set<KinesisShardSplit>> finishedSplits;
+    private long currentCheckpointId;
 
     public KinesisStreamsSourceReader(
             SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
@@ -58,15 +65,67 @@ public KinesisStreamsSourceReader(
             Map<String, KinesisShardMetrics> shardMetricGroupMap) {
         super(splitFetcherManager, recordEmitter, config, context);
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.finishedSplits = new TreeMap<>();
+        this.currentCheckpointId = Long.MIN_VALUE;
     }
 
     @Override
     protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) {
+        if (finishedSplitIds.isEmpty()) {
+            return;
+        }
+        finishedSplits.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
+        finishedSplitIds.values().stream()
+                .map(
+                        finishedSplit ->
+                                new KinesisShardSplit(
+                                        finishedSplit.getStreamArn(),
+                                        finishedSplit.getShardId(),
+                                        finishedSplit.getNextStartingPosition(),
+                                        finishedSplit.getKinesisShardSplit().getParentShardIds(),
+                                        finishedSplit.getKinesisShardSplit().getStartingHashKey(),
+                                        finishedSplit.getKinesisShardSplit().getEndingHashKey(),
+                                        true))
+                .forEach(split -> finishedSplits.get(currentCheckpointId).add(split));
+
         context.sendSourceEventToCoordinator(
                 new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
         finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
     }
 
+    /**
+     * At snapshot time we also store the pending finished split ids under the current checkpoint,
+     * so that if the reader is restored from this state the finished split ids are re-sent.
+     * Otherwise we risk data loss on restarts of the source, because the SplitsFinishedEvent
+     * could go missing.
+     *
+     * @param checkpointId the checkpoint id
+     * @return a list of splits, including any pending finished splits
+     */
+    @Override
+    public List<KinesisShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        List<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId));
+
+        if (!finishedSplits.isEmpty()) {
+            // Add all finished splits to the snapshot
+            finishedSplits.values().forEach(splits::addAll);
+        }
+
+        return splits;
+    }
+
+    /**
+     * On notifyCheckpointComplete, clean up the finished-split state tracked for checkpoint ids
+     * less than or equal to the completed checkpoint id.
+     *
+     * @param checkpointId the checkpoint id
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        finishedSplits.headMap(checkpointId, true).clear();
+    }
+
     @Override
     protected KinesisShardSplitState initializedState(KinesisShardSplit split) {
         return new KinesisShardSplitState(split);
@@ -79,8 +138,17 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s
 
     @Override
     public void addSplits(List<KinesisShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<KinesisShardSplit> unfinishedSplits = new ArrayList<>();
+        for (KinesisShardSplit split : splits) {
+            if (split.isFinished()) {
+                context.sendSourceEventToCoordinator(
+                        new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+            } else {
+                unfinishedSplits.add(split);
+            }
+        }
+        unfinishedSplits.forEach(this::registerShardMetricGroup);
+        super.addSplits(unfinishedSplits);
     }
 
     @Override
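The reader keeps not-yet-acknowledged finished splits keyed by the checkpoint that last observed them, so pruning on a completed checkpoint is a single `headMap(checkpointId, true).clear()`. Below is a minimal, self-contained sketch of this bookkeeping under simplifying assumptions: the class name is hypothetical, `String` ids stand in for `KinesisShardSplit`, and no Flink types are involved.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the reader's per-checkpoint finished-split bookkeeping.
public class CheckpointedFinishedSplitsSketch {

    private final NavigableMap<Long, Set<String>> finishedSplits = new TreeMap<>();
    private long currentCheckpointId = Long.MIN_VALUE;

    void onSplitFinished(String splitId) {
        // Record the split under the checkpoint id of the last snapshot taken.
        finishedSplits.computeIfAbsent(currentCheckpointId, k -> new HashSet<>()).add(splitId);
    }

    List<String> snapshotState(long checkpointId) {
        currentCheckpointId = checkpointId;
        List<String> snapshot = new ArrayList<>();
        finishedSplits.values().forEach(snapshot::addAll); // still-pending finished splits
        return snapshot;
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Everything finished at or before this checkpoint is now durably acknowledged.
        finishedSplits.headMap(checkpointId, true).clear();
    }

    public static void main(String[] args) {
        CheckpointedFinishedSplitsSketch reader = new CheckpointedFinishedSplitsSketch();
        reader.snapshotState(1L);
        reader.onSplitFinished("shard-A"); // finished after checkpoint 1 was taken
        System.out.println(reader.snapshotState(2L)); // [shard-A] rides along in checkpoint 2
        reader.notifyCheckpointComplete(2L);
        System.out.println(reader.snapshotState(3L)); // [] -- pruned once checkpoint 2 completed
    }
}
```

A `TreeMap` is used so that completion of checkpoint n can discharge everything recorded at or before n in one range operation, even when intermediate checkpoints never completed.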
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
index fb5fef5af..cefa24983 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
@@ -44,6 +44,7 @@ public final class KinesisShardSplit implements SourceSplit {
     private final Set<String> parentShardIds;
     private final String startingHashKey;
     private final String endingHashKey;
+    private final boolean finished;
 
     public KinesisShardSplit(
             String streamArn,
@@ -52,6 +53,24 @@ public KinesisShardSplit(
             Set<String> parentShardIds,
             String startingHashKey,
             String endingHashKey) {
+        this(
+                streamArn,
+                shardId,
+                startingPosition,
+                parentShardIds,
+                startingHashKey,
+                endingHashKey,
+                false);
+    }
+
+    public KinesisShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            Set<String> parentShardIds,
+            String startingHashKey,
+            String endingHashKey,
+            boolean finished) {
         checkNotNull(streamArn, "streamArn cannot be null");
         checkNotNull(shardId, "shardId cannot be null");
         checkNotNull(startingPosition, "startingPosition cannot be null");
@@ -65,6 +84,11 @@ public KinesisShardSplit(
         this.parentShardIds = new HashSet<>(parentShardIds);
         this.startingHashKey = startingHashKey;
         this.endingHashKey = endingHashKey;
+        this.finished = finished;
+    }
+
+    public boolean isFinished() {
+        return finished;
     }
 
     @Override
@@ -116,6 +140,8 @@ public String toString() {
                 + ", endingHashKey='"
                 + endingHashKey
                 + '\''
+                + ", finished="
+                + finished
                 + '}';
     }
 
@@ -133,7 +159,8 @@ public boolean equals(Object o) {
                 && Objects.equals(startingPosition, that.startingPosition)
                 && Objects.equals(parentShardIds, that.parentShardIds)
                 && Objects.equals(startingHashKey, that.startingHashKey)
-                && Objects.equals(endingHashKey, that.endingHashKey);
+                && Objects.equals(endingHashKey, that.endingHashKey)
+                && Objects.equals(finished, that.finished);
     }
 
     @Override
@@ -144,6 +171,7 @@ public int hashCode() {
                 startingPosition,
                 parentShardIds,
                 startingHashKey,
-                endingHashKey);
+                endingHashKey,
+                finished);
     }
 }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
index 5433a64d6..5bb83c0e2 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
@@ -42,8 +42,8 @@
 @Internal
 public class KinesisShardSplitSerializer implements SimpleVersionedSerializer<KinesisShardSplit> {
 
-    private static final int CURRENT_VERSION = 1;
-    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
+    private static final int CURRENT_VERSION = 2;
+    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));
 
     @Override
     public int getVersion() {
@@ -78,6 +78,7 @@ public byte[] serialize(KinesisShardSplit split) throws IOException {
             }
             out.writeUTF(split.getStartingHashKey());
             out.writeUTF(split.getEndingHashKey());
+            out.writeBoolean(split.isFinished());
 
             out.flush();
             return baos.toByteArray();
@@ -112,6 +113,41 @@ byte[] serializeV0(KinesisShardSplit split) throws IOException {
         }
     }
 
+    /** This method is used only to test backwards compatibility of deserialization logic. */
+    @VisibleForTesting
+    byte[] serializeV1(KinesisShardSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+
+            out.writeUTF(split.getStreamArn());
+            out.writeUTF(split.getShardId());
+            out.writeUTF(split.getStartingPosition().getShardIteratorType().toString());
+            if (split.getStartingPosition().getStartingMarker() == null) {
+                out.writeBoolean(false);
+            } else {
+                out.writeBoolean(true);
+                Object startingMarker = split.getStartingPosition().getStartingMarker();
+                out.writeBoolean(startingMarker instanceof Instant);
+                if (startingMarker instanceof Instant) {
+                    out.writeLong(((Instant) startingMarker).toEpochMilli());
+                }
+                out.writeBoolean(startingMarker instanceof String);
+                if (startingMarker instanceof String) {
+                    out.writeUTF((String) startingMarker);
+                }
+            }
+            out.writeInt(split.getParentShardIds().size());
+            for (String parentShardId : split.getParentShardIds()) {
+                out.writeUTF(parentShardId);
+            }
+            out.writeUTF(split.getStartingHashKey());
+            out.writeUTF(split.getEndingHashKey());
+
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
     @Override
     public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOException {
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
@@ -140,7 +176,8 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
             }
 
             Set<String> parentShardIds = new HashSet<>();
-            if (version == CURRENT_VERSION) {
+            // parentShardIds was added in V1
+            if (version >= 1) {
                 int parentShardCount = in.readInt();
                 for (int i = 0; i < parentShardCount; i++) {
                     parentShardIds.add(in.readUTF());
@@ -149,7 +186,8 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
 
             String startingHashKey;
             String endingHashKey;
-            if (version == CURRENT_VERSION) {
+            // startingHashKey and endingHashKey were added in V1
+            if (version >= 1) {
                 startingHashKey = in.readUTF();
                 endingHashKey = in.readUTF();
             } else {
@@ -157,13 +195,20 @@ public KinesisShardSplit deserialize(int version, byte[] serialized) throws IOEx
                 endingHashKey = "0";
             }
 
+            boolean finished = false;
+            // isFinished was added in V2
+            if (version >= 2) {
+                finished = in.readBoolean();
+            }
+
             return new KinesisShardSplit(
                     streamArn,
                     shardId,
                     new StartingPosition(shardIteratorType, startingMarker),
                     parentShardIds,
                     startingHashKey,
-                    endingHashKey);
+                    endingHashKey,
+                    finished);
         }
     }
 }
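The serializer follows an additive versioning scheme: new fields are appended to the end of the payload, `CURRENT_VERSION` is bumped, and deserialization gates each field group on the version the payload was written with, falling back to a safe default otherwise. Here is a minimal, self-contained sketch of the scheme, using a hypothetical two-field record rather than the connector's split:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical example of additive, version-gated serialization.
public class VersionedSerializerSketch {

    static final int CURRENT_VERSION = 2;

    static byte[] serialize(String id, boolean finished) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(id);
            out.writeBoolean(finished); // new field, written only by V2 serializers
            out.flush();
            return baos.toByteArray();
        }
    }

    static String deserialize(int version, byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String id = in.readUTF();
            boolean finished = false; // safe default for payloads written before V2
            if (version >= 2) { // gate the read on the version that wrote the payload
                finished = in.readBoolean();
            }
            return id + " finished=" + finished;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] v2Payload = serialize("shard-1", true);
        System.out.println(deserialize(CURRENT_VERSION, v2Payload)); // shard-1 finished=true
    }
}
```

Gating reads with `version >= N` rather than `version == CURRENT_VERSION` is what keeps V0 and V1 state restorable after the V2 bump.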
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
index 338b72261..dcd873d7e 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
@@ -101,7 +101,9 @@ void testNoRegisteredReaders() {
         assertThatExceptionOfType(IllegalArgumentException.class)
                 .isThrownBy(() -> assigner.assign(split, assignerContext))
                 .withMessageContaining(
-                        "Expected at least one registered reader. Unable to assign split.");
+                        String.format(
+                                "Expected at least one registered reader. Unable to assign split with id: %s.",
+                                split.splitId()));
     }
 
     private void createReaderWithAssignedSplits(
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
index dfdc584f0..611863877 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
@@ -36,14 +36,18 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getFinishedTestSplit;
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplitState;
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -147,4 +151,120 @@ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
         TestUtil.assertMillisBehindLatest(
                 split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener);
     }
+
+    @Test
+    void testSnapshotStateWithFinishedSplits() throws Exception {
+        // Create and add a split
+        KinesisShardSplit split = getTestSplit();
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Set checkpoint ID by taking initial snapshot
+        List<KinesisShardSplit> initialSnapshot = sourceReader.snapshotState(1L);
+        assertThat(initialSnapshot).hasSize(1).containsExactly(split);
+
+        // Simulate split finishing
+        Map<String, KinesisShardSplitState> finishedSplits = new HashMap<>();
+        finishedSplits.put(split.splitId(), new KinesisShardSplitState(split));
+        sourceReader.onSplitFinished(finishedSplits);
+
+        // Take another snapshot
+        List<KinesisShardSplit> snapshotSplits = sourceReader.snapshotState(2L);
+        List<KinesisShardSplit> snapshotFinishedSplits =
+                snapshotSplits.stream()
+                        .filter(KinesisShardSplit::isFinished)
+                        .collect(Collectors.toList());
+        // Verify we have 2 splits - the original split and the finished split
+        assertThat(snapshotSplits).hasSize(2);
+        assertThat(snapshotFinishedSplits)
+                .hasSize(1)
+                .allSatisfy(s -> assertThat(s.splitId()).isEqualTo(split.splitId()));
+    }
+
+    @Test
+    void testAddSplitsWithStateRestoration() throws Exception {
+        KinesisShardSplit finishedSplit1 = getFinishedTestSplit("finished-split-1");
+        KinesisShardSplit finishedSplit2 = getFinishedTestSplit("finished-split-2");
+
+        // Create active split
+        KinesisShardSplit activeSplit = getTestSplit();
+
+        List<KinesisShardSplit> allSplits =
+                Arrays.asList(finishedSplit1, finishedSplit2, activeSplit);
+
+        // Clear any previous events
+        testingReaderContext.clearSentEvents();
+
+        // Add splits
+        sourceReader.addSplits(allSplits);
+
+        // Verify finished events were sent
+        List<SourceEvent> events = testingReaderContext.getSentEvents();
+        assertThat(events)
+                .hasSize(2)
+                .allMatch(e -> e instanceof SplitsFinishedEvent)
+                .satisfiesExactlyInAnyOrder(
+                        e ->
+                                assertThat(((SplitsFinishedEvent) e).getFinishedSplitIds())
+                                        .containsExactly("finished-split-1"),
+                        e ->
+                                assertThat(((SplitsFinishedEvent) e).getFinishedSplitIds())
+                                        .containsExactly("finished-split-2"));
+
+        // Verify metrics registered only for active split
+        assertThat(shardMetricGroupMap).hasSize(1).containsKey(activeSplit.splitId());
+    }
+
+    @Test
+    void testNotifyCheckpointCompleteRemovesFinishedSplits() throws Exception {
+        KinesisShardSplit split = getTestSplit();
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+
+        sourceReader.addSplits(splits);
+
+        // Simulate splits finishing at different checkpoints
+        Map<String, KinesisShardSplitState> finishedSplits1 = new HashMap<>();
+        KinesisShardSplit finishedSplit1 = getFinishedTestSplit("split-1");
+        finishedSplits1.put("split-1", new KinesisShardSplitState(finishedSplit1));
+        sourceReader.snapshotState(1L); // Set checkpoint ID
+        sourceReader.onSplitFinished(finishedSplits1);
+
+        Map<String, KinesisShardSplitState> finishedSplits2 = new HashMap<>();
+        KinesisShardSplit finishedSplit2 = getFinishedTestSplit("split-2");
+        finishedSplits2.put("split-2", new KinesisShardSplitState(finishedSplit2));
+        sourceReader.snapshotState(2L); // Set checkpoint ID
+        sourceReader.onSplitFinished(finishedSplits2);
+
+        // Take snapshot to verify initial state
+        List<KinesisShardSplit> snapshotSplits = sourceReader.snapshotState(3L);
+
+        assertThat(snapshotSplits).hasSize(3);
+        assertThat(
+                        snapshotSplits.stream()
+                                .filter(KinesisShardSplit::isFinished)
+                                .map(KinesisShardSplit::splitId)
+                                .collect(Collectors.toList()))
+                .hasSize(2)
+                .containsExactlyInAnyOrder("split-1", "split-2");
+
+        // Complete checkpoint 1
+        sourceReader.notifyCheckpointComplete(1L);
+
+        // Take another snapshot to verify state after completion
+        snapshotSplits = sourceReader.snapshotState(4L);
+
+        // Verify checkpoint 1 splits were removed
+        assertThat(snapshotSplits).hasSize(2);
+        assertThat(
+                        snapshotSplits.stream()
+                                .filter(KinesisShardSplit::isFinished)
+                                .map(KinesisShardSplit::splitId)
+                                .collect(Collectors.toList()))
+                .hasSize(1)
+                .first()
+                .isEqualTo("split-2");
+    }
 }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
index 9b0662b75..0db4fd8fb 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
@@ -117,6 +117,53 @@ void testDeserializeWrongVersion() throws Exception {
                 .withMessageContaining(String.valueOf(wrongVersionSerializer.getVersion()));
     }
 
+    @Test
+    void testSerializeAndDeserializeWithFinishedSplits() throws Exception {
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);
+
+        KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer();
+
+        byte[] serialized = serializer.serialize(initialSplit);
+        KinesisShardSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+        assertThat(deserializedSplit.isFinished()).isTrue();
+    }
+
+    @Test
+    void testDeserializeVersion1() throws Exception {
+        final KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer();
+
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE);
+
+        byte[] oldSerializedState = serializer.serializeV1(initialSplit);
+        KinesisShardSplit deserializedSplit = serializer.deserialize(1, oldSerializedState);
+
+        assertThat(deserializedSplit)
+                .usingRecursiveComparison(
+                        RecursiveComparisonConfiguration.builder()
+                                .withIgnoredFields("finished")
+                                .build())
+                .isEqualTo(initialSplit);
+        assertThat(deserializedSplit.isFinished()).isFalse();
+    }
+
     private static class WrongVersionSerializer extends KinesisShardSplitSerializer {
         @Override
         public int getVersion() {
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
index d00ea18bc..16dd517e6 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
@@ -22,8 +22,9 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.Set;
 
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
 
 class KinesisShardSplitTest {
@@ -130,4 +134,33 @@ void testEndingHashKeyNull() {
     void testEquals() {
         EqualsVerifier.forClass(KinesisShardSplit.class).verify();
     }
+
+    @Test
+    void testConstructorWithFinishedFlag() {
+        KinesisShardSplit split =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        SHARD_ID,
+                        STARTING_POSITION,
+                        PARENT_SHARD_IDS,
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);
+
+        assertThat(split.isFinished()).isTrue();
+    }
+
+    @Test
+    void testFinishedFlagDefaultsToFalse() {
+        KinesisShardSplit split =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        SHARD_ID,
+                        STARTING_POSITION,
+                        PARENT_SHARD_IDS,
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE);
+
+        assertThat(split.isFinished()).isFalse();
+    }
 }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index 035d4496e..bdf706191 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -35,7 +35,9 @@
 import java.math.BigInteger;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.IntStream;
@@ -139,6 +141,17 @@ public static KinesisShardSplit getTestSplit(StartingPosition startingPosition)
                 ENDING_HASH_KEY_TEST_VALUE);
     }
 
+    public static KinesisShardSplit getFinishedTestSplit(String shardId) {
+        return new KinesisShardSplit(
+                STREAM_ARN,
+                shardId,
+                StartingPosition.fromStart(),
+                new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5))),
+                STARTING_HASH_KEY_TEST_VALUE,
+                ENDING_HASH_KEY_TEST_VALUE,
+                true);
+    }
+
     public static KinesisShardSplit getTestSplit(
             BigInteger startingHashKey, BigInteger endingHashKey) {
         return new KinesisShardSplit(