-
Notifications
You must be signed in to change notification settings - Fork 47
Enable HTTP Sink request retries #120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
package com.getindata.connectors.http.internal.sink; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
@@ -10,13 +11,19 @@ | |
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.flink.api.connector.sink2.Sink; | ||
import org.apache.flink.connector.base.DeliveryGuarantee; | ||
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; | ||
import org.apache.flink.connector.base.sink.writer.BufferedRequestState; | ||
import org.apache.flink.connector.base.sink.writer.ElementConverter; | ||
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; | ||
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; | ||
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; | ||
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; | ||
import org.apache.flink.metrics.Counter; | ||
import org.apache.flink.util.concurrent.ExecutorThreadFactory; | ||
|
||
import com.getindata.connectors.http.internal.SinkHttpClient; | ||
import com.getindata.connectors.http.internal.SinkHttpClientResponse; | ||
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; | ||
import com.getindata.connectors.http.internal.utils.ThreadUtils; | ||
|
||
|
@@ -32,6 +39,9 @@ | |
@Slf4j | ||
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> { | ||
|
||
private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10; | ||
private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99D; | ||
|
||
private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = "4"; | ||
|
||
/** | ||
|
@@ -45,6 +55,8 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ | |
|
||
private final Counter numRecordsSendErrorsCounter; | ||
|
||
private final DeliveryGuarantee deliveryGuarantee; | ||
|
||
public HttpSinkWriter( | ||
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter, | ||
Sink.InitContext context, | ||
|
@@ -54,13 +66,26 @@ public HttpSinkWriter( | |
long maxBatchSizeInBytes, | ||
long maxTimeInBufferMS, | ||
long maxRecordSizeInBytes, | ||
DeliveryGuarantee deliveryGuarantee, | ||
String endpointUrl, | ||
SinkHttpClient sinkHttpClient, | ||
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates, | ||
Properties properties) { | ||
|
||
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, | ||
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); | ||
super( | ||
elementConverter, | ||
context, | ||
AsyncSinkWriterConfiguration.builder() | ||
.setMaxBatchSize(maxBatchSize) | ||
.setMaxBatchSizeInBytes(maxBatchSizeInBytes) | ||
.setMaxInFlightRequests(maxInFlightRequests) | ||
.setMaxBufferedRequests(maxBufferedRequests) | ||
.setMaxTimeInBufferMS(maxTimeInBufferMS) | ||
.setMaxRecordSizeInBytes(maxRecordSizeInBytes) | ||
.setRateLimitingStrategy( | ||
buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize)) | ||
.build(), | ||
bufferedRequestStates); | ||
this.deliveryGuarantee = deliveryGuarantee; | ||
this.endpointUrl = endpointUrl; | ||
this.sinkHttpClient = sinkHttpClient; | ||
|
||
|
@@ -79,6 +104,19 @@ public HttpSinkWriter( | |
"http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER)); | ||
} | ||
|
||
private static RateLimitingStrategy buildRateLimitingStrategy( | ||
int maxInFlightRequests, int maxBatchSize) { | ||
return CongestionControlRateLimitingStrategy.builder() | ||
.setMaxInFlightRequests(maxInFlightRequests) | ||
.setInitialMaxInFlightMessages(maxBatchSize) | ||
.setScalingStrategy( | ||
AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests) | ||
.setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE) | ||
.setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR) | ||
.build()) | ||
.build(); | ||
} | ||
|
||
// TODO: Reintroduce retries by adding backoff policy | ||
@Override | ||
protected void submitRequestEntries( | ||
|
@@ -87,37 +125,61 @@ protected void submitRequestEntries( | |
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); | ||
future.whenCompleteAsync((response, err) -> { | ||
if (err != null) { | ||
int failedRequestsNumber = requestEntries.size(); | ||
log.error( | ||
"Http Sink fatally failed to write all {} requests", | ||
failedRequestsNumber); | ||
numRecordsSendErrorsCounter.inc(failedRequestsNumber); | ||
|
||
// TODO: Make `HttpSinkInternal` retry the failed requests. | ||
// Currently, it does not retry those at all, only adds their count | ||
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have | ||
// a clear image how we want to do it, so it would be both efficient and correct. | ||
//requestResult.accept(requestEntries); | ||
} else if (response.getFailedRequests().size() > 0) { | ||
int failedRequestsNumber = response.getFailedRequests().size(); | ||
log.error("Http Sink failed to write and will retry {} requests", | ||
failedRequestsNumber); | ||
numRecordsSendErrorsCounter.inc(failedRequestsNumber); | ||
|
||
// TODO: Make `HttpSinkInternal` retry the failed requests. Currently, | ||
// it does not retry those at all, only adds their count to the | ||
// `numRecordsSendErrors` metric. It is due to the fact we do not have | ||
// a clear image how we want to do it, so it would be both efficient and correct. | ||
|
||
//requestResult.accept(response.getFailedRequests()); | ||
//} else { | ||
//requestResult.accept(Collections.emptyList()); | ||
//} | ||
handleFullyFailedRequest(err, requestEntries, requestResult); | ||
} else if (response.getRequests().stream().anyMatch(r -> !r.isSuccessful())) { | ||
handlePartiallyFailedRequest(response, requestEntries, requestResult); | ||
} else { | ||
requestResult.accept(Collections.emptyList()); | ||
} | ||
requestResult.accept(Collections.emptyList()); | ||
}, sinkWriterThreadPool); | ||
} | ||
|
||
private void handleFullyFailedRequest(Throwable err, | ||
List<HttpSinkRequestEntry> requestEntries, | ||
Consumer<List<HttpSinkRequestEntry>> requestResult) { | ||
int failedRequestsNumber = requestEntries.size(); | ||
log.error( | ||
"Http Sink fatally failed to write all {} requests", | ||
failedRequestsNumber); | ||
numRecordsSendErrorsCounter.inc(failedRequestsNumber); | ||
|
||
if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { | ||
// Retry all requests. | ||
requestResult.accept(requestEntries); | ||
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) { | ||
// Do not retry failed requests. | ||
requestResult.accept(Collections.emptyList()); | ||
} | ||
} | ||
|
||
private void handlePartiallyFailedRequest(SinkHttpClientResponse response, | ||
List<HttpSinkRequestEntry> requestEntries, | ||
Consumer<List<HttpSinkRequestEntry>> requestResult) { | ||
long failedRequestsNumber = response.getRequests().stream() | ||
.filter(r -> !r.isSuccessful()) | ||
.count(); | ||
log.error("Http Sink failed to write and will retry {} requests", | ||
failedRequestsNumber); | ||
numRecordsSendErrorsCounter.inc(failedRequestsNumber); | ||
|
||
if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { | ||
// Assumption: the order of response.requests is the same as requestEntries. | ||
// See com.getindata.connectors.http.internal.sink.httpclient. | ||
// JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and | ||
// then their futures are joined sequentially too. | ||
List<HttpSinkRequestEntry> failedRequestEntries = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have multiple values for the same key in one list of |
||
for (int i = 0; i < response.getRequests().size(); ++i) { | ||
if (!response.getRequests().get(i).isSuccessful()) { | ||
failedRequestEntries.add(requestEntries.get(i)); | ||
} | ||
} | ||
requestResult.accept(failedRequestEntries); | ||
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) { | ||
// Do not retry failed requests. | ||
requestResult.accept(Collections.emptyList()); | ||
} | ||
} | ||
|
||
@Override | ||
protected long getSizeInBytes(HttpSinkRequestEntry s) { | ||
return s.getSizeInBytes(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I searched for this and found https://stackoverflow.com/questions/47680711/which-http-errors-should-never-trigger-an-automatic-retry. It seems that you should only retry if the status code is retriable. Maybe group the responses into successful, retriable, and failed (no retry).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Good point. I may refactor the code so that there are 3 statuses, as you suggested. The change may also affect the lookup, not only the sink. So, as a "side effect", I may introduce a lookup retry feature :)