From 11813343e122126851440cce75429b7b20e9de63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 28 Aug 2024 14:27:58 +0200 Subject: [PATCH 1/3] Enable HTTP Sink request retries --- README.md | 4 +- .../getindata/connectors/http/HttpSink.java | 3 + .../connectors/http/HttpSinkBuilder.java | 16 ++++ .../http/internal/SinkHttpClientResponse.java | 32 +++++-- .../http/internal/sink/HttpSinkInternal.java | 10 ++- .../http/internal/sink/HttpSinkWriter.java | 85 +++++++++++++------ .../httpclient/JavaNetSinkHttpClient.java | 17 ++-- .../internal/table/sink/HttpDynamicSink.java | 2 + .../sink/HttpDynamicSinkConnectorOptions.java | 9 ++ .../sink/HttpDynamicTableSinkFactory.java | 8 ++ .../internal/sink/HttpSinkWriterTest.java | 2 + 11 files changed, 138 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 3b3ce57a..3c9a3214 100644 --- a/README.md +++ b/README.md @@ -451,6 +451,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | format | required | Specify what format to use. | | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | +| sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted values are 'at-least-once' and 'none' (in practice, 'none' is the same as 'at-most-once'). The 'exactly-once' semantic is not supported. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | | sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure.
| | sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. | @@ -573,9 +574,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json - Think about Retry Policy for Http Request - Check other `//TODO`'s. -### HTTP Sink -- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented. - ### [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
diff --git a/src/main/java/com/getindata/connectors/http/HttpSink.java b/src/main/java/com/getindata/connectors/http/HttpSink.java index 23faf100..8dc42b5a 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSink.java +++ b/src/main/java/com/getindata/connectors/http/HttpSink.java @@ -3,6 +3,7 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.ElementConverter; import com.getindata.connectors.http.internal.HeaderPreprocessor; @@ -41,6 +42,7 @@ public class HttpSink extends HttpSinkInternal { long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, HttpPostRequestCallback httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -54,6 +56,7 @@ public class HttpSink extends HttpSinkInternal { maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, + deliveryGuarantee, endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java index 399eb35b..1574b232 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java +++ b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java @@ -4,8 +4,10 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.Preconditions; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; @@ -71,6 +73,8 @@ public class HttpSinkBuilder extends private final Properties properties = new Properties(); + private 
DeliveryGuarantee deliveryGuarantee; + // Mandatory field private String endpointUrl; @@ -92,6 +96,17 @@ public class HttpSinkBuilder extends this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR; } + /** + * @param deliveryGuarantee HTTP Sink delivery guarantee + * @return {@link HttpSinkBuilder} itself + */ + public HttpSinkBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE, + "Only at-least-once and none delivery guarantees are supported."); + this.deliveryGuarantee = deliveryGuarantee; + return this; + } + /** * @param endpointUrl the URL of the endpoint * @return {@link HttpSinkBuilder} itself @@ -181,6 +196,7 @@ public HttpSink build() { Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), + Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE), endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index b5637377..f75307a8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,6 +1,7 @@ package com.getindata.connectors.http.internal; import java.util.List; +import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; @@ -11,21 +12,36 @@ /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted - * to write, divided into two lists — successful and failed ones. + * to write. */ @Data @ToString public class SinkHttpClientResponse { /** - * A list of successfully written requests. 
+ * A list of requests along with write status. */ @NonNull - private final List successfulRequests; + private final List requests; - /** - * A list of requests that {@link SinkHttpClient} failed to write. - */ - @NonNull - private final List failedRequests; + public List getSuccessfulRequests() { + return requests.stream() + .filter(ResponseItem::isSuccessful) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getFailedRequests() { + return requests.stream() + .filter(r -> !r.isSuccessful()) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + @Data + @ToString + public static class ResponseItem { + private final HttpRequest request; + private final boolean successful; + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faac..c460a6f1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.Properties; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -61,6 +62,8 @@ public class HttpSinkInternal extends AsyncSinkBase httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -94,9 +98,11 @@ protected HttpSinkInternal( maxTimeInBufferMS, maxRecordSizeInBytes ); - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl), "The endpoint URL must be set when initializing HTTP Sink."); + Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE, + "Only at-least-once and none delivery guarantees are supported."); + this.deliveryGuarantee = 
deliveryGuarantee; this.endpointUrl = endpointUrl; this.httpPostRequestCallback = Preconditions.checkNotNull( @@ -132,6 +138,7 @@ public StatefulSinkWriter> cr getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, @@ -159,6 +166,7 @@ public StatefulSinkWriter> re getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index d17e9213..7fe06e79 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -1,5 +1,6 @@ package com.getindata.connectors.http.internal.sink; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -10,6 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -17,6 +19,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory; import com.getindata.connectors.http.internal.SinkHttpClient; +import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.ThreadUtils; @@ -45,6 +48,8 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter, Sink.InitContext context, @@ -54,6 +59,7 @@ public HttpSinkWriter( long maxBatchSizeInBytes, long 
maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, SinkHttpClient sinkHttpClient, Collection> bufferedRequestStates, @@ -61,6 +67,7 @@ public HttpSinkWriter( super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); + this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.sinkHttpClient = sinkHttpClient; @@ -87,37 +94,61 @@ protected void submitRequestEntries( var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); future.whenCompleteAsync((response, err) -> { if (err != null) { - int failedRequestsNumber = requestEntries.size(); - log.error( - "Http Sink fatally failed to write all {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); - - // TODO: Make `HttpSinkInternal` retry the failed requests. - // Currently, it does not retry those at all, only adds their count - // to the `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. - //requestResult.accept(requestEntries); - } else if (response.getFailedRequests().size() > 0) { - int failedRequestsNumber = response.getFailedRequests().size(); - log.error("Http Sink failed to write and will retry {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); - - // TODO: Make `HttpSinkInternal` retry the failed requests. Currently, - // it does not retry those at all, only adds their count to the - // `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. 
- - //requestResult.accept(response.getFailedRequests()); - //} else { - //requestResult.accept(Collections.emptyList()); - //} + handleFullyFailedRequest(err, requestEntries, requestResult); + } else if (response.getRequests().stream().anyMatch(r -> !r.isSuccessful())) { + handlePartiallyFailedRequest(response, requestEntries, requestResult); + } else { + requestResult.accept(Collections.emptyList()); } - requestResult.accept(Collections.emptyList()); }, sinkWriterThreadPool); } + private void handleFullyFailedRequest(Throwable err, + List requestEntries, + Consumer> requestResult) { + int failedRequestsNumber = requestEntries.size(); + log.error( + "Http Sink fatally failed to write all {} requests", + failedRequestsNumber); + numRecordsSendErrorsCounter.inc(failedRequestsNumber); + + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + // Retry all requests. + requestResult.accept(requestEntries); + } else if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Do not retry failed requests. + requestResult.accept(Collections.emptyList()); + } + } + + private void handlePartiallyFailedRequest(SinkHttpClientResponse response, + List requestEntries, + Consumer> requestResult) { + long failedRequestsNumber = response.getRequests().stream() + .filter(r -> !r.isSuccessful()) + .count(); + log.error("Http Sink failed to write and will retry {} requests", + failedRequestsNumber); + numRecordsSendErrorsCounter.inc(failedRequestsNumber); + + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + // Assumption: the order of response.requests is the same as requestEntries. + // See com.getindata.connectors.http.internal.sink.httpclient. + // JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and + // then their futures are joined sequentially too. 
+ List failedRequestEntries = new ArrayList<>(); + for (int i = 0; i < response.getRequests().size(); ++i) { + if (!response.getRequests().get(i).isSuccessful()) { + failedRequestEntries.add(requestEntries.get(i)); + } + } + requestResult.accept(failedRequestEntries); + } else if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Do not retry failed requests. + requestResult.accept(Collections.emptyList()); + } + } + @Override protected long getSizeInBytes(HttpSinkRequestEntry s) { return s.getSizeInBytes(); diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..98ce763c 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -16,6 +16,7 @@ import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; +import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; @@ -92,8 +93,7 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var successfulResponses = new ArrayList(); - var failedResponses = new ArrayList(); + var responseItems = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); @@ -101,17 +101,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( httpPostRequestCallback.call( 
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - - // TODO Add response processor here and orchestrate it with statusCodeChecker. - if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { - failedResponses.add(sinkRequestEntry); - } else { - successfulResponses.add(sinkRequestEntry); - } + boolean isFailed = optResponse.isEmpty() || + statusCodeChecker.isErrorCode(optResponse.get().statusCode()); + responseItems.add(new ResponseItem(sinkRequestEntry, !isFailed)); } - return new SinkHttpClientResponse(successfulResponses, failedResponses); + return new SinkHttpClientResponse(responseItems); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java index ec7c53bd..ea55d094 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java @@ -25,6 +25,7 @@ import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient; import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.DELIVERY_GUARANTEE; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL; @@ -125,6 +126,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { HttpSinkBuilder builder = HttpSink .builder() + .setDeliveryGuarantee(tableOptions.get(DELIVERY_GUARANTEE)) .setEndpointUrl(tableOptions.get(URL)) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) 
.setHttpPostRequestCallback(httpPostRequestCallback) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java index b87b6eb7..79036051 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java @@ -2,6 +2,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_REQUEST_CALLBACK_IDENTIFIER; @@ -24,4 +25,12 @@ public class HttpDynamicSinkConnectorOptions { ConfigOptions.key(SINK_REQUEST_CALLBACK_IDENTIFIER) .stringType() .defaultValue(Slf4jHttpPostRequestCallbackFactory.IDENTIFIER); + + public static final ConfigOption DELIVERY_GUARANTEE = + ConfigOptions.key("sink.delivery-guarantee") + .enumType(DeliveryGuarantee.class) + .defaultValue(DeliveryGuarantee.NONE) + .withDescription("Defines the delivery semantic for the HTTP sink. " + + "Accepted enumerations are 'at-least-once', and 'none'. 
" + + "'exactly-once' semantic is not supported."); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java index 549ff650..8bb01e34 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java @@ -5,6 +5,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -82,6 +83,7 @@ public Set> optionalOptions() { var options = super.optionalOptions(); options.add(INSERT_METHOD); options.add(REQUEST_CALLBACK_IDENTIFIER); + options.add(DELIVERY_GUARANTEE); return options; } @@ -96,5 +98,11 @@ private void validateHttpSinkOptions(ReadableConfig tableOptions) )); } }); + tableOptions.getOptional(DELIVERY_GUARANTEE).ifPresent(deliveryGuarantee -> { + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + throw new IllegalArgumentException("'exactly-once' semantic is not supported. 
" + + "It is expected to be either 'none' or 'at-least-once."); + } + }); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java index db1975ed..d045e2aa 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.metrics.Counter; @@ -70,6 +71,7 @@ public void setUp() { 10, 10, 10, + DeliveryGuarantee.NONE, "http://localhost/client", httpClient, stateBuffer, From c68fca92edd4472eb548b257efff39d61277b7ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 28 Aug 2024 15:40:21 +0200 Subject: [PATCH 2/3] Add tests --- .../http/internal/sink/HttpSinkInternal.java | 2 - .../internal/sink/HttpSinkWriterTest.java | 121 +++++++++++++++--- .../sink/HttpDynamicTableSinkFactoryTest.java | 20 +++ 3 files changed, 123 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index c460a6f1..b95e7bb9 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -100,8 +100,6 @@ protected HttpSinkInternal( ); Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl), "The endpoint URL must be set when initializing HTTP Sink."); - Preconditions.checkArgument(deliveryGuarantee != 
DeliveryGuarantee.EXACTLY_ONCE, - "Only at-least-once and none delivery guarantees are supported."); this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.httpPostRequestCallback = diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java index d045e2aa..52307ff8 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java @@ -1,8 +1,8 @@ package com.getindata.connectors.http.internal.sink; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -21,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -28,6 +29,7 @@ import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; +import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; @Slf4j @ExtendWith(MockitoExtension.class) @@ -59,42 +61,125 @@ public void setUp() { when(metricGroup.getNumRecordsSendErrorsCounter()).thenReturn(errorCounter); when(metricGroup.getIOMetricGroup()).thenReturn(operatorIOMetricGroup); when(context.metricGroup()).thenReturn(metricGroup); + } + private void createHttpSinkWriter(DeliveryGuarantee deliveryGuarantee) { Collection> stateBuffer = new ArrayList<>(); this.httpSinkWriter = new HttpSinkWriter<>( - elementConverter, - context, - 10, - 10, - 100, - 10, - 10, - 10, - 
DeliveryGuarantee.NONE, - "http://localhost/client", - httpClient, - stateBuffer, - new Properties()); + elementConverter, + context, + 10, + 10, + 100, + 10, + 10, + 10, + deliveryGuarantee, + "http://localhost/client", + httpClient, + stateBuffer, + new Properties()); } @Test - public void testErrorMetric() throws InterruptedException { + public void testErrorMetricWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Test Exception")); when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); - HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); Consumer> requestResult = httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); - List requestEntries = Collections.singletonList(request); + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(2); + } + + @Test + public void testErrorMetricWhenAPartOfRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, false), + new ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(1); + } + + @Test + public void testRetryWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Exception("Test Exception")); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(2); + assertEquals(2, entriesToRetry.size()); + } + + + @Test + public void testRetryWhenAPartOfRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, false), + new ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); // would be good to use Countdown Latch instead sleep... Thread.sleep(2000); - verify(errorCounter).inc(requestEntries.size()); + verify(errorCounter).inc(1); + assertEquals(1, entriesToRetry.size()); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java index 269ee87b..e8c51505 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java @@ -91,4 +91,24 @@ public void nonexistentOptionsTest() { assertThrows(ValidationException.class, () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); } + + @Test + public void invalidSinkDeliveryGuaranteeOptionTests() { + final String invalidOptionCreateSql = + String.format( + "CREATE TABLE http (\n" + + " id bigint\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = 
'%s',\n" + + " 'format' = 'json',\n" + + " 'sink.delivery-guarantee' = 'invalid'\n" + + ")", + HttpDynamicTableSinkFactory.IDENTIFIER, + "http://localhost/" + ); + tEnv.executeSql(invalidOptionCreateSql); + assertThrows(ValidationException.class, + () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); + } } From 0729db7192152b8bde46f4205c9d3b258e034e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 28 Aug 2024 19:58:15 +0200 Subject: [PATCH 3/3] Add RateLimitingStrategy --- .../http/internal/sink/HttpSinkWriter.java | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index 7fe06e79..9771ef83 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -15,6 +15,10 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; import org.apache.flink.metrics.Counter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -35,6 +39,9 @@ @Slf4j public class HttpSinkWriter extends AsyncSinkWriter { + private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10; + private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99D; + private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = 
"4"; /** @@ -64,9 +71,20 @@ public HttpSinkWriter( SinkHttpClient sinkHttpClient, Collection> bufferedRequestStates, Properties properties) { - - super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, - maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .setRateLimitingStrategy( + buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize)) + .build(), + bufferedRequestStates); this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.sinkHttpClient = sinkHttpClient; @@ -86,6 +104,19 @@ public HttpSinkWriter( "http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER)); } + private static RateLimitingStrategy buildRateLimitingStrategy( + int maxInFlightRequests, int maxBatchSize) { + return CongestionControlRateLimitingStrategy.builder() + .setMaxInFlightRequests(maxInFlightRequests) + .setInitialMaxInFlightMessages(maxBatchSize) + .setScalingStrategy( + AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests) + .setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE) + .setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR) + .build()) + .build(); + } + // TODO: Reintroduce retries by adding backoff policy @Override protected void submitRequestEntries(