Skip to content

Commit 9d6746b

Browse files
leekeiabstraction authored and hlteoh37 committed
[FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords in PollingKinesisShardSplitReader calls are made on idle source causing throttling
[FLINK-36939][Connectors/Kinesis] Fix issue where excessive BlockingQueue.poll() in FanOutKinesisShardSplitReader are made on idle source causing high CPU utilisation
1 parent ca96d84 commit 9d6746b

File tree

12 files changed

+364
-23
lines changed

12 files changed

+364
-23
lines changed

flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ under the License.
153153
<artifactId>flink-architecture-tests-test</artifactId>
154154
<scope>test</scope>
155155
</dependency>
156+
157+
<dependency>
158+
<groupId>org.awaitility</groupId>
159+
<artifactId>awaitility</artifactId>
160+
<scope>test</scope>
161+
</dependency>
156162
</dependencies>
157163

158164
<build>

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import java.util.function.Supplier;
8282

8383
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
84-
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
8584
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
8685
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION;
8786
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION;
@@ -228,7 +227,7 @@ private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitRea
228227
createKinesisAsyncStreamProxy(streamArn, sourceConfig),
229228
consumerArn,
230229
shardMetricGroupMap,
231-
sourceConfig.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT));
230+
sourceConfig);
232231
default:
233232
throw new IllegalArgumentException("Unsupported reader type: " + readerType);
234233
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java

+7
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ public enum ConsumerLifecycle {
8585
.defaultValue(ReaderType.POLLING)
8686
.withDescription("The type of reader used to read from the Kinesis stream.");
8787

88+
public static final ConfigOption<Duration> READER_EMPTY_RECORDS_FETCH_INTERVAL =
89+
ConfigOptions.key("source.reader.empty-records-fetch-interval")
90+
.durationType()
91+
.defaultValue(Duration.ofMillis(250))
92+
.withDescription(
93+
"The interval in milliseconds between fetches with empty records");
94+
8895
public static final ConfigOption<ConsumerLifecycle> EFO_CONSUMER_LIFECYCLE =
8996
ConfigOptions.key("source.efo.lifecycle")
9097
.enumType(ConsumerLifecycle.class)

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java

+81-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.flink.connector.kinesis.source.reader;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
2324
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
2425
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
26+
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
2527
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
2628
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
2729
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
@@ -38,12 +40,14 @@
3840
import java.util.ArrayDeque;
3941
import java.util.Collection;
4042
import java.util.Collections;
43+
import java.util.Date;
4144
import java.util.Deque;
4245
import java.util.HashSet;
4346
import java.util.Iterator;
4447
import java.util.List;
4548
import java.util.Map;
4649
import java.util.Set;
50+
import java.util.WeakHashMap;
4751

4852
import static java.util.Collections.singleton;
4953

@@ -60,16 +64,30 @@ public abstract class KinesisShardSplitReaderBase
6064
private final Set<String> pausedSplitIds = new HashSet<>();
6165
private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
6266

63-
protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetricGroupMap) {
67+
private final long emptyRecordsIntervalMillis;
68+
69+
private final Map<KinesisShardSplitState, Long> scheduledFetchTimes = new WeakHashMap<>();
70+
71+
protected KinesisShardSplitReaderBase(
72+
Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
6473
this.shardMetricGroupMap = shardMetricGroupMap;
74+
this.emptyRecordsIntervalMillis =
75+
configuration
76+
.get(KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL)
77+
.toMillis();
6578
}
6679

6780
@Override
6881
public RecordsWithSplitIds<Record> fetch() throws IOException {
6982
KinesisShardSplitState splitState = assignedSplits.poll();
7083

7184
// When there are no assigned splits, return quickly
72-
if (splitState == null) {
85+
if (skipWhenNoAssignedSplit(splitState)) {
86+
return INCOMPLETE_SHARD_EMPTY_RECORDS;
87+
}
88+
89+
if (skipUntilScheduledFetchTime(splitState)) {
90+
assignedSplits.add(splitState);
7391
return INCOMPLETE_SHARD_EMPTY_RECORDS;
7492
}
7593

@@ -82,6 +100,7 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
82100
RecordBatch recordBatch;
83101
try {
84102
recordBatch = fetchRecords(splitState);
103+
scheduleNextFetchTime(splitState, recordBatch);
85104
} catch (ResourceNotFoundException e) {
86105
LOG.warn(
87106
"Failed to fetch records from shard {}: shard no longer exists. Marking split as complete",
@@ -125,6 +144,66 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
125144
recordBatch.isCompleted());
126145
}
127146

147+
private boolean skipWhenNoAssignedSplit(KinesisShardSplitState splitState) throws IOException {
148+
if (splitState == null) {
149+
try {
150+
// Small sleep to prevent busy polling
151+
Thread.sleep(1);
152+
return true;
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
throw new IOException("Sleep was interrupted while skipping no assigned split", e);
156+
}
157+
}
158+
159+
return false;
160+
}
161+
162+
private boolean skipUntilScheduledFetchTime(KinesisShardSplitState splitState)
163+
throws IOException {
164+
if (scheduledFetchTimes.containsKey(splitState)
165+
&& scheduledFetchTimes.get(splitState) > System.currentTimeMillis()) {
166+
try {
167+
// Small sleep to prevent busy polling
168+
Thread.sleep(1);
169+
return true;
170+
} catch (InterruptedException e) {
171+
Thread.currentThread().interrupt();
172+
throw new IOException(
173+
"Sleep was interrupted while skipping until scheduled fetch record time",
174+
e);
175+
}
176+
}
177+
178+
return false;
179+
}
180+
181+
/**
182+
* Schedules next fetch time, to be called immediately on the result of a fetchRecords() call.
183+
*
184+
* <p>If recordBatch does not contain records, next fetchRecords() is scheduled. Before
185+
* scheduled time, fetcher thread will skip fetching (and have small sleep) for the split.
186+
*
187+
* <p>If recordBatch is not empty, next fetchRecords() time is not scheduled resulting in next
188+
* fetch on the split is performed at first opportunity.
189+
*
190+
* @param splitState splitState on which the fetchRecords() was called on
191+
* @param recordBatch recordBatch returned by fetchRecords()
192+
*/
193+
private void scheduleNextFetchTime(KinesisShardSplitState splitState, RecordBatch recordBatch) {
194+
if (recordBatch == null || recordBatch.getRecords().isEmpty()) {
195+
long scheduledGetRecordTimeMillis =
196+
System.currentTimeMillis() + emptyRecordsIntervalMillis;
197+
this.scheduledFetchTimes.put(splitState, scheduledGetRecordTimeMillis);
198+
if (LOG.isDebugEnabled()) {
199+
LOG.debug(
200+
"Fetched zero records from split {}, scheduling next fetch to {}",
201+
splitState.getSplitId(),
202+
new Date(scheduledGetRecordTimeMillis).toInstant());
203+
}
204+
}
205+
}
206+
128207
/**
129208
* Main method implementations must implement to fetch records from Kinesis.
130209
*

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.connector.kinesis.source.reader.fanout;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
2324
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
2425
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
@@ -32,6 +33,8 @@
3233
import java.util.HashMap;
3334
import java.util.Map;
3435

36+
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
37+
3538
/**
3639
* An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
3740
* Fan-Out and HTTP/2.
@@ -48,11 +51,11 @@ public FanOutKinesisShardSplitReader(
4851
AsyncStreamProxy asyncStreamProxy,
4952
String consumerArn,
5053
Map<String, KinesisShardMetrics> shardMetricGroupMap,
51-
Duration subscriptionTimeout) {
52-
super(shardMetricGroupMap);
54+
Configuration configuration) {
55+
super(shardMetricGroupMap, configuration);
5356
this.asyncStreamProxy = asyncStreamProxy;
5457
this.consumerArn = consumerArn;
55-
this.subscriptionTimeout = subscriptionTimeout;
58+
this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
5659
}
5760

5861
@Override

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public PollingKinesisShardSplitReader(
4444
StreamProxy kinesisProxy,
4545
Map<String, KinesisShardMetrics> shardMetricGroupMap,
4646
Configuration configuration) {
47-
super(shardMetricGroupMap);
47+
super(shardMetricGroupMap, configuration);
4848
this.kinesis = kinesisProxy;
4949
this.configuration = configuration;
5050
this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);

0 commit comments

Comments (0)