From ece37a7bc7ef98d0701d57019bdfa636b471f706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Mon, 14 Oct 2024 16:15:19 +0200 Subject: [PATCH 1/6] HTTP-122 Add lookup retries --- README.md | 13 +- .../http/internal/SinkHttpClientResponse.java | 38 +++- .../config/HttpConnectorConfigConstants.java | 12 +- .../internal/config/RetryStrategyType.java | 31 +++ .../http/internal/sink/HttpSinkWriter.java | 34 ++-- .../httpclient/JavaNetSinkHttpClient.java | 39 ++-- .../status/ComposeHttpStatusCodeChecker.java | 181 ++++++++--------- .../internal/status/HttpResponseStatus.java | 20 ++ .../status/HttpStatusCodeChecker.java | 13 +- ...eValueHttpStatusCodeCheckerPredicate.java} | 12 +- ...va => TypeStatusCodeCheckerPredicate.java} | 17 +- .../WhiteListHttpStatusCodeChecker.java | 23 --- .../lookup/HttpLookupConnectorOptions.java | 76 +++++-- .../lookup/HttpLookupTableSourceFactory.java | 58 +++--- .../lookup/JavaNetHttpPollingClient.java | 183 +++++++++++++---- .../internal/sink/HttpSinkConnectionTest.java | 1 + .../JavaNetSinkHttpClientConnectionTest.java | 3 +- .../ComposeHttpStatusCodeCheckerTest.java | 171 ---------------- .../ComposeHttpStatusCodeCheckerTest.java | 192 ++++++++++++++++++ 19 files changed, 683 insertions(+), 434 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/internal/config/RetryStrategyType.java create mode 100644 src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java rename src/main/java/com/getindata/connectors/http/internal/status/{SingleValueHttpStatusCodeChecker.java => SingleValueHttpStatusCodeCheckerPredicate.java} (60%) rename src/main/java/com/getindata/connectors/http/internal/status/{TypeStatusCodeChecker.java => TypeStatusCodeCheckerPredicate.java} (63%) delete mode 100644 src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java delete mode 100644 src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java create mode 100644 src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java diff --git a/README.md b/README.md index 3374f642..462ff76a 100644 --- a/README.md +++ b/README.md @@ -449,16 +449,21 @@ be requested if the current time is later than the cached token expiry time minu | lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | | lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | | lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | -| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more details. | +| lookup.retry-strategy.type | optional | Defines the retry strategy to use in case of lookup failures. Accepted values are: `none` (default), `fixed-delay` and `exponential-delay`. | +| lookup.retry-strategy.fixed-delay.attempts | optional | The number of times that connector retries lookup execution before connector returns empty result. | +| lookup.retry-strategy.fixed-delay.delay | optional | Delay between two consecutive retry attempts. | +| lookup.retry-strategy.exponential-delay.attempts | optional | The number of times that connector retries lookup execution before connector returns empty result. | +| lookup.retry-strategy.exponential-delay.initial-delay | optional | Initial delay between two consecutive retry attempts. | +| lookup.retry-strategy.exponential-delay.max-delay | optional | The highest possible duration between two consecutive retry attempts. | | gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | | gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. | | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | -| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | -| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | -| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | +| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | +| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | +| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index b5637377..f14b293b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,6 +1,7 @@ package com.getindata.connectors.http.internal; import java.util.List; +import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; @@ -8,6 +9,7 @@ import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; +import com.getindata.connectors.http.internal.status.HttpResponseStatus; /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted @@ -18,14 +20,36 @@ public class SinkHttpClientResponse { /** - * A list of successfully written requests. + * A list of requests along with write status. */ @NonNull - private final List successfulRequests; + private final List requests; - /** - * A list of requests that {@link SinkHttpClient} failed to write. - */ - @NonNull - private final List failedRequests; + public List getSuccessfulRequests() { + return requests.stream() + .filter(i -> i.getStatus() == HttpResponseStatus.SUCCESS) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getFailedRetryableRequests() { + return requests.stream() + .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_RETRYABLE) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getFailedNotRetryableRequests() { + return requests.stream() + .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + @Data + @ToString + public static class ResponseItem { + private final HttpRequest request; + private final HttpResponseStatus status; + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index b501b29b..e914b00e 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -28,13 +28,13 @@ public final class HttpConnectorConfigConstants { + "source.lookup.header."; public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP - + "security.oidc.token.request"; + + "security.oidc.token.request"; public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP - + "security.oidc.token.endpoint.url"; + + "security.oidc.token.endpoint.url"; public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP - + "security.oidc.token.expiry.reduction"; + + "security.oidc.token.expiry.reduction"; /** * Whether to use the raw value of the Authorization header. If set, it prevents * the special treatment of the header for Basic Authentication, thus preserving the passed @@ -54,6 +54,12 @@ public final class HttpConnectorConfigConstants { public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST = GID_CONNECTOR_HTTP + "source.lookup.error.code"; + + public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST = + GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code.exclude"; + + public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST = + GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code"; // ----------------------------------------------------- public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER = diff --git a/src/main/java/com/getindata/connectors/http/internal/config/RetryStrategyType.java b/src/main/java/com/getindata/connectors/http/internal/config/RetryStrategyType.java new file mode 100644 index 00000000..40a2ea08 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/config/RetryStrategyType.java @@ -0,0 +1,31 @@ +package com.getindata.connectors.http.internal.config; + +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; +import static org.apache.flink.configuration.description.TextElement.text; + +public enum RetryStrategyType implements DescribedEnum { + + NONE("none", text("None")), + FIXED_DELAY("fixed-delay", text("Fixed delay strategy")), + EXPONENTIAL_DELAY("exponential-delay", text("Exponential delay strategy")); + + private final String value; + private final InlineElement inlineElement; + + RetryStrategyType(String value, InlineElement inlineElement) { + this.value = value; + this.inlineElement = inlineElement; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return inlineElement; + } + +} diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index d17e9213..46e0c057 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -46,18 +46,18 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter, - Sink.InitContext context, - int maxBatchSize, - int maxInFlightRequests, - int maxBufferedRequests, - long maxBatchSizeInBytes, - long maxTimeInBufferMS, - long maxRecordSizeInBytes, - String endpointUrl, - SinkHttpClient sinkHttpClient, - Collection> bufferedRequestStates, - Properties properties) { + ElementConverter elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + String endpointUrl, + SinkHttpClient sinkHttpClient, + Collection> bufferedRequestStates, + Properties properties) { super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); @@ -82,8 +82,8 @@ public HttpSinkWriter( // TODO: Reintroduce retries by adding backoff policy @Override protected void submitRequestEntries( - List requestEntries, - Consumer> requestResult) { + List requestEntries, + Consumer> requestResult) { var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); future.whenCompleteAsync((response, err) -> { if (err != null) { @@ -98,8 +98,10 @@ protected void submitRequestEntries( // to the `numRecordsSendErrors` metric. It is due to the fact we do not have // a clear image how we want to do it, so it would be both efficient and correct. //requestResult.accept(requestEntries); - } else if (response.getFailedRequests().size() > 0) { - int failedRequestsNumber = response.getFailedRequests().size(); + } else if (response.getFailedNotRetryableRequests().size() + + response.getFailedRetryableRequests().size() > 0) { + int failedRequestsNumber = response.getFailedNotRetryableRequests().size() + + response.getFailedRetryableRequests().size(); log.error("Http Sink failed to write and will retry {} requests", failedRequestsNumber); numRecordsSendErrorsCounter.inc(failedRequestsNumber); diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..d45028e5 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -16,12 +16,16 @@ import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; +import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; +import com.getindata.connectors.http.internal.status.HttpResponseStatus; import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST; /** * An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}. This @@ -41,10 +45,10 @@ public class JavaNetSinkHttpClient implements SinkHttpClient { private final RequestSubmitter requestSubmitter; public JavaNetSinkHttpClient( - Properties properties, - HttpPostRequestCallback httpPostRequestCallback, - HeaderPreprocessor headerPreprocessor, - RequestSubmitterFactory requestSubmitterFactory) { + Properties properties, + HttpPostRequestCallback httpPostRequestCallback, + HeaderPreprocessor headerPreprocessor, + RequestSubmitterFactory requestSubmitterFactory) { this.httpPostRequestCallback = httpPostRequestCallback; this.headerMap = HttpHeaderUtils.prepareHeaderMap( @@ -58,8 +62,10 @@ public JavaNetSinkHttpClient( ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) + .errorWhiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST) + .errorCodePrefix(HTTP_ERROR_SINK_CODES_LIST) + .retryableWhiteListPrefix("") // TODO: sink retries not implemented yet + .retryableCodePrefix("") .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); @@ -80,8 +86,8 @@ public CompletableFuture putRequests( } private CompletableFuture> submitRequests( - List requestEntries, - String endpointUrl) { + List requestEntries, + String endpointUrl) { var responseFutures = requestSubmitter.submit(endpointUrl, requestEntries); var allFutures = CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0])); @@ -92,8 +98,7 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var successfulResponses = new ArrayList(); - var failedResponses = new ArrayList(); + var responseItems = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); @@ -102,16 +107,14 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( httpPostRequestCallback.call( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // TODO Add response processor here and orchestrate it with statusCodeChecker. - if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { - failedResponses.add(sinkRequestEntry); - } else { - successfulResponses.add(sinkRequestEntry); - } + // Empty indicates that HttpClient.sendAsync() failed with exception. + HttpResponseStatus status = optResponse.isEmpty() + ? HttpResponseStatus.FAILURE_RETRYABLE + : statusCodeChecker.checkStatus(optResponse.get().statusCode()); + responseItems.add(new ResponseItem(sinkRequestEntry, status)); } - return new SinkHttpClientResponse(successfulResponses, failedResponses); + return new SinkHttpClientResponse(responseItems); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java index 015c068c..a44d469d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java @@ -1,10 +1,9 @@ package com.getindata.connectors.http.internal.status; import java.util.Arrays; -import java.util.HashSet; +import java.util.Optional; import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; +import java.util.function.Predicate; import lombok.AccessLevel; import lombok.Builder; @@ -17,122 +16,73 @@ /** * An implementation of {@link HttpStatusCodeChecker} that checks Http Status code against - * white list, concrete value or {@link HttpResponseCodeType} + * white list, concrete value or {@link HttpResponseCodeType}. */ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { - private static final Set DEFAULT_ERROR_CODES = - Set.of( - new TypeStatusCodeChecker(HttpResponseCodeType.CLIENT_ERROR), - new TypeStatusCodeChecker(HttpResponseCodeType.SERVER_ERROR) - ); - private static final int MIN_HTTP_STATUS_CODE = 100; + private static final int MAX_HTTP_STATUS_CODE = 599; - /** - * Set of {@link HttpStatusCodeChecker} for white listed status codes. - */ - private final Set excludedCodes; + private static final Predicate DEFAULT_ERROR_CODES = + new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.CLIENT_ERROR); + private static final Predicate DEFAULT_RETRYABLE_ERROR_CODES = + new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.SERVER_ERROR); - /** - * Set of {@link HttpStatusCodeChecker} that check status code againts value match or {@link - * HttpResponseCodeType} match. - */ - private final Set errorCodes; + private final Predicate retryableErrorStatusCodes; + private final Predicate notRetryableErrorStatusCodes; public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { - excludedCodes = prepareWhiteList(config); - errorCodes = prepareErrorCodes(config); - } - - /** - * Checks whether given status code is considered as a error code. - * This implementation checks if status code matches any single value mask like "404" - * or http type mask such as "4XX". Code that matches one of those masks and is not on a - * white list will be considered as error code. - * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - public boolean isErrorCode(int statusCode) { - - Preconditions.checkArgument( - statusCode >= MIN_HTTP_STATUS_CODE, - String.format( - "Provided invalid Http status code %s," - + " status code should be equal or bigger than %d.", - statusCode, - MIN_HTTP_STATUS_CODE) - ); - - boolean isWhiteListed = excludedCodes.stream() - .anyMatch(check -> check.isWhiteListed(statusCode)); - - return !isWhiteListed - && errorCodes.stream() - .anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode)); + retryableErrorStatusCodes = buildPredicate(config, config.getRetryableCodePrefix(), + config.getRetryableWhiteListPrefix(), DEFAULT_RETRYABLE_ERROR_CODES); + notRetryableErrorStatusCodes = buildPredicate(config, config.getErrorCodePrefix(), + config.getErrorWhiteListPrefix(), DEFAULT_ERROR_CODES); } - private Set prepareErrorCodes( - ComposeHttpStatusCodeCheckerConfig config) { - + private Predicate buildPredicate( + ComposeHttpStatusCodeCheckerConfig config, + String errorCodePrefix, + String whiteListPrefix, + Predicate defaultErrorCodes) { Properties properties = config.getProperties(); - String errorCodePrefix = config.getErrorCodePrefix(); - String errorCodes = - properties.getProperty(errorCodePrefix, ""); + String errorCodes = properties.getProperty(errorCodePrefix, ""); + String whitelistCodes = properties.getProperty(whiteListPrefix, ""); - if (StringUtils.isNullOrWhitespaceOnly(errorCodes)) { - return DEFAULT_ERROR_CODES; - } else { - String[] splitCodes = errorCodes.split(HttpConnectorConfigConstants.PROP_DELIM); - return prepareErrorCodes(splitCodes); - } + Predicate errorPredicate = + prepareErrorCodes(errorCodes).orElse(defaultErrorCodes); + Predicate whitelistPredicate = + prepareErrorCodes(whitelistCodes).orElse(i -> false); + + return errorPredicate.and(Predicate.not(whitelistPredicate)); } /** - * Process given array of status codes and assign them to - * {@link SingleValueHttpStatusCodeChecker} for full codes such as 100, 404 etc. or to - * {@link TypeStatusCodeChecker} for codes that were constructed with "XX" mask + * Process given string containing comma-separated list of status codes and assign them to + * {@link SingleValueHttpStatusCodeCheckerPredicate} for full codes such as 100, 404 etc. or to + * {@link TypeStatusCodeCheckerPredicate} for codes that were constructed with "XX" mask. + * In the end, all conditions are reduced to a single predicate. */ - private Set prepareErrorCodes(String[] statusCodes) { - - Set errorCodes = new HashSet<>(); - for (String sCode : statusCodes) { - if (!StringUtils.isNullOrWhitespaceOnly(sCode)) { - String trimCode = sCode.toUpperCase().trim(); + private Optional> prepareErrorCodes(String statusCodesStr) { + return Arrays.stream(statusCodesStr.split(HttpConnectorConfigConstants.PROP_DELIM)) + .filter(code -> !StringUtils.isNullOrWhitespaceOnly(code)) + .map(code -> code.toUpperCase().trim()) + .map(codeStr -> { Preconditions.checkArgument( - trimCode.length() == 3, + codeStr.length() == 3, "Status code should contain three characters. Provided [%s]", - trimCode); + codeStr); // at this point we have trim, upper case 3 character status code. - if (isTypeCode(trimCode)) { - int code = Integer.parseInt(trimCode.replace("X", "")); - errorCodes.add(new TypeStatusCodeChecker(HttpResponseCodeType.getByCode(code))); + if (isTypeCode(codeStr)) { + int code = Integer.parseInt(codeStr.replace("X", "")); + return new TypeStatusCodeCheckerPredicate( + HttpResponseCodeType.getByCode(code)); } else { - errorCodes.add( - new SingleValueHttpStatusCodeChecker(Integer.parseInt(trimCode)) - ); + return new SingleValueHttpStatusCodeCheckerPredicate( + Integer.parseInt(codeStr)); } - } - } - return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes; - } - - private Set prepareWhiteList( - ComposeHttpStatusCodeCheckerConfig config) { - - Properties properties = config.getProperties(); - String whiteListPrefix = config.getWhiteListPrefix(); - - return Arrays.stream( - properties.getProperty(whiteListPrefix, "") - .split(HttpConnectorConfigConstants.PROP_DELIM)) - .filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode)) - .map(String::trim) - .mapToInt(Integer::parseInt) - .mapToObj(WhiteListHttpStatusCodeChecker::new) - .collect(Collectors.toSet()); + }) + .reduce(Predicate::or); } /** @@ -147,15 +97,52 @@ private boolean isTypeCode(final String code) { return code.charAt(1) == 'X' && code.charAt(2) == 'X'; } + /** + * Checks whether given status code is considered as an error code. + * This implementation checks if status code matches any single value mask like "404" + * or http type mask such as "4XX". Code that matches one of those masks and is not on a + * white list will be considered as error code. + * + * @param statusCode http status code to assess. + * @return SUCCESS if statusCode is considered as success, + * FAILURE_RETRYABLE if the status code indicates transient error, + * otherwise FAILURE_NON_RETRYABLE. + */ + @Override + public HttpResponseStatus checkStatus(int statusCode) { + Preconditions.checkArgument( + statusCode >= MIN_HTTP_STATUS_CODE && statusCode <= MAX_HTTP_STATUS_CODE, + String.format( + "Provided invalid Http status code %s," + + " status code should be equal or bigger than %d." + + " and equal or lower than %d.", + statusCode, + MIN_HTTP_STATUS_CODE, + MAX_HTTP_STATUS_CODE) + ); + + if (notRetryableErrorStatusCodes.test(statusCode)) { + return HttpResponseStatus.FAILURE_NOT_RETRYABLE; + } else if (retryableErrorStatusCodes.test(statusCode)) { + return HttpResponseStatus.FAILURE_RETRYABLE; + } else { + return HttpResponseStatus.SUCCESS; + } + } + @Data @Builder @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class ComposeHttpStatusCodeCheckerConfig { - private final String whiteListPrefix; + private final String errorWhiteListPrefix; private final String errorCodePrefix; + private final String retryableWhiteListPrefix; + + private final String retryableCodePrefix; + private final Properties properties; } } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java new file mode 100644 index 00000000..6fb7f3c9 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java @@ -0,0 +1,20 @@ +package com.getindata.connectors.http.internal.status; + +/** + * Describes HttpResponse status, whether it is successful or not. In case of error, + * it also indicates if, according to configuration, the request can be retried. + */ +public enum HttpResponseStatus { + /** + * Successful request. + */ + SUCCESS, + /** + * Request failed but can be retried. + */ + FAILURE_RETRYABLE, + /** + * Request failed but cannot be retried. + */ + FAILURE_NOT_RETRYABLE, +} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java index 6af0344c..dbdf1f07 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java @@ -2,15 +2,18 @@ /** * Base interface for all classes that would validate HTTP status - * code whether it is an error or not. + * code whether it is a success, an retryable error or not retryable error. */ public interface HttpStatusCodeChecker { /** - * Validates http status code wheter it is considered as error code. The logic for - * what status codes are considered as "errors" depends on the concreted implementation + * Validates http status code whether it is considered as an error code. The logic for + * what status codes are considered as "errors" depends on the concrete implementation. + * * @param statusCode http status code to assess. - * @return true if statusCode is considered as Error and false if not. + * @return SUCCESS if statusCode is considered as success, + * FAILURE_RETRYABLE if the status code indicates transient error, + * otherwise FAILURE_NON_RETRYABLE. */ - boolean isErrorCode(int statusCode); + HttpResponseStatus checkStatus(int statusCode); } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeCheckerPredicate.java similarity index 60% rename from src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java rename to src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeCheckerPredicate.java index b52951ed..7eff0af8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeCheckerPredicate.java @@ -1,15 +1,16 @@ package com.getindata.connectors.http.internal.status; +import java.util.function.Predicate; + import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; /** - * An implementation of {@link HttpStatusCodeChecker} that validates status code against - * constant value. + * Predicate that validates status code against constant value. */ @RequiredArgsConstructor @EqualsAndHashCode -public class SingleValueHttpStatusCodeChecker implements HttpStatusCodeChecker { +class SingleValueHttpStatusCodeCheckerPredicate implements Predicate { /** * A reference http status code to compare with. @@ -18,11 +19,12 @@ public class SingleValueHttpStatusCodeChecker implements HttpStatusCodeChecker { /** * Validates given statusCode against constant value. + * * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. + * @return true if status code is equal to expected value. */ @Override - public boolean isErrorCode(int statusCode) { + public boolean test(Integer statusCode) { return errorCode == statusCode; } } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeCheckerPredicate.java similarity index 63% rename from src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java rename to src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeCheckerPredicate.java index df942879..851e40db 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeCheckerPredicate.java @@ -1,13 +1,15 @@ package com.getindata.connectors.http.internal.status; +import java.util.function.Predicate; + import lombok.EqualsAndHashCode; /** - * Implementation of {@link HttpStatusCodeChecker} that verifies if given Http status code - * belongs to specific HTTP code type family. For example if it any of 100's 200's or 500's code. + * Predicate that verifies if given Http status code belongs to specific HTTP code type + * family. For example if it any of 100's, 200's or 500's code. */ @EqualsAndHashCode -public class TypeStatusCodeChecker implements HttpStatusCodeChecker { +class TypeStatusCodeCheckerPredicate implements Predicate { /** * First digit from HTTP status code that describes a type of code, @@ -19,9 +21,9 @@ public class TypeStatusCodeChecker implements HttpStatusCodeChecker { * Creates TypeStatusCodeChecker for given {@link HttpResponseCodeType} * * @param httpResponseCodeType {@link HttpResponseCodeType} for this {@link - * TypeStatusCodeChecker} instance. + * TypeStatusCodeCheckerPredicate} instance. */ - public TypeStatusCodeChecker(HttpResponseCodeType httpResponseCodeType) { + TypeStatusCodeCheckerPredicate(HttpResponseCodeType httpResponseCodeType) { this.httpTypeCode = httpResponseCodeType.getHttpTypeCode(); } @@ -33,11 +35,12 @@ public TypeStatusCodeChecker(HttpResponseCodeType httpResponseCodeType) { * checker.isErrorCode(505); <- will return true. * } * + * * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. + * @return true if status code belongs to Http code status type. */ @Override - public boolean isErrorCode(int statusCode) { + public boolean test(Integer statusCode) { return statusCode / 100 == httpTypeCode; } } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java deleted file mode 100644 index 2aa65c65..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; - -/** - * Class that implements logic of a "white list" against single constant value. - */ -@RequiredArgsConstructor -@EqualsAndHashCode -public class WhiteListHttpStatusCodeChecker { - - private final int whiteListCode; - - /** - * Checks if given statusCode is considered as "white listed" - * @param statusCode status code to check. - * @return true if given statusCode is white listed and false if not. - */ - public boolean isWhiteListed(int statusCode) { - return whiteListCode == statusCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index 9947d52d..bc767407 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -5,6 +5,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import com.getindata.connectors.http.internal.config.RetryStrategyType; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.*; public class HttpLookupConnectorOptions { @@ -50,28 +51,67 @@ public class HttpLookupConnectorOptions { .withDescription("Whether to use the raw value of Authorization header"); public static final ConfigOption REQUEST_CALLBACK_IDENTIFIER = - ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER) - .stringType() - .defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER); + ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER) + .stringType() + .defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER); public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL = - ConfigOptions.key(OIDC_AUTH_TOKEN_ENDPOINT_URL) - .stringType() - .noDefaultValue() - .withDescription("OIDC Token endpoint url."); + ConfigOptions.key(OIDC_AUTH_TOKEN_ENDPOINT_URL) + .stringType() + .noDefaultValue() + .withDescription("OIDC Token endpoint url."); public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST = - ConfigOptions.key(OIDC_AUTH_TOKEN_REQUEST) - .stringType() - .noDefaultValue() - .withDescription("OIDC token request."); + ConfigOptions.key(OIDC_AUTH_TOKEN_REQUEST) + .stringType() + .noDefaultValue() + .withDescription("OIDC token request."); public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = - ConfigOptions.key(OIDC_AUTH_TOKEN_EXPIRY_REDUCTION) - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("OIDC authorization access token expiry" + - " reduction as a Duration." + - " A new access token is obtained if the token" + - " is older than it's expiry time minus this value."); + ConfigOptions.key(OIDC_AUTH_TOKEN_EXPIRY_REDUCTION) + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("OIDC authorization access token expiry" + + " reduction as a Duration." + + " A new access token is obtained if the token" + + " is older than it's expiry time minus this value."); + + public static final ConfigOption LOOKUP_RETRY_STRATEGY_TYPE = + ConfigOptions.key("lookup.retry-strategy.type") + .enumType(RetryStrategyType.class) + .defaultValue(RetryStrategyType.NONE) + .withDescription("Lookup HTTP request retry strategy."); + + public static final ConfigOption LOOKUP_RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS = + ConfigOptions.key("lookup.retry-strategy.fixed-delay.attempts") + .intType() + .defaultValue(3) + .withDescription("The number of times that the connector retires request" + + "before the emtpy result is returned."); + + public static final ConfigOption LOOKUP_RESTART_STRATEGY_FIXED_DELAY_DELAY = + ConfigOptions.key("lookup.retry-strategy.fixed-delay.delay") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("Delay between two consecutive retry attempts."); + + public static final ConfigOption LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS = + ConfigOptions.key("lookup.retry-strategy.exponential-delay.attempts") + .intType() + .defaultValue(3) + .withDescription("The number of times that the connector retires request" + + "before the emtpy result is returned."); + + public static final ConfigOption + LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_DELAY = + ConfigOptions.key("lookup.retry-strategy.exponential-delay.initial-delay") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("Initial delay after the first lookup failure."); + + public static final ConfigOption LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_DELAY = + ConfigOptions.key("lookup.retry-strategy.exponential-delay.max-delay") + .durationType() + .defaultValue(Duration.ofMinutes(1)) + .withDescription("Maximal delay between two consecutive retry attempts."); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index 6c2edf20..fcc8a46b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -30,13 +30,21 @@ import com.getindata.connectors.http.HttpPostRequestCallbackFactory; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.ConfigUtils; -import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_REQUEST_FORMAT; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.REQUEST_CALLBACK_IDENTIFIER; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL_ARGS; public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory { private static DataTypes.Field columnToField(Column column) { return FIELD( - column.getName(), + column.getName(), // only a column in a schema should have a time attribute, // a field should not propagate the attribute because it might be used in a // completely different context @@ -82,14 +90,15 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) getLookupCache(readable) ); } + protected void validateHttpLookupSourceOptions(ReadableConfig tableOptions) - throws IllegalArgumentException { + throws IllegalArgumentException { // ensure that there is an OIDC token request if we have an OIDC token endpoint tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL).ifPresent(url -> { if (tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST).isEmpty()) { throw new IllegalArgumentException("Config option " + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + " is required, if " + - SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key() + " is configured."); + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key() + " is configured."); } }); } @@ -107,19 +116,18 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { return Set.of( - URL_ARGS, - ASYNC_POLLING, - LOOKUP_METHOD, - REQUEST_CALLBACK_IDENTIFIER, - LookupOptions.CACHE_TYPE, - LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, - LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, - LookupOptions.PARTIAL_CACHE_MAX_ROWS, - LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, - LookupOptions.MAX_RETRIES, - SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, - SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, - SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL + URL_ARGS, + ASYNC_POLLING, + LOOKUP_METHOD, + REQUEST_CALLBACK_IDENTIFIER, + LookupOptions.CACHE_TYPE, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, + LookupOptions.PARTIAL_CACHE_MAX_ROWS, + LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL ); } @@ -129,12 +137,12 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions()); final HttpPostRequestCallbackFactory - postRequestCallbackFactory = - FactoryUtil.discoverFactory( - context.getClassLoader(), - HttpPostRequestCallbackFactory.class, - readableConfig.get(REQUEST_CALLBACK_IDENTIFIER) - ); + postRequestCallbackFactory = + FactoryUtil.discoverFactory( + context.getClassLoader(), + HttpPostRequestCallbackFactory.class, + readableConfig.get(REQUEST_CALLBACK_IDENTIFIER) + ); return HttpLookupConfig.builder() .lookupMethod(readableConfig.get(LOOKUP_METHOD)) @@ -151,8 +159,8 @@ private LookupCache getLookupCache(ReadableConfig tableOptions) { LookupCache cache = null; // Do not support legacy cache options if (tableOptions - .get(LookupOptions.CACHE_TYPE) - .equals(LookupOptions.LookupCacheType.PARTIAL)) { + .get(LookupOptions.CACHE_TYPE) + .equals(LookupOptions.LookupCacheType.PARTIAL)) { cache = DefaultLookupCache.fromConfig(tableOptions); } return cache; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index ce3a31cc..017af0a7 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -4,21 +4,40 @@ import java.net.http.HttpClient; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; +import java.time.Duration; import java.util.Collections; import java.util.Optional; +import static java.lang.String.format; +import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.data.RowData; import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy; +import org.apache.flink.util.concurrent.FixedRetryStrategy; +import org.apache.flink.util.concurrent.RetryStrategy; import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.PollingClient; -import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.config.RetryStrategyType; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; +import com.getindata.connectors.http.internal.status.HttpResponseStatus; import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_DELAY; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_DELAY; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_FIXED_DELAY_DELAY; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RETRY_STRATEGY_TYPE; /** * An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}. @@ -37,32 +56,77 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpPostRequestCallback httpPostRequestCallback; + private final RetryStrategy retryStrategy; + + public JavaNetHttpPollingClient( - HttpClient httpClient, - DeserializationSchema responseBodyDecoder, - HttpLookupConfig options, - HttpRequestFactory requestFactory) { + HttpClient httpClient, + DeserializationSchema responseBodyDecoder, + HttpLookupConfig options, + HttpRequestFactory requestFactory) { this.httpClient = httpClient; this.responseBodyDecoder = responseBodyDecoder; this.requestFactory = requestFactory; this.httpPostRequestCallback = options.getHttpPostRequestCallback(); + this.retryStrategy = buildRestartStrategy(options.getReadableConfig()); // TODO Inject this via constructor when implementing a response processor. // Processor will be injected and it will wrap statusChecker implementation. ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(options.getProperties()) - .whiteListPrefix( - HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST - ) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) + .errorWhiteListPrefix(HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST) + .errorCodePrefix(HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) + .retryableWhiteListPrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) + .retryableCodePrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); } + private RetryStrategy buildRestartStrategy(ReadableConfig readableConfig) { + // RetryStrategy interface is not serializable, so we need to create it here. + RetryStrategyType retryStrategyType = readableConfig.get(LOOKUP_RETRY_STRATEGY_TYPE); + + if (retryStrategyType.equals(RetryStrategyType.NONE)) { + return new RetryStrategy() { + @Override + public int getNumRemainingRetries() { + return 0; + } + + @Override + public Duration getRetryDelay() { + return null; + } + + @Override + public RetryStrategy getNextRetryStrategy() { + return null; + } + }; + } + if (retryStrategyType.equals(RetryStrategyType.FIXED_DELAY)) { + return new FixedRetryStrategy( + readableConfig.get(LOOKUP_RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS), + readableConfig.get(LOOKUP_RESTART_STRATEGY_FIXED_DELAY_DELAY) + ); + } else if (retryStrategyType.equals(RetryStrategyType.EXPONENTIAL_DELAY)) { + return new ExponentialBackoffRetryStrategy( + readableConfig.get(LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS), + readableConfig.get(LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_DELAY), + readableConfig.get(LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_DELAY) + ); + } else { + throw new IllegalArgumentException( + String.format("Invalid restart strategy type. Actual: %s. ", + retryStrategyType) + ); + } + } + @Override public Optional pull(RowData lookupRow) { try { @@ -74,48 +138,99 @@ public Optional pull(RowData lookupRow) { } } - // TODO Add Retry Policy And configure TimeOut from properties private Optional queryAndProcess(RowData lookupData) throws Exception { - HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); - HttpResponse response = httpClient.send( - request.getHttpRequest(), - BodyHandlers.ofString() - ); - return processHttpResponse(response, request); + + RetryStrategy currentRetryStrategy = retryStrategy; + final int maxRetryCount = currentRetryStrategy.getNumRemainingRetries(); + int tryCount = 0; + + do { + tryCount++; + HttpResponse httpResponse = httpClient.send( + request.getHttpRequest(), + BodyHandlers.ofString() + ); + Response parsedResponse = processHttpResponse(httpResponse, request); + if (parsedResponse.getStatus() == HttpResponseStatus.SUCCESS + || parsedResponse.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE) { + return parsedResponse.getRowData(); + } else { + if (tryCount == maxRetryCount) { + log.error("Maximum retry count reached. Aborting..."); + } else { + log.info( + "Attempt {}/{} failed. Retrying HTTP request.", + tryCount, + maxRetryCount); + Thread.sleep(currentRetryStrategy.getRetryDelay().toMillis()); + } + } + currentRetryStrategy = currentRetryStrategy.getNextRetryStrategy(); + } while (currentRetryStrategy.getNumRemainingRetries() > 0); + return Optional.empty(); } - private Optional processHttpResponse( - HttpResponse response, - HttpLookupSourceRequestEntry request) throws IOException { + private Response processHttpResponse( + HttpResponse response, + HttpLookupSourceRequestEntry request) throws IOException { this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); if (response == null) { - return Optional.empty(); + log.error("Empty HTTP response."); + // TODO: when response is null? + return Response.retryable(); } String responseBody = response.body(); int statusCode = response.statusCode(); - log.debug(String.format("Received status code [%s] for RestTableSource request " + - "with Server response body [%s] ", statusCode, responseBody)); - - if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) { - return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes())); + log.debug(format("Received status code [%s] for RestTableSource request " + + "with Server response body [%s].", statusCode, responseBody)); + + HttpResponseStatus httpResponseStatus = statusCodeChecker.checkStatus(statusCode); + if (httpResponseStatus == HttpResponseStatus.SUCCESS) { + log.trace("Returned successful status code [%s]."); + return Response.success(responseBodyDecoder.deserialize(responseBody.getBytes())); + } else if (httpResponseStatus == HttpResponseStatus.FAILURE_NOT_RETRYABLE) { + log.warn(format("Returned not retryable error status code [%s].", statusCode)); + return Response.notRetryable(); + } else if (httpResponseStatus == HttpResponseStatus.FAILURE_RETRYABLE) { + log.warn(format("Returned retryable error status code [%s].", statusCode)); + return Response.retryable(); + } else if (StringUtils.isNullOrWhitespaceOnly(responseBody)) { + // TODO: When it is possible? + log.error(format("Returned body was empty. Status Code [%s].", statusCode)); + return Response.retryable(); } else { - log.warn( - String.format("Returned Http status code was invalid or returned body was empty. " - + "Status Code [%s]", statusCode) - ); - - return Optional.empty(); + throw new IllegalStateException( + format("Unexpected state. Status Code [%s].", statusCode)); } } - private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) { - return !(StringUtils.isNullOrWhitespaceOnly(body) || statusCodeChecker.isErrorCode( - statusCode)); + @AllArgsConstructor + private static final class Response { + + static Response success(RowData rowData) { + return new Response(HttpResponseStatus.SUCCESS, rowData); + } + + static Response notRetryable() { + return new Response(HttpResponseStatus.FAILURE_NOT_RETRYABLE, null); + } + + static Response retryable() { + return new Response(HttpResponseStatus.FAILURE_RETRYABLE, null); + } + + @Getter + private final HttpResponseStatus status; + private final RowData rowData; + + public Optional getRowData() { + return Optional.ofNullable(rowData); + } } @VisibleForTesting diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index cbf33c2f..3607f7da 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -274,6 +274,7 @@ public void testFailedConnection404OnWhiteList() throws Exception { .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setProperty("gid.connector.http.sink.error.code.exclude", "404, 405") .setProperty("gid.connector.http.sink.error.code", "4XX") + .setProperty("gid.connector.http.sink.error-retryable.code.exclude", "404, 405") .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index 345687b5..50fc9be1 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -341,7 +341,8 @@ private void testSinkClientForConnection( ).get(); assertThat(response.getSuccessfulRequests()).isNotEmpty(); - assertThat(response.getFailedRequests()).isEmpty(); + assertThat(response.getFailedNotRetryableRequests()).isEmpty(); + assertThat(response.getFailedRetryableRequests()).isEmpty(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java deleted file mode 100644 index 23baaead..00000000 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -package com.getindata.connectors.http.internal.sink.httpclient.status; - -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.flink.util.StringUtils; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertAll; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; - -class ComposeHttpStatusCodeCheckerTest { - - private static final String STRING_CODES = "403, 100,200, 300, , 303 ,200"; - - private static final List CODES = - Arrays.stream(STRING_CODES.split(HttpConnectorConfigConstants.PROP_DELIM)) - .filter(code -> !StringUtils.isNullOrWhitespaceOnly(code)) - .map(String::trim) - .mapToInt(Integer::parseInt) - .boxed() - .collect(Collectors.toList()); - - private ComposeHttpStatusCodeChecker codeChecker; - - @BeforeAll - public static void beforeAll() { - assertThat(CODES).isNotEmpty(); - } - - private static Stream propertiesArguments() { - return Stream.of( - Arguments.of(new Properties()), - Arguments.of(prepareErrorCodeProperties("", "")), - Arguments.of(prepareErrorCodeProperties(" ", " ")), - Arguments.of(prepareErrorCodeProperties(",,,", ",,,,")) - ); - } - - @ParameterizedTest - @MethodSource("propertiesArguments") - public void shouldPassOnDefault(Properties properties) { - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - - assertAll(() -> { - assertThat(codeChecker.isErrorCode(100)).isFalse(); - assertThat(codeChecker.isErrorCode(200)).isFalse(); - assertThat(codeChecker.isErrorCode(500)).isTrue(); - assertThat(codeChecker.isErrorCode(501)).isTrue(); - assertThat(codeChecker.isErrorCode(400)).isTrue(); - assertThat(codeChecker.isErrorCode(404)).isTrue(); - }); - } - - @Test - public void shouldParseWhiteList() { - - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, - STRING_CODES); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, - "1XX, 2XX, 3XX, 4XX, 5XX" - ); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - - assertAll(() -> { - CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isFalse()); - - assertThat(codeChecker.isErrorCode(301)) - .withFailMessage( - "Not on a white list but matches 3XX range. " - + "Should be considered as error code.") - .isTrue(); - }); - } - - @Test - public void shouldParseErrorCodeList() { - - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, - STRING_CODES); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - - assertAll(() -> CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue())); - } - - @Test - public void shouldParseErrorCodeRange() { - - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, "1xx, 2XX "); - - List codes = List.of(100, 110, 200, 220); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - - assertAll(() -> { - codes.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue()); - - assertThat(codeChecker.isErrorCode(303)) - .withFailMessage( - "Out ot Error code type range therefore should be not marked as error code.") - .isFalse(); - }); - } - - @ParameterizedTest - @ValueSource(strings = {"X", "XXX", " X X", "1X1", "XX1", "XX1XX", "XX1 XX"}) - public void shouldThrowOnInvalidCodeRange(String listCode) { - - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, listCode); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - assertThrows( - Exception.class, - () -> new ComposeHttpStatusCodeChecker(checkerConfig) - ); - } - - private static Properties prepareErrorCodeProperties(String errorCodeList, String whiteList) { - Properties properties = new Properties(); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, - whiteList - ); - properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, - errorCodeList - ); - return properties; - } - - private ComposeHttpStatusCodeCheckerConfig prepareCheckerConfig(Properties properties) { - return ComposeHttpStatusCodeCheckerConfig.builder() - .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) - .build(); - } -} diff --git a/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java new file mode 100644 index 00000000..689c8955 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java @@ -0,0 +1,192 @@ +package com.getindata.connectors.http.internal.status; + +import java.util.List; +import java.util.Properties; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; +import static com.getindata.connectors.http.internal.status.HttpResponseStatus.FAILURE_NOT_RETRYABLE; +import static com.getindata.connectors.http.internal.status.HttpResponseStatus.FAILURE_RETRYABLE; +import static com.getindata.connectors.http.internal.status.HttpResponseStatus.SUCCESS; + +class ComposeHttpStatusCodeCheckerTest { + + private static final String NOT_RETRYABLE_CODE_PROPERTY = "error.code"; + private static final String NOT_RETRYABLE_WHITELIST_PROPERTY = "error.code.exclude"; + private static final String RETRYABLE_CODE_PROPERTY = "retryable.code"; + private static final String RETRYABLE_WHITELIST_PROPERTY = "retryable.code.exclude"; + + @Test + void shouldReturnAppropriateStatusByDefault() { + Properties properties = new Properties(); + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + assertThat(codeChecker.checkStatus(100)).isEqualTo(SUCCESS); + assertThat(codeChecker.checkStatus(200)).isEqualTo(SUCCESS); + assertThat(codeChecker.checkStatus(302)).isEqualTo(SUCCESS); + assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(404)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(500)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(501)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(503)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(505)).isEqualTo(FAILURE_RETRYABLE); + }); + } + + @Test + void shouldReturnAppropriateStatus() { + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "1XX,4XX,505"); + properties.setProperty(NOT_RETRYABLE_WHITELIST_PROPERTY, "404"); + properties.setProperty(RETRYABLE_CODE_PROPERTY, "404,5XX"); + properties.setProperty(RETRYABLE_WHITELIST_PROPERTY, "501,505"); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + assertThat(codeChecker.checkStatus(100)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(200)).isEqualTo(SUCCESS); + assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(404)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(500)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(501)).isEqualTo(SUCCESS); + assertThat(codeChecker.checkStatus(503)).isEqualTo(FAILURE_RETRYABLE); + assertThat(codeChecker.checkStatus(505)).isEqualTo(FAILURE_NOT_RETRYABLE); + }); + } + + @Test + void shouldParseWhiteList() { + String rawCodes = "403, 100, 200, 300, 303, 200"; + List whitelistCodes = List.of(403, 100, 200, 300, 303, 200); + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "1XX, 2XX, 3XX, 4XX"); + properties.setProperty(NOT_RETRYABLE_WHITELIST_PROPERTY, rawCodes); + properties.setProperty(RETRYABLE_CODE_PROPERTY, "5XX"); + properties.setProperty(RETRYABLE_WHITELIST_PROPERTY, rawCodes); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + whitelistCodes.forEach( + code -> assertThat(codeChecker.checkStatus(code)).isEqualTo(SUCCESS) + ); + + assertThat(codeChecker.checkStatus(301)) + .withFailMessage( + "Not on a white list but matches 3XX range. " + + "Should be considered as error code.") + .isEqualTo(FAILURE_NOT_RETRYABLE); + }); + } + + @Test + void shouldParseErrorCodeList() { + List notRetryableCodes = List.of(100, 202, 404); + List retryableCodes = List.of(302, 502); + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "100, 202, 404"); + properties.setProperty(RETRYABLE_CODE_PROPERTY, "302, 502"); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NOT_RETRYABLE)); + retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_RETRYABLE)); + }); + } + + @Test + void shouldParseErrorCodeRange() { + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "1XX, 2XX"); + properties.setProperty(RETRYABLE_CODE_PROPERTY, "3XX, 4XX"); + List notRetryableCodes = List.of(100, 110, 200, 220); + List retryableCodes = List.of(301, 404); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NOT_RETRYABLE)); + retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_RETRYABLE)); + assertThat(codeChecker.checkStatus(503)) + .withFailMessage( + "Out ot Error code type range therefore should be not marked as error code.") + .isEqualTo(SUCCESS); + }); + } + + @Test + void shouldIgnoreRedundantWhiteSpacesOrEmptyOrRepeatedValues() { + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, " , 100,200, 300, , 303 ,200 "); + properties.setProperty(RETRYABLE_CODE_PROPERTY, ",5XX, 4XX,, ,"); + List notRetryableCodes = List.of(100, 200, 300, 303); + List retryableCodes = List.of(500, 501, 400, 401); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + assertAll(() -> { + notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NOT_RETRYABLE)); + retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_RETRYABLE)); + }); + } + + + @ParameterizedTest + @ValueSource(strings = {"X", "XXX", " X X", "1X1", "XX1", "XX1XX", "XX1 XX"}) + void shouldThrowOnInvalidCodeRangeInNonRetryableError(String listCode) { + Properties properties = new Properties(); + properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, listCode); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + + assertThrows( + Exception.class, + () -> new ComposeHttpStatusCodeChecker(checkerConfig) + ); + } + + @ParameterizedTest + @ValueSource(strings = {"X", "XXX", " X X", "1X1", "XX1", "XX1XX", "XX1 XX"}) + void shouldThrowOnInvalidCodeRangeInRetryableError(String listCode) { + Properties properties = new Properties(); + properties.setProperty(RETRYABLE_CODE_PROPERTY, listCode); + + ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + + assertThrows( + Exception.class, + () -> new ComposeHttpStatusCodeChecker(checkerConfig) + ); + } + + private ComposeHttpStatusCodeCheckerConfig prepareCheckerConfig(Properties properties) { + return ComposeHttpStatusCodeCheckerConfig.builder() + .properties(properties) + .errorCodePrefix(NOT_RETRYABLE_CODE_PROPERTY) + .errorWhiteListPrefix(NOT_RETRYABLE_WHITELIST_PROPERTY) + .retryableCodePrefix(RETRYABLE_CODE_PROPERTY) + .retryableWhiteListPrefix(RETRYABLE_WHITELIST_PROPERTY) + .build(); + } +} From beefad1f6664b6956c955f2ecf28d60d01f0510c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Mon, 14 Oct 2024 16:47:35 +0200 Subject: [PATCH 2/6] Revert Sink changes --- .../http/internal/SinkHttpClientResponse.java | 38 ++++--------------- .../http/internal/sink/HttpSinkWriter.java | 6 +-- .../httpclient/JavaNetSinkHttpClient.java | 21 ++++++---- .../JavaNetSinkHttpClientConnectionTest.java | 3 +- 4 files changed, 23 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index f14b293b..b5637377 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,7 +1,6 @@ package com.getindata.connectors.http.internal; import java.util.List; -import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; @@ -9,7 +8,6 @@ import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; -import com.getindata.connectors.http.internal.status.HttpResponseStatus; /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted @@ -20,36 +18,14 @@ public class SinkHttpClientResponse { /** - * A list of requests along with write status. + * A list of successfully written requests. */ @NonNull - private final List requests; + private final List successfulRequests; - public List getSuccessfulRequests() { - return requests.stream() - .filter(i -> i.getStatus() == HttpResponseStatus.SUCCESS) - .map(ResponseItem::getRequest) - .collect(Collectors.toList()); - } - - public List getFailedRetryableRequests() { - return requests.stream() - .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_RETRYABLE) - .map(ResponseItem::getRequest) - .collect(Collectors.toList()); - } - - public List getFailedNotRetryableRequests() { - return requests.stream() - .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE) - .map(ResponseItem::getRequest) - .collect(Collectors.toList()); - } - - @Data - @ToString - public static class ResponseItem { - private final HttpRequest request; - private final HttpResponseStatus status; - } + /** + * A list of requests that {@link SinkHttpClient} failed to write. + */ + @NonNull + private final List failedRequests; } diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index 46e0c057..3b42f815 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -98,10 +98,8 @@ protected void submitRequestEntries( // to the `numRecordsSendErrors` metric. It is due to the fact we do not have // a clear image how we want to do it, so it would be both efficient and correct. //requestResult.accept(requestEntries); - } else if (response.getFailedNotRetryableRequests().size() - + response.getFailedRetryableRequests().size() > 0) { - int failedRequestsNumber = response.getFailedNotRetryableRequests().size() - + response.getFailedRetryableRequests().size(); + } else if (response.getFailedRequests().size() > 0) { + int failedRequestsNumber = response.getFailedRequests().size(); log.error("Http Sink failed to write and will retry {} requests", failedRequestsNumber); numRecordsSendErrorsCounter.inc(failedRequestsNumber); diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index d45028e5..3e68e1e3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -16,7 +16,6 @@ import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; -import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; @@ -98,7 +97,8 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var responseItems = new ArrayList(); + var successfulResponses = new ArrayList(); + var failedResponses = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); @@ -107,14 +107,19 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( httpPostRequestCallback.call( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // Empty indicates that HttpClient.sendAsync() failed with exception. - HttpResponseStatus status = optResponse.isEmpty() - ? HttpResponseStatus.FAILURE_RETRYABLE - : statusCodeChecker.checkStatus(optResponse.get().statusCode()); - responseItems.add(new ResponseItem(sinkRequestEntry, status)); + // TODO Add response processor here and orchestrate it with statusCodeChecker. + if (optResponse.isEmpty() || + statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals( + HttpResponseStatus.FAILURE_RETRYABLE) || + statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals( + HttpResponseStatus.FAILURE_NOT_RETRYABLE)) { + failedResponses.add(sinkRequestEntry); + } else { + successfulResponses.add(sinkRequestEntry); + } } - return new SinkHttpClientResponse(responseItems); + return new SinkHttpClientResponse(successfulResponses, failedResponses); } @VisibleForTesting diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index 50fc9be1..345687b5 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -341,8 +341,7 @@ private void testSinkClientForConnection( ).get(); assertThat(response.getSuccessfulRequests()).isNotEmpty(); - assertThat(response.getFailedNotRetryableRequests()).isEmpty(); - assertThat(response.getFailedRetryableRequests()).isEmpty(); + assertThat(response.getFailedRequests()).isEmpty(); } catch (Exception e) { throw new RuntimeException(e); } From 6a2a994bc95eaa9323f97a0d76ed31b8ee654e24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Tue, 15 Oct 2024 10:40:53 +0200 Subject: [PATCH 3/6] Refactor error retry configuration parameters --- README.md | 28 +++++++++++++------ .../config/HttpConnectorConfigConstants.java | 12 ++++++-- .../status/ComposeHttpStatusCodeChecker.java | 22 ++++++++++++--- .../lookup/JavaNetHttpPollingClient.java | 12 +++++--- ...avaNetHttpPollingClientConnectionTest.java | 4 +-- 5 files changed, 58 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 462ff76a..f8c8a6ac 100644 --- a/README.md +++ b/README.md @@ -379,18 +379,26 @@ is provided. ## HTTP status code handler -Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors. -By default all 400s and 500s response codes will be interpreted as error code. + +Http Sink and Lookup Source connectors allow defining list of HTTP status codes for which connector either retry +request according to defined strategy, or returns empty result. +By default, all 400s are interpreted as non-retryable errors, while 500s response codes as retryable errors. This behavior can be changed by using below properties in table definition (DDL) for Sink and Lookup Source or passing it via `setProperty' method from Sink's builder. The property names are: -- `gid.connector.http.sink.error.code` and `gid.connector.http.source.lookup.error.code` used to defined HTTP status code value that should be treated as error for example 404. +- `gid.connector.http.sink.error.non-retryable.code` and `gid.connector.http.source.lookup.error.non-retryable.code`, allow to define HTTP status code values that should be treated as non-retryable error. +- `gid.connector.http.sink.error.retryable.code` and `gid.connector.http.source.lookup.error.retryable.code`, allow to define HTTP status code values that should be treated as retryable error. + Many status codes can be defined in one value, where each code should be separated with comma, for example: `401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors. An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors. -- `gid.connector.http.sink.error.code.exclude` and `gid.connector.http.source.lookup.error.code.exclude` used to exclude a HTTP code from error list. - Many status codes can be defined in one value, where each code should be separated with comma, for example: - `401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes. + +Another set of properties are: +- `gid.connector.http.sink.error.non-retryable.code.exclude` and `gid.connector.http.source.lookup.error.non-retryable.code.exclude` used to exclude an HTTP code from non-retryable error list. +- `gid.connector.http.sink.error.retryable.code.exclude` and `gid.connector.http.source.lookup.error.retryable.code.exclude` used to exclude an HTTP code from retryable error list. + +Many status codes can be defined in one value, where each code should be separated with comma, for example: +`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes. ## TLS and mTLS support Both Http Sink and Lookup Source connectors supports Https communication using TLS 1.2 and mTLS. @@ -455,8 +463,12 @@ be requested if the current time is later than the cached token expiry time minu | lookup.retry-strategy.exponential-delay.attempts | optional | The number of times that connector retries lookup execution before connector returns empty result. | | lookup.retry-strategy.exponential-delay.initial-delay | optional | Initial delay between two consecutive retry attempts. | | lookup.retry-strategy.exponential-delay.max-delay | optional | The highest possible duration between two consecutive retry attempts. | -| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | -| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | +| gid.connector.http.lookup.error.code | optional | (Deprecated) List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | +| gid.connector.http.lookup.error.code.exclude | optional | (Deprecated) List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | +| gid.connector.http.lookup.error.non-retryable.code | optional | List of HTTP status codes that should be treated as errors for which HTTP Source should not retry request, separated with comma. | +| gid.connector.http.lookup.error.non-retryable.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | +| gid.connector.http.lookup.error.retryable.code | optional | List of HTTP status codes that should be treated as errors for which HTTP Source should retry request, separated with comma. | +| gid.connector.http.lookup.error.retryable.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup-retryable.error.code` list, separated with comma. | | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. | | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index e914b00e..ca8b3835 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -49,17 +49,25 @@ public final class HttpConnectorConfigConstants { public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code"; + @Deprecated public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude"; + @Deprecated public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST = GID_CONNECTOR_HTTP + "source.lookup.error.code"; + public static final String HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST = + GID_CONNECTOR_HTTP + "source.lookup.error.non-retryable.code.exclude"; + + public static final String HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST = + GID_CONNECTOR_HTTP + "source.lookup.error.non-retryable.code"; + public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST = - GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code.exclude"; + GID_CONNECTOR_HTTP + "source.lookup.error.retryable.code.exclude"; public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST = - GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code"; + GID_CONNECTOR_HTTP + "source.lookup.error.retryable.code"; // ----------------------------------------------------- public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER = diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java index a44d469d..f315809d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java @@ -27,15 +27,25 @@ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.CLIENT_ERROR); private static final Predicate DEFAULT_RETRYABLE_ERROR_CODES = new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.SERVER_ERROR); + private static final Predicate DEFAULT_DEPRECATED_ERROR_CODES = + DEFAULT_ERROR_CODES.or(DEFAULT_RETRYABLE_ERROR_CODES); private final Predicate retryableErrorStatusCodes; private final Predicate notRetryableErrorStatusCodes; public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { - retryableErrorStatusCodes = buildPredicate(config, config.getRetryableCodePrefix(), - config.getRetryableWhiteListPrefix(), DEFAULT_RETRYABLE_ERROR_CODES); - notRetryableErrorStatusCodes = buildPredicate(config, config.getErrorCodePrefix(), - config.getErrorWhiteListPrefix(), DEFAULT_ERROR_CODES); + // Handle deprecated configuration for backward compatibility. + if (!StringUtils.isNullOrWhitespaceOnly(config.getDeprecatedCodePrefix()) || + !StringUtils.isNullOrWhitespaceOnly(config.getDeprecatedErrorWhiteListPrefix())) { + notRetryableErrorStatusCodes = buildPredicate(config, config.getDeprecatedCodePrefix(), + config.getDeprecatedErrorWhiteListPrefix(), DEFAULT_DEPRECATED_ERROR_CODES); + retryableErrorStatusCodes = integer -> false; + } else { + retryableErrorStatusCodes = buildPredicate(config, config.getRetryableCodePrefix(), + config.getRetryableWhiteListPrefix(), DEFAULT_RETRYABLE_ERROR_CODES); + notRetryableErrorStatusCodes = buildPredicate(config, config.getErrorCodePrefix(), + config.getErrorWhiteListPrefix(), DEFAULT_ERROR_CODES); + } } private Predicate buildPredicate( @@ -135,6 +145,10 @@ public HttpResponseStatus checkStatus(int statusCode) { @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class ComposeHttpStatusCodeCheckerConfig { + private final String deprecatedErrorWhiteListPrefix; + + private final String deprecatedCodePrefix; + private final String errorWhiteListPrefix; private final String errorCodePrefix; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 017af0a7..3ace6f88 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -7,6 +7,10 @@ import java.time.Duration; import java.util.Collections; import java.util.Optional; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST; import static java.lang.String.format; import lombok.AllArgsConstructor; @@ -30,8 +34,6 @@ import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_DELAY; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_DELAY; @@ -77,8 +79,10 @@ public JavaNetHttpPollingClient( ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(options.getProperties()) - .errorWhiteListPrefix(HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST) - .errorCodePrefix(HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) + .deprecatedErrorWhiteListPrefix(HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST) + .deprecatedCodePrefix(HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) + .errorWhiteListPrefix(HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) + .errorCodePrefix(HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) .retryableWhiteListPrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) .retryableCodePrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) .build(); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 27f62df0..fa54dfda 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -436,11 +436,11 @@ private StubMapping setupServerStubForBasicAuth() { private static Properties prepareErrorCodeProperties(String errorCodeList, String whiteList) { Properties properties = new Properties(); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST, + HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST, whiteList ); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST, + HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST, errorCodeList ); From 1ad0fa98fd01668fc3d46ba6861c445d7adbe91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Tue, 15 Oct 2024 11:31:41 +0200 Subject: [PATCH 4/6] Support backward compatibility --- .../httpclient/JavaNetSinkHttpClient.java | 8 ++++--- .../status/ComposeHttpStatusCodeChecker.java | 24 ++++++++++++++----- .../lookup/JavaNetHttpPollingClient.java | 8 +++---- .../internal/sink/HttpSinkConnectionTest.java | 1 - .../ComposeHttpStatusCodeCheckerTest.java | 9 +++---- 5 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 3e68e1e3..61602254 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -61,9 +61,11 @@ public JavaNetSinkHttpClient( ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(properties) - .errorWhiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HTTP_ERROR_SINK_CODES_LIST) - .retryableWhiteListPrefix("") // TODO: sink retries not implemented yet + .deprecatedErrorWhiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST) + .deprecatedCodePrefix(HTTP_ERROR_SINK_CODES_LIST) + .errorWhiteListPrefix("") // TODO: sink not refactored yet + .errorCodePrefix("") + .retryableWhiteListPrefix("") .retryableCodePrefix("") .build(); diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java index f315809d..64df60d2 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java @@ -10,7 +10,7 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -35,8 +35,7 @@ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { // Handle deprecated configuration for backward compatibility. - if (!StringUtils.isNullOrWhitespaceOnly(config.getDeprecatedCodePrefix()) || - !StringUtils.isNullOrWhitespaceOnly(config.getDeprecatedErrorWhiteListPrefix())) { + if (areDeprecatedPropertiesUsed(config)) { notRetryableErrorStatusCodes = buildPredicate(config, config.getDeprecatedCodePrefix(), config.getDeprecatedErrorWhiteListPrefix(), DEFAULT_DEPRECATED_ERROR_CODES); retryableErrorStatusCodes = integer -> false; @@ -48,6 +47,17 @@ public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { } } + private boolean areDeprecatedPropertiesUsed(ComposeHttpStatusCodeCheckerConfig config) { + boolean whiteListDefined = + !isNullOrWhitespaceOnly(config.getDeprecatedErrorWhiteListPrefix()); + boolean codeListDefined = !isNullOrWhitespaceOnly(config.getDeprecatedCodePrefix()); + + return (whiteListDefined && !isNullOrWhitespaceOnly( + config.getProperties().getProperty(config.getDeprecatedErrorWhiteListPrefix()))) + || (codeListDefined && !isNullOrWhitespaceOnly( + config.getProperties().getProperty(config.getDeprecatedCodePrefix()))); + } + private Predicate buildPredicate( ComposeHttpStatusCodeCheckerConfig config, String errorCodePrefix, @@ -55,8 +65,10 @@ private Predicate buildPredicate( Predicate defaultErrorCodes) { Properties properties = config.getProperties(); - String errorCodes = properties.getProperty(errorCodePrefix, ""); - String whitelistCodes = properties.getProperty(whiteListPrefix, ""); + String errorCodes = + errorCodePrefix == null ? "" : properties.getProperty(errorCodePrefix, ""); + String whitelistCodes = + whiteListPrefix == null ? "" : properties.getProperty(whiteListPrefix, ""); Predicate errorPredicate = prepareErrorCodes(errorCodes).orElse(defaultErrorCodes); @@ -74,7 +86,7 @@ private Predicate buildPredicate( */ private Optional> prepareErrorCodes(String statusCodesStr) { return Arrays.stream(statusCodesStr.split(HttpConnectorConfigConstants.PROP_DELIM)) - .filter(code -> !StringUtils.isNullOrWhitespaceOnly(code)) + .filter(code -> !isNullOrWhitespaceOnly(code)) .map(code -> code.toUpperCase().trim()) .map(codeStr -> { Preconditions.checkArgument( diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 3ace6f88..a38bf7d8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -7,10 +7,6 @@ import java.time.Duration; import java.util.Collections; import java.util.Optional; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST; import static java.lang.String.format; import lombok.AllArgsConstructor; @@ -32,8 +28,12 @@ import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; import com.getindata.connectors.http.internal.status.HttpResponseStatus; import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_DELAY; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_DELAY; diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index 3607f7da..cbf33c2f 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -274,7 +274,6 @@ public void testFailedConnection404OnWhiteList() throws Exception { .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setProperty("gid.connector.http.sink.error.code.exclude", "404, 405") .setProperty("gid.connector.http.sink.error.code", "4XX") - .setProperty("gid.connector.http.sink.error-retryable.code.exclude", "404, 405") .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); diff --git a/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java index 689c8955..6fd74be1 100644 --- a/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java @@ -17,10 +17,11 @@ class ComposeHttpStatusCodeCheckerTest { - private static final String NOT_RETRYABLE_CODE_PROPERTY = "error.code"; - private static final String NOT_RETRYABLE_WHITELIST_PROPERTY = "error.code.exclude"; - private static final String RETRYABLE_CODE_PROPERTY = "retryable.code"; - private static final String RETRYABLE_WHITELIST_PROPERTY = "retryable.code.exclude"; + private static final String NOT_RETRYABLE_CODE_PROPERTY = "error.non-retryable.code"; + private static final String NOT_RETRYABLE_WHITELIST_PROPERTY = + "error.non-retryable.code.exclude"; + private static final String RETRYABLE_CODE_PROPERTY = "error.retryable.code"; + private static final String RETRYABLE_WHITELIST_PROPERTY = "error.retryable.code.exclude"; @Test void shouldReturnAppropriateStatusByDefault() { From 2589b952be66a3ddbd0bf1686f78c324688a19ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Tue, 15 Oct 2024 11:50:35 +0200 Subject: [PATCH 5/6] Refactoring --- .../http/internal/sink/HttpSinkWriter.java | 28 ++++++++-------- .../httpclient/JavaNetSinkHttpClient.java | 12 +++---- .../status/ComposeHttpStatusCodeChecker.java | 32 +++++++++---------- 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index 3b42f815..d17e9213 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -46,18 +46,18 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter, - Sink.InitContext context, - int maxBatchSize, - int maxInFlightRequests, - int maxBufferedRequests, - long maxBatchSizeInBytes, - long maxTimeInBufferMS, - long maxRecordSizeInBytes, - String endpointUrl, - SinkHttpClient sinkHttpClient, - Collection> bufferedRequestStates, - Properties properties) { + ElementConverter elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + String endpointUrl, + SinkHttpClient sinkHttpClient, + Collection> bufferedRequestStates, + Properties properties) { super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); @@ -82,8 +82,8 @@ public HttpSinkWriter( // TODO: Reintroduce retries by adding backoff policy @Override protected void submitRequestEntries( - List requestEntries, - Consumer> requestResult) { + List requestEntries, + Consumer> requestResult) { var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); future.whenCompleteAsync((response, err) -> { if (err != null) { diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 61602254..6180261a 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -110,14 +110,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); // TODO Add response processor here and orchestrate it with statusCodeChecker. - if (optResponse.isEmpty() || - statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals( - HttpResponseStatus.FAILURE_RETRYABLE) || - statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals( - HttpResponseStatus.FAILURE_NOT_RETRYABLE)) { - failedResponses.add(sinkRequestEntry); - } else { + if (optResponse.isPresent() && + statusCodeChecker.checkStatus(optResponse.get().statusCode()) + .equals(HttpResponseStatus.SUCCESS)) { successfulResponses.add(sinkRequestEntry); + } else { + failedResponses.add(sinkRequestEntry); } } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java index 64df60d2..9c5c8cd1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java @@ -88,25 +88,25 @@ private Optional> prepareErrorCodes(String statusCodesStr) { return Arrays.stream(statusCodesStr.split(HttpConnectorConfigConstants.PROP_DELIM)) .filter(code -> !isNullOrWhitespaceOnly(code)) .map(code -> code.toUpperCase().trim()) - .map(codeStr -> { - Preconditions.checkArgument( - codeStr.length() == 3, - "Status code should contain three characters. Provided [%s]", - codeStr); - - // at this point we have trim, upper case 3 character status code. - if (isTypeCode(codeStr)) { - int code = Integer.parseInt(codeStr.replace("X", "")); - return new TypeStatusCodeCheckerPredicate( - HttpResponseCodeType.getByCode(code)); - } else { - return new SingleValueHttpStatusCodeCheckerPredicate( - Integer.parseInt(codeStr)); - } - }) + .map(this::prepareErrorCode) .reduce(Predicate::or); } + private Predicate prepareErrorCode(String codeString) { + Preconditions.checkArgument( + codeString.length() == 3, + "Status code should contain three characters. Provided [%s]", + codeString); + + // at this point we have trim, upper case 3 character status code. + if (isTypeCode(codeString)) { + int code = Integer.parseInt(codeString.replace("X", "")); + return new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.getByCode(code)); + } else { + return new SingleValueHttpStatusCodeCheckerPredicate(Integer.parseInt(codeString)); + } + } + /** * This method checks if "code" param matches "digit + XX" mask. This method expects that * provided string will be 3 elements long, trim and upper case. From 65f49b375bc49dcb4c501ef661d1fe40df83d0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Tue, 15 Oct 2024 12:58:09 +0200 Subject: [PATCH 6/6] Change properties names --- .../httpclient/JavaNetSinkHttpClient.java | 12 ++--- .../status/ComposeHttpStatusCodeChecker.java | 47 ++++++++++--------- .../internal/status/HttpResponseStatus.java | 2 +- .../lookup/JavaNetHttpPollingClient.java | 23 ++++----- .../ComposeHttpStatusCodeCheckerTest.java | 40 ++++++++-------- 5 files changed, 63 insertions(+), 61 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 6180261a..5baac0f3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -61,12 +61,12 @@ public JavaNetSinkHttpClient( ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(properties) - .deprecatedErrorWhiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST) - .deprecatedCodePrefix(HTTP_ERROR_SINK_CODES_LIST) - .errorWhiteListPrefix("") // TODO: sink not refactored yet - .errorCodePrefix("") - .retryableWhiteListPrefix("") - .retryableCodePrefix("") + .whiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST) + .errorCodePrefix(HTTP_ERROR_SINK_CODES_LIST) + .nonRetryableErrorWhiteListPrefix("") // TODO: sink not refactored yet + .nonRetryableErrorCodePrefix("") + .retryableErrorWhiteListPrefix("") + .retryableErrorCodePrefix("") .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java index 9c5c8cd1..7f75dad4 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java @@ -23,39 +23,40 @@ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { private static final int MIN_HTTP_STATUS_CODE = 100; private static final int MAX_HTTP_STATUS_CODE = 599; - private static final Predicate DEFAULT_ERROR_CODES = + private static final Predicate DEFAULT_NON_RETRYABLE_ERROR_CODES = new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.CLIENT_ERROR); private static final Predicate DEFAULT_RETRYABLE_ERROR_CODES = new TypeStatusCodeCheckerPredicate(HttpResponseCodeType.SERVER_ERROR); private static final Predicate DEFAULT_DEPRECATED_ERROR_CODES = - DEFAULT_ERROR_CODES.or(DEFAULT_RETRYABLE_ERROR_CODES); + DEFAULT_NON_RETRYABLE_ERROR_CODES.or(DEFAULT_RETRYABLE_ERROR_CODES); private final Predicate retryableErrorStatusCodes; - private final Predicate notRetryableErrorStatusCodes; + private final Predicate nonRetryableErrorStatusCodes; public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { // Handle deprecated configuration for backward compatibility. if (areDeprecatedPropertiesUsed(config)) { - notRetryableErrorStatusCodes = buildPredicate(config, config.getDeprecatedCodePrefix(), - config.getDeprecatedErrorWhiteListPrefix(), DEFAULT_DEPRECATED_ERROR_CODES); + nonRetryableErrorStatusCodes = buildPredicate(config, config.getErrorCodePrefix(), + config.getWhiteListPrefix(), DEFAULT_DEPRECATED_ERROR_CODES); retryableErrorStatusCodes = integer -> false; } else { - retryableErrorStatusCodes = buildPredicate(config, config.getRetryableCodePrefix(), - config.getRetryableWhiteListPrefix(), DEFAULT_RETRYABLE_ERROR_CODES); - notRetryableErrorStatusCodes = buildPredicate(config, config.getErrorCodePrefix(), - config.getErrorWhiteListPrefix(), DEFAULT_ERROR_CODES); + retryableErrorStatusCodes = buildPredicate(config, config.getRetryableErrorCodePrefix(), + config.getRetryableErrorWhiteListPrefix(), DEFAULT_RETRYABLE_ERROR_CODES); + nonRetryableErrorStatusCodes = + buildPredicate(config, config.getNonRetryableErrorCodePrefix(), + config.getNonRetryableErrorWhiteListPrefix(), + DEFAULT_NON_RETRYABLE_ERROR_CODES); } } private boolean areDeprecatedPropertiesUsed(ComposeHttpStatusCodeCheckerConfig config) { - boolean whiteListDefined = - !isNullOrWhitespaceOnly(config.getDeprecatedErrorWhiteListPrefix()); - boolean codeListDefined = !isNullOrWhitespaceOnly(config.getDeprecatedCodePrefix()); + boolean whiteListDefined = !isNullOrWhitespaceOnly(config.getWhiteListPrefix()); + boolean codeListDefined = !isNullOrWhitespaceOnly(config.getErrorCodePrefix()); return (whiteListDefined && !isNullOrWhitespaceOnly( - config.getProperties().getProperty(config.getDeprecatedErrorWhiteListPrefix()))) - || (codeListDefined && !isNullOrWhitespaceOnly( - config.getProperties().getProperty(config.getDeprecatedCodePrefix()))); + config.getProperties().getProperty(config.getWhiteListPrefix()))) || + (codeListDefined && !isNullOrWhitespaceOnly( + config.getProperties().getProperty(config.getErrorCodePrefix()))); } private Predicate buildPredicate( @@ -143,8 +144,8 @@ public HttpResponseStatus checkStatus(int statusCode) { MAX_HTTP_STATUS_CODE) ); - if (notRetryableErrorStatusCodes.test(statusCode)) { - return HttpResponseStatus.FAILURE_NOT_RETRYABLE; + if (nonRetryableErrorStatusCodes.test(statusCode)) { + return HttpResponseStatus.FAILURE_NON_RETRYABLE; } else if (retryableErrorStatusCodes.test(statusCode)) { return HttpResponseStatus.FAILURE_RETRYABLE; } else { @@ -157,17 +158,17 @@ public HttpResponseStatus checkStatus(int statusCode) { @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class ComposeHttpStatusCodeCheckerConfig { - private final String deprecatedErrorWhiteListPrefix; + private final String whiteListPrefix; - private final String deprecatedCodePrefix; + private final String errorCodePrefix; - private final String errorWhiteListPrefix; + private final String nonRetryableErrorWhiteListPrefix; - private final String errorCodePrefix; + private final String nonRetryableErrorCodePrefix; - private final String retryableWhiteListPrefix; + private final String retryableErrorWhiteListPrefix; - private final String retryableCodePrefix; + private final String retryableErrorCodePrefix; private final Properties properties; } diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java index 6fb7f3c9..0c17babc 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseStatus.java @@ -16,5 +16,5 @@ public enum HttpResponseStatus { /** * Request failed but cannot be retried. */ - FAILURE_NOT_RETRYABLE, + FAILURE_NON_RETRYABLE, } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index a38bf7d8..11d3a48d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -79,12 +79,13 @@ public JavaNetHttpPollingClient( ComposeHttpStatusCodeCheckerConfig checkerConfig = ComposeHttpStatusCodeCheckerConfig.builder() .properties(options.getProperties()) - .deprecatedErrorWhiteListPrefix(HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST) - .deprecatedCodePrefix(HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) - .errorWhiteListPrefix(HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) - .errorCodePrefix(HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) - .retryableWhiteListPrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) - .retryableCodePrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) + .whiteListPrefix(HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST) + .errorCodePrefix(HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST) + .nonRetryableErrorWhiteListPrefix( + HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) + .nonRetryableErrorCodePrefix(HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) + .retryableErrorWhiteListPrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST) + .retryableErrorCodePrefix(HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST) .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); @@ -157,7 +158,7 @@ private Optional queryAndProcess(RowData lookupData) throws Exception { ); Response parsedResponse = processHttpResponse(httpResponse, request); if (parsedResponse.getStatus() == HttpResponseStatus.SUCCESS - || parsedResponse.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE) { + || parsedResponse.getStatus() == HttpResponseStatus.FAILURE_NON_RETRYABLE) { return parsedResponse.getRowData(); } else { if (tryCount == maxRetryCount) { @@ -197,9 +198,9 @@ private Response processHttpResponse( if (httpResponseStatus == HttpResponseStatus.SUCCESS) { log.trace("Returned successful status code [%s]."); return Response.success(responseBodyDecoder.deserialize(responseBody.getBytes())); - } else if (httpResponseStatus == HttpResponseStatus.FAILURE_NOT_RETRYABLE) { + } else if (httpResponseStatus == HttpResponseStatus.FAILURE_NON_RETRYABLE) { log.warn(format("Returned not retryable error status code [%s].", statusCode)); - return Response.notRetryable(); + return Response.nonRetryable(); } else if (httpResponseStatus == HttpResponseStatus.FAILURE_RETRYABLE) { log.warn(format("Returned retryable error status code [%s].", statusCode)); return Response.retryable(); @@ -220,8 +221,8 @@ static Response success(RowData rowData) { return new Response(HttpResponseStatus.SUCCESS, rowData); } - static Response notRetryable() { - return new Response(HttpResponseStatus.FAILURE_NOT_RETRYABLE, null); + static Response nonRetryable() { + return new Response(HttpResponseStatus.FAILURE_NON_RETRYABLE, null); } static Response retryable() { diff --git a/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java index 6fd74be1..8dac2472 100644 --- a/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeCheckerTest.java @@ -11,7 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; -import static com.getindata.connectors.http.internal.status.HttpResponseStatus.FAILURE_NOT_RETRYABLE; +import static com.getindata.connectors.http.internal.status.HttpResponseStatus.FAILURE_NON_RETRYABLE; import static com.getindata.connectors.http.internal.status.HttpResponseStatus.FAILURE_RETRYABLE; import static com.getindata.connectors.http.internal.status.HttpResponseStatus.SUCCESS; @@ -33,8 +33,8 @@ void shouldReturnAppropriateStatusByDefault() { assertThat(codeChecker.checkStatus(100)).isEqualTo(SUCCESS); assertThat(codeChecker.checkStatus(200)).isEqualTo(SUCCESS); assertThat(codeChecker.checkStatus(302)).isEqualTo(SUCCESS); - assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NOT_RETRYABLE); - assertThat(codeChecker.checkStatus(404)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NON_RETRYABLE); + assertThat(codeChecker.checkStatus(404)).isEqualTo(FAILURE_NON_RETRYABLE); assertThat(codeChecker.checkStatus(500)).isEqualTo(FAILURE_RETRYABLE); assertThat(codeChecker.checkStatus(501)).isEqualTo(FAILURE_RETRYABLE); assertThat(codeChecker.checkStatus(503)).isEqualTo(FAILURE_RETRYABLE); @@ -54,14 +54,14 @@ void shouldReturnAppropriateStatus() { HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); assertAll(() -> { - assertThat(codeChecker.checkStatus(100)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(100)).isEqualTo(FAILURE_NON_RETRYABLE); assertThat(codeChecker.checkStatus(200)).isEqualTo(SUCCESS); - assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(400)).isEqualTo(FAILURE_NON_RETRYABLE); assertThat(codeChecker.checkStatus(404)).isEqualTo(FAILURE_RETRYABLE); assertThat(codeChecker.checkStatus(500)).isEqualTo(FAILURE_RETRYABLE); assertThat(codeChecker.checkStatus(501)).isEqualTo(SUCCESS); assertThat(codeChecker.checkStatus(503)).isEqualTo(FAILURE_RETRYABLE); - assertThat(codeChecker.checkStatus(505)).isEqualTo(FAILURE_NOT_RETRYABLE); + assertThat(codeChecker.checkStatus(505)).isEqualTo(FAILURE_NON_RETRYABLE); }); } @@ -87,13 +87,13 @@ void shouldParseWhiteList() { .withFailMessage( "Not on a white list but matches 3XX range. " + "Should be considered as error code.") - .isEqualTo(FAILURE_NOT_RETRYABLE); + .isEqualTo(FAILURE_NON_RETRYABLE); }); } @Test void shouldParseErrorCodeList() { - List notRetryableCodes = List.of(100, 202, 404); + List nonRetryableCodes = List.of(100, 202, 404); List retryableCodes = List.of(302, 502); Properties properties = new Properties(); properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "100, 202, 404"); @@ -103,8 +103,8 @@ void shouldParseErrorCodeList() { HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); assertAll(() -> { - notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) - .isEqualTo(FAILURE_NOT_RETRYABLE)); + nonRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NON_RETRYABLE)); retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) .isEqualTo(FAILURE_RETRYABLE)); }); @@ -115,15 +115,15 @@ void shouldParseErrorCodeRange() { Properties properties = new Properties(); properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, "1XX, 2XX"); properties.setProperty(RETRYABLE_CODE_PROPERTY, "3XX, 4XX"); - List notRetryableCodes = List.of(100, 110, 200, 220); + List nonRetryableCodes = List.of(100, 110, 200, 220); List retryableCodes = List.of(301, 404); ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); assertAll(() -> { - notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) - .isEqualTo(FAILURE_NOT_RETRYABLE)); + nonRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NON_RETRYABLE)); retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) .isEqualTo(FAILURE_RETRYABLE)); assertThat(codeChecker.checkStatus(503)) @@ -138,15 +138,15 @@ void shouldIgnoreRedundantWhiteSpacesOrEmptyOrRepeatedValues() { Properties properties = new Properties(); properties.setProperty(NOT_RETRYABLE_CODE_PROPERTY, " , 100,200, 300, , 303 ,200 "); properties.setProperty(RETRYABLE_CODE_PROPERTY, ",5XX, 4XX,, ,"); - List notRetryableCodes = List.of(100, 200, 300, 303); + List nonRetryableCodes = List.of(100, 200, 300, 303); List retryableCodes = List.of(500, 501, 400, 401); ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); HttpStatusCodeChecker codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); assertAll(() -> { - notRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) - .isEqualTo(FAILURE_NOT_RETRYABLE)); + nonRetryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) + .isEqualTo(FAILURE_NON_RETRYABLE)); retryableCodes.forEach(code -> assertThat(codeChecker.checkStatus(code)) .isEqualTo(FAILURE_RETRYABLE)); }); @@ -184,10 +184,10 @@ void shouldThrowOnInvalidCodeRangeInRetryableError(String listCode) { private ComposeHttpStatusCodeCheckerConfig prepareCheckerConfig(Properties properties) { return ComposeHttpStatusCodeCheckerConfig.builder() .properties(properties) - .errorCodePrefix(NOT_RETRYABLE_CODE_PROPERTY) - .errorWhiteListPrefix(NOT_RETRYABLE_WHITELIST_PROPERTY) - .retryableCodePrefix(RETRYABLE_CODE_PROPERTY) - .retryableWhiteListPrefix(RETRYABLE_WHITELIST_PROPERTY) + .nonRetryableErrorCodePrefix(NOT_RETRYABLE_CODE_PROPERTY) + .nonRetryableErrorWhiteListPrefix(NOT_RETRYABLE_WHITELIST_PROPERTY) + .retryableErrorCodePrefix(RETRYABLE_CODE_PROPERTY) + .retryableErrorWhiteListPrefix(RETRYABLE_WHITELIST_PROPERTY) .build(); } }