diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2ea4c8b5fa1..37815ece445 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -29,11 +29,14 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1" conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2" cronet-api = "org.chromium.net:cronet-api:119.6045.31" cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31" +dev-cel-compiler = "dev.cel:compiler:0.9.1-proto3" +dev-cel-protobuf = "dev.cel:protobuf:0.9.1-proto3" +dev-cel-runtime = "dev.cel:runtime:0.9.1-proto3" # error-prone 2.31.0+ blocked on https://github.com/grpc/grpc-java/issues/10152 # It breaks Bazel (ArrayIndexOutOfBoundsException in turbine) and Dexing ("D8: # java.lang.NullPointerException"). We can trivially upgrade the Bazel CI to # 6.3.0+ (https://github.com/bazelbuild/bazel/issues/18743). -errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.30.0" +errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.36.0" # error-prone 2.32.0+ require Java 17+ errorprone-core = "com.google.errorprone:error_prone_core:2.31.0" google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.51.0" diff --git a/xds/build.gradle b/xds/build.gradle index 90ba3709d14..895736ee222 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -53,6 +53,8 @@ dependencies { project(':grpc-services'), project(':grpc-auth'), project(path: ':grpc-alts', configuration: 'shadow'), + libraries.dev.cel.runtime, + libraries.dev.cel.protobuf, libraries.guava, libraries.gson, libraries.re2j, @@ -72,7 +74,8 @@ dependencies { compileOnly libraries.netty.transport.epoll testImplementation project(':grpc-testing'), - project(':grpc-testing-proto') + project(':grpc-testing-proto'), + libraries.dev.cel.compiler testImplementation (libraries.netty.transport.epoll) { artifact { classifier = "linux-x86_64" diff --git a/xds/src/main/java/io/grpc/xds/FilterRegistry.java b/xds/src/main/java/io/grpc/xds/FilterRegistry.java index 426c6d1b3f6..2e683283107 100644 --- a/xds/src/main/java/io/grpc/xds/FilterRegistry.java +++ b/xds/src/main/java/io/grpc/xds/FilterRegistry.java @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() { instance = newRegistry().register( new FaultFilter.Provider(), new RouterFilter.Provider(), - new RbacFilter.Provider()); + new RbacFilter.Provider(), + new RlqsFilter.Provider()); } return instance; } diff --git a/xds/src/main/java/io/grpc/xds/MessagePrinter.java b/xds/src/main/java/io/grpc/xds/MessagePrinter.java index 5927bfd517e..f94c775776d 100644 --- a/xds/src/main/java/io/grpc/xds/MessagePrinter.java +++ b/xds/src/main/java/io/grpc/xds/MessagePrinter.java @@ -16,6 +16,8 @@ package io.grpc.xds; +import com.github.xds.type.matcher.v3.CelMatcher; +import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -28,6 +30,8 @@ import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig; import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride; import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC; import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute; import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; @@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() { .add(RBAC.getDescriptor()) .add(RBACPerRoute.getDescriptor()) .add(Router.getDescriptor()) + // RLQS + .add(RateLimitQuotaFilterConfig.getDescriptor()) + .add(RateLimitQuotaOverride.getDescriptor()) + .add(HttpAttributesCelMatchInput.getDescriptor()) + .add(CelMatcher.getDescriptor()) // UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported // by top-level resource types. .add(UpstreamTlsContext.getDescriptor()) diff --git a/xds/src/main/java/io/grpc/xds/RlqsFilter.java b/xds/src/main/java/io/grpc/xds/RlqsFilter.java new file mode 100644 index 00000000000..123c2aca590 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RlqsFilter.java @@ -0,0 +1,284 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaBucketSettings; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourceHolder; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import io.grpc.xds.internal.datatype.GrpcService; +import io.grpc.xds.internal.matchers.HttpMatchInput; +import io.grpc.xds.internal.matchers.Matcher; +import io.grpc.xds.internal.matchers.MatcherList; +import io.grpc.xds.internal.matchers.OnMatch; +import io.grpc.xds.internal.rlqs.RlqsBucketSettings; +import io.grpc.xds.internal.rlqs.RlqsCache; +import io.grpc.xds.internal.rlqs.RlqsFilterState; +import io.grpc.xds.internal.rlqs.RlqsRateLimitResult; +import java.util.ConcurrentModificationException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** RBAC Http filter implementation. */ +// TODO(sergiitk): introduce a layer between the filter and interceptor. +// lds has filter names and the names are unique - even for server instances. +final class RlqsFilter implements Filter { + private final XdsLogger logger; + + static final boolean enabled = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS", false); + + // TODO(sergiitk): [IMPL] remove + // Do do not fail on parsing errors, only log requests. + static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false); + + static final String TYPE_URL = "type.googleapis.com/" + + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig"; + static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/" + + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride"; + + private final AtomicBoolean shutdown = new AtomicBoolean(); + private final AtomicReference rlqsCache = new AtomicReference<>(); + + // TODO(sergiitk): [IMPL] figure out what to use here. + private final ScheduledExecutorService scheduler = + SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + + public RlqsFilter(String name) { + logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null)); + logger.log(XdsLogLevel.DEBUG, + "Created RLQS Filter name='%s' with enabled=%s, dryRun=%s", name, enabled, dryRun); + } + + static final class Provider implements Filter.Provider { + private static final Logger logger = Logger.getLogger(Provider.class.getName()); + + @Override + public String[] typeUrls() { + return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG}; + } + + @Override + public boolean isServerFilter() { + return true; + } + + @Override + public RlqsFilter newInstance(String name) { + return new RlqsFilter(name); + } + + @Override + public ConfigOrError parseFilterConfig(Message rawProtoMessage) { + try { + RlqsFilterConfig rlqsFilterConfig = + parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class)); + return ConfigOrError.fromConfig(rlqsFilterConfig); + } catch (InvalidProtocolBufferException e) { + return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e); + } catch (ResourceInvalidException e) { + return ConfigOrError.fromError(e.getMessage()); + } + } + + @Override + public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) { + try { + RlqsFilterConfig rlqsFilterConfig = + parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class)); + return ConfigOrError.fromConfig(rlqsFilterConfig); + } catch (InvalidProtocolBufferException e) { + return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e); + } catch (ResourceInvalidException e) { + return ConfigOrError.fromError(e.getMessage()); + } + } + + @VisibleForTesting + RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto) + throws ResourceInvalidException, InvalidProtocolBufferException { + RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder(); + if (rlqsFilterProto.getDomain().isEmpty()) { + throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required"); + } + builder.domain(rlqsFilterProto.getDomain()) + .rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer())); + + // TODO(sergiitk): [IMPL] Remove + if (dryRun) { + logger.finest("RLQS DRY RUN: not parsing matchers"); + return builder.build(); + } + + // TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto() + RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny( + rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(), + RateLimitQuotaBucketSettings.class); + RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create( + ImmutableMap.of("bucket_id", headers -> "hello"), + fallbackBucketSettingsProto.getReportingInterval()); + + // TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto() + Matcher bucketMatchers = new RlqsMatcher(fallbackBucket); + + return builder.bucketMatchers(bucketMatchers).build(); + } + } + + @Override + public void close() { + // TODO(sergiitk): [DESIGN] besides shutting down everything, should there + // be per-route interceptor destructors? + if (!shutdown.compareAndSet(false, true)) { + throw new ConcurrentModificationException( + "Unexpected: RlqsFilter#close called multiple times"); + } + RlqsCache oldCache = rlqsCache.getAndUpdate(unused -> null); + if (oldCache != null) { + oldCache.shutdown(); + } + } + + // @Override + public boolean isEnabled() { + return enabled; + } + + @Nullable + @Override + public ServerInterceptor buildServerInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig) { + // ScheduledExecutorService scheduler + + // Called when we get an xds update - when the LRS or RLS changes. + RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config"); + + // Per-route and per-host configuration overrides. + if (overrideConfig != null) { + RlqsFilterConfig rlqsFilterOverride = (RlqsFilterConfig) overrideConfig; + // All fields are inherited from the main config, unless overridden. + RlqsFilterConfig.Builder overrideBuilder = rlqsFilterConfig.toBuilder(); + if (!rlqsFilterOverride.domain().isEmpty()) { + overrideBuilder.domain(rlqsFilterOverride.domain()); + } + if (rlqsFilterOverride.bucketMatchers() != null) { + overrideBuilder.bucketMatchers(rlqsFilterOverride.bucketMatchers()); + } + // Override bucket matchers if not null. + rlqsFilterConfig = overrideBuilder.build(); + } + + rlqsCache.compareAndSet(null, RlqsCache.newInstance(scheduler)); + return generateRlqsInterceptor(rlqsFilterConfig); + } + + @Nullable + private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) { + checkNotNull(config, "config"); + checkNotNull(config.rlqsService(), "config.rlqsService"); + RlqsCache rlqsCache = this.rlqsCache.get(); + if (rlqsCache == null) { + // Being shut down, return no interceptor. + return null; + } + + final RlqsFilterState rlqsFilterState = rlqsCache.getOrCreateFilterState(config); + + return new ServerInterceptor() { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + HttpMatchInput httpMatchInput = HttpMatchInput.create(headers, call); + + // TODO(sergiitk): [IMPL] Remove + if (dryRun) { + // logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: request <<" + httpMatchInput + ">>"); + return next.startCall(call, headers); + } + + RlqsRateLimitResult result = rlqsFilterState.rateLimit(httpMatchInput); + if (result.isAllowed()) { + return next.startCall(call, headers); + } + RlqsRateLimitResult.DenyResponse denyResponse = result.denyResponse().get(); + call.close(denyResponse.status(), denyResponse.headersToAdd()); + return new ServerCall.Listener(){}; + } + }; + } + + static class RlqsMatcher extends Matcher { + private final RlqsBucketSettings fallbackBucket; + + RlqsMatcher(RlqsBucketSettings fallbackBucket) { + this.fallbackBucket = fallbackBucket; + } + + @Nullable + @Override + public MatcherList matcherList() { + return null; + } + + @Override + public OnMatch onNoMatch() { + return OnMatch.ofAction(fallbackBucket); + } + + @Override + public RlqsBucketSettings match(HttpMatchInput input) { + return null; + } + } + + @VisibleForTesting + static RlqsFilterConfig parseRlqsFilterOverride(RateLimitQuotaOverride rlqsFilterProtoOverride) + throws ResourceInvalidException { + RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder(); + // TODO(sergiitk): [IMPL] bucket_matchers. + + return builder.domain(rlqsFilterProtoOverride.getDomain()).build(); + } + + private static T unpackAny( + Message message, Class clazz) throws InvalidProtocolBufferException { + if (!(message instanceof Any)) { + throw new InvalidProtocolBufferException( + "Invalid config type: " + message.getClass().getCanonicalName()); + } + return ((Any) message).unpack(clazz); + } +} diff --git a/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java b/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java new file mode 100644 index 00000000000..1ffb88709f9 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.auto.value.AutoValue; +import io.grpc.xds.Filter.FilterConfig; +import io.grpc.xds.internal.datatype.GrpcService; +import io.grpc.xds.internal.matchers.HttpMatchInput; +import io.grpc.xds.internal.matchers.Matcher; +import io.grpc.xds.internal.rlqs.RlqsBucketSettings; +import javax.annotation.Nullable; + +/** Parsed RateLimitQuotaFilterConfig. */ +@AutoValue +public abstract class RlqsFilterConfig implements FilterConfig { + + @Override + public final String typeUrl() { + return RlqsFilter.TYPE_URL; + } + + public abstract String domain(); + + // TODO(sergiitk): make not nullable, introduce RlqsFilterConfigOverride + @Nullable + public abstract GrpcService rlqsService(); + + // TODO(sergiitk): make not nullable, introduce RlqsFilterConfigOverride + @Nullable + public abstract Matcher bucketMatchers(); + + public static Builder builder() { + return new AutoValue_RlqsFilterConfig.Builder(); + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder domain(String domain); + + abstract Builder rlqsService(GrpcService rlqsService); + + public abstract Builder bucketMatchers(Matcher matcher); + + abstract RlqsFilterConfig build(); + } + +} diff --git a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java index 90babd1e8d0..a2c0cf8a1ef 100644 --- a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java +++ b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.grpc.ChannelCredentials; import io.grpc.Internal; import io.grpc.xds.client.EnvoyProtoData.Node; import java.util.List; @@ -77,6 +78,21 @@ public static ServerInfo create( } } + /** + * TODO(sergiitk): description. + */ + @AutoValue + @Internal + public abstract static class RemoteServerInfo { + public abstract String target(); + + public abstract ChannelCredentials channelCredentials(); + + public static RemoteServerInfo create(String target, ChannelCredentials channelCredentials) { + return new AutoValue_Bootstrapper_RemoteServerInfo(target, channelCredentials); + } + } + /** * Data class containing Certificate provider information: the plugin-name and an opaque * Map that represents the config for that plugin. diff --git a/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java b/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java index ccb622ff168..bf3ac7fa862 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java @@ -127,6 +127,14 @@ public ResourceInvalidException(String message) { public ResourceInvalidException(String message, Throwable cause) { super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false); } + + public static ResourceInvalidException ofResource(String resourceName, String reason) { + return new ResourceInvalidException("Error parsing " + resourceName + ": " + reason); + } + + public static ResourceInvalidException ofResource(Message proto, String reason) { + return ResourceInvalidException.ofResource(proto.getClass().getCanonicalName(), reason); + } } ValidatedResourceUpdate parse(Args args, List resources) { diff --git a/xds/src/main/java/io/grpc/xds/internal/MetadataHelper.java b/xds/src/main/java/io/grpc/xds/internal/MetadataHelper.java new file mode 100644 index 00000000000..eb3589835cc --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/MetadataHelper.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.BaseEncoding; +import io.grpc.Metadata; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + + +public class MetadataHelper { + public static ImmutableMap metadataToHeaders(Metadata metadata) { + return metadata.keys().stream().collect(ImmutableMap.toImmutableMap( + headerName -> headerName, + headerName -> Strings.nullToEmpty(deserializeHeader(metadata, headerName)))); + } + + @Nullable + public static String deserializeHeader(Metadata metadata, String headerName) { + if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key; + try { + key = Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER); + } catch (IllegalArgumentException e) { + return null; + } + Iterable values = metadata.getAll(key); + if (values == null) { + return null; + } + List encoded = new ArrayList<>(); + for (byte[] v : values) { + encoded.add(BaseEncoding.base64().omitPadding().encode(v)); + } + return String.join(",", encoded); + } + Metadata.Key key; + try { + key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); + } catch (IllegalArgumentException e) { + return null; + } + Iterable values = metadata.getAll(key); + return values == null ? null : String.join(",", values); + } + + public static boolean containsHeader(Metadata metadata, String headerName) { + return metadata.containsKey(Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER)); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java b/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java new file mode 100644 index 00000000000..658544a4291 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.datatype; + +import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Duration; +import javax.annotation.Nullable; + +@AutoValue +public abstract class GrpcService { + public abstract String targetUri(); + + // TODO(sergiitk): [QUESTION] do we need this? + // abstract String statPrefix(); + + // TODO(sergiitk): [IMPL] channelCredentials + // TODO(sergiitk): [IMPL] callCredentials + // TODO(sergiitk): [IMPL] channelArgs + + /** Optional timeout duration for the gRPC request to the service. */ + @Nullable + public abstract Duration timeout(); + + public static GrpcService fromEnvoyProto( + io.envoyproxy.envoy.config.core.v3.GrpcService grpcServiceProto) + throws ResourceInvalidException { + if (grpcServiceProto.getTargetSpecifierCase() + != io.envoyproxy.envoy.config.core.v3.GrpcService.TargetSpecifierCase.GOOGLE_GRPC) { + throw ResourceInvalidException.ofResource(grpcServiceProto, + "Only GoogleGrpc targets supported, got " + grpcServiceProto.getTargetSpecifierCase()); + } + Builder builder = GrpcService.builder(); + if (grpcServiceProto.hasTimeout()) { + builder.timeout(grpcServiceProto.getTimeout()); + } + // GoogleGrpc fields flattened. + io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc googleGrpcProto = + grpcServiceProto.getGoogleGrpc(); + builder.targetUri(googleGrpcProto.getTargetUri()); + + // TODO(sergiitk): [IMPL] channelCredentials + // TODO(sergiitk): [IMPL] callCredentials + // TODO(sergiitk): [IMPL] channelArgs + // TODO(sergiitk): [IMPL] statPrefix - (maybe) + + return builder.build(); + } + + public static Builder builder() { + return new AutoValue_GrpcService.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder targetUri(String targetUri); + + public abstract Builder timeout(Duration timeout); + + public abstract GrpcService build(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/datatype/RateLimitStrategy.java b/xds/src/main/java/io/grpc/xds/internal/datatype/RateLimitStrategy.java new file mode 100644 index 00000000000..75774954ec2 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/datatype/RateLimitStrategy.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.datatype; + +import com.google.auto.value.AutoOneOf; +import io.envoyproxy.envoy.type.v3.RateLimitStrategy.BlanketRule; +import io.envoyproxy.envoy.type.v3.TokenBucket; + +@AutoOneOf(RateLimitStrategy.Kind.class) +public abstract class RateLimitStrategy { + // TODO(sergiitk): instead, make RateLimitStrategy interface, + // and AllowAll DenyAll, TokenBucket extending it + public enum Kind { BLANKET_RULE, TOKEN_BUCKET } + + public static final RateLimitStrategy ALLOW_ALL = + AutoOneOf_RateLimitStrategy.blanketRule(BlanketRule.ALLOW_ALL); + public static final RateLimitStrategy DENY_ALL = + AutoOneOf_RateLimitStrategy.blanketRule(BlanketRule.DENY_ALL); + + public abstract Kind getKind(); + + public final boolean rateLimit() { + switch (getKind()) { + case BLANKET_RULE: + switch (blanketRule()) { + case DENY_ALL: + return true; + case ALLOW_ALL: + default: + return false; + } + case TOKEN_BUCKET: + throw new UnsupportedOperationException("Not implemented yet"); + default: + throw new UnsupportedOperationException("Unexpected strategy kind"); + } + } + + // TODO(sergiitk): [IMPL] Replace with the internal class. + public abstract BlanketRule blanketRule(); + + // TODO(sergiitk): [IMPL] Replace with the implementation class. + public abstract TokenBucket tokenBucket(); + + public static RateLimitStrategy ofBlanketRule(BlanketRule blanketRuleProto) { + switch (blanketRuleProto) { + case ALLOW_ALL: + return RateLimitStrategy.ALLOW_ALL; + case DENY_ALL: + return RateLimitStrategy.DENY_ALL; + default: + throw new UnsupportedOperationException("Wrong BlanketRule proto"); + } + } + + public static RateLimitStrategy ofTokenBucket(TokenBucket tokenBucketProto) { + return AutoOneOf_RateLimitStrategy.tokenBucket(tokenBucketProto); + } + + public static RateLimitStrategy fromEnvoyProto( + io.envoyproxy.envoy.type.v3.RateLimitStrategy rateLimitStrategyProto) { + switch (rateLimitStrategyProto.getStrategyCase()) { + case BLANKET_RULE: + return ofBlanketRule(rateLimitStrategyProto.getBlanketRule()); + case TOKEN_BUCKET: + return ofTokenBucket(rateLimitStrategyProto.getTokenBucket()); + case REQUESTS_PER_TIME_UNIT: + // TODO(sergiitk): [IMPL] convert to token bucket; + throw new UnsupportedOperationException("Not implemented yet"); + default: + // TODO(sergiitk): [IMPL[ replace with a custom exception. + throw new UnsupportedOperationException("Unknown RL type"); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java b/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java new file mode 100644 index 00000000000..803a88696a6 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelProtoAbstractSyntaxTree; +import dev.cel.expr.CheckedExpr; +import dev.cel.runtime.CelEvaluationException; +import io.grpc.xds.client.XdsResourceType.ResourceInvalidException; +import java.util.function.Predicate; + +/** Unified Matcher API: xds.type.matcher.v3.CelMatcher. */ +public class CelMatcher implements Predicate { + + private final GrpcCelEnvironment program; + private final String description; + + private CelMatcher(CelAbstractSyntaxTree ast, String description) throws CelEvaluationException { + this.program = new GrpcCelEnvironment(checkNotNull(ast)); + this.description = description != null ? description : ""; + } + + public static CelMatcher create(CelAbstractSyntaxTree ast) throws CelEvaluationException { + return new CelMatcher(ast, null); + } + + public static CelMatcher create(CelAbstractSyntaxTree ast, String description) + throws CelEvaluationException { + return new CelMatcher(ast, description); + } + + public static CelMatcher fromEnvoyProto(com.github.xds.type.matcher.v3.CelMatcher proto) + throws ResourceInvalidException { + com.github.xds.type.v3.CelExpression exprMatch = proto.getExprMatch(); + // TODO(sergiitk): do i need this? + // checkNotNull(exprMatch); + + if (!exprMatch.hasCelExprChecked()) { + throw ResourceInvalidException.ofResource(proto, "cel_expr_checked is required"); + } + + // Canonical CEL. + CheckedExpr celExprChecked = exprMatch.getCelExprChecked(); + + // TODO(sergiitk): catch tree build errors? + CelAbstractSyntaxTree ast = CelProtoAbstractSyntaxTree.fromCheckedExpr(celExprChecked).getAst(); + + try { + return new CelMatcher(ast, proto.getDescription()); + } catch (CelEvaluationException e) { + throw ResourceInvalidException.ofResource(exprMatch, + "Error Building CEL Program cel_expr_checked: " + e.getErrorCode() + " " + + e.getMessage()); + } + } + + public String description() { + return description; + } + + @Override + public boolean test(HttpMatchInput httpMatchInput) { + return program.eval(httpMatchInput); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/GrpcCelEnvironment.java b/xds/src/main/java/io/grpc/xds/internal/matchers/GrpcCelEnvironment.java new file mode 100644 index 00000000000..736b2012fb1 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/GrpcCelEnvironment.java @@ -0,0 +1,117 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import com.google.common.base.Splitter; +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelErrorCode; +import dev.cel.common.CelOptions; +import dev.cel.common.CelRuntimeException; +import dev.cel.common.types.SimpleType; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import dev.cel.runtime.CelRuntimeFactory; +import dev.cel.runtime.CelVariableResolver; +import io.grpc.Status; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; + +/** Unified Matcher API: xds.type.matcher.v3.CelMatcher. */ +public class GrpcCelEnvironment { + private static final CelOptions CEL_OPTIONS = CelOptions + .current() + .resolveTypeDependencies(false) + // Parity with Envoy. + .maxRegexProgramSize(100) + .enableComprehension(false) + .enableListConcatenation(false) + .enableStringConcatenation(false) + .enableStringConversion(false) + .build(); + private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory + .standardCelRuntimeBuilder() + .setOptions(CEL_OPTIONS) + .build(); + + private final CelRuntime.Program program; + + GrpcCelEnvironment(CelAbstractSyntaxTree ast) throws CelEvaluationException { + if (ast.getResultType() != SimpleType.BOOL) { + throw new CelEvaluationException("Expected bool return type"); + } + this.program = CEL_RUNTIME.createProgram(ast); + } + + public boolean eval(HttpMatchInput httpMatchInput) { + try { + GrpcCelVariableResolver requestResolver = new GrpcCelVariableResolver(httpMatchInput); + return (boolean) program.eval(requestResolver); + } catch (CelEvaluationException | ClassCastException e) { + throw Status.fromThrowable(e).asRuntimeException(); + } + } + + static class GrpcCelVariableResolver implements CelVariableResolver { + private static final Splitter SPLITTER = Splitter.on('.').limit(2); + private final HttpMatchInput httpMatchInput; + + GrpcCelVariableResolver(HttpMatchInput httpMatchInput) { + this.httpMatchInput = httpMatchInput; + } + + @Override + public Optional find(String name) { + List components = SPLITTER.splitToList(name); + if (components.size() < 2 || !components.get(0).equals("request")) { + return Optional.empty(); + } + return Optional.ofNullable(getRequestField(components.get(1))); + } + + @Nullable + private Object getRequestField(String requestField) { + switch (requestField) { + case "headers": + return httpMatchInput.getHeadersWrapper(); + case "host": + return httpMatchInput.getHost(); + case "id": + return httpMatchInput.getHeadersWrapper().get("x-request-id"); + case "method": + return httpMatchInput.getMethod(); + case "path": + case "url_path": + return httpMatchInput.getPath(); + case "query": + return ""; + case "referer": + return httpMatchInput.getHeadersWrapper().get("referer"); + case "useragent": + return httpMatchInput.getHeadersWrapper().get("user-agent"); + default: + // Throwing instead of Optional.empty() prevents evaluation non-boolean result type + // when comparing unknown fields, f.e. `request.protocol == 'HTTP'` will silently + // fail because `null == "HTTP" is not a valid CEL operation. + throw new CelRuntimeException( + // Similar to dev.cel.runtime.DescriptorMessageProvider#selectField + new IllegalArgumentException("request." + requestField), + CelErrorCode.ATTRIBUTE_NOT_FOUND); + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/HeadersWrapper.java b/xds/src/main/java/io/grpc/xds/internal/matchers/HeadersWrapper.java new file mode 100644 index 00000000000..1820980df33 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/HeadersWrapper.java @@ -0,0 +1,160 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.DoNotCall; +import io.grpc.xds.internal.MetadataHelper; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; +import javax.annotation.Nullable; + +public final class HeadersWrapper extends AbstractMap { + private static final ImmutableSet PSEUDO_HEADERS = + ImmutableSet.of(":method", ":authority", ":path"); + private final HttpMatchInput httpMatchInput; + + HeadersWrapper(HttpMatchInput httpMatchInput) { + this.httpMatchInput = httpMatchInput; + } + + @Override + @Nullable + public String get(Object key) { + String headerName = (String) key; + // Pseudo-headers. + switch (headerName) { + case ":method": + return httpMatchInput.getMethod(); + case ":authority": + return httpMatchInput.getHost(); + case ":path": + return httpMatchInput.getPath(); + default: + return httpMatchInput.getHeader(headerName); + } + } + + @Override + public boolean containsKey(Object key) { + String headerName = (String) key; + if (PSEUDO_HEADERS.contains(headerName)) { + return true; + } + return MetadataHelper.containsHeader(httpMatchInput.metadata(), headerName); + } + + @Override + public Set keySet() { + return ImmutableSet.builder() + .addAll(httpMatchInput.metadata().keys()) + .addAll(PSEUDO_HEADERS).build(); + } + + @Override + public String getOrDefault(Object key, String defaultValue) { + String value = get(key); + return value != null ? value : defaultValue; + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public Set> entrySet() { + throw new UnsupportedOperationException( + "Should not be called to prevent resolving header values."); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public Collection values() { + throw new UnsupportedOperationException( + "Should not be called to prevent resolving header values."); + } + + @Override + public String toString() { + // Prevent iterating to avoid resolving all values on "key not found". + return getClass().getName() + "@" + Integer.toHexString(hashCode()); + } + + @Override public int hashCode() { + return Objects.hashCode(httpMatchInput.serverCall(), httpMatchInput.metadata()); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public void replaceAll(BiFunction function) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String putIfAbsent(String key, String value) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public boolean remove(Object key, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public boolean replace(String key, String oldValue, String newValue) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String replace(String key, String value) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String computeIfAbsent( + String key, Function mappingFunction) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String computeIfPresent( + String key, BiFunction remappingFunction) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String compute( + String key, BiFunction remappingFunction) { + throw new UnsupportedOperationException(); + } + + @Override + @DoNotCall("Always throws UnsupportedOperationException") + public String merge( + String key, String value, + BiFunction remappingFunction) { + throw new UnsupportedOperationException(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java b/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java new file mode 100644 index 00000000000..5c960cb55ba --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.xds.internal.matchers; + +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; +import com.google.common.base.Strings; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.xds.internal.MetadataHelper; +import java.util.Map; +import javax.annotation.Nullable; + +@AutoValue +public abstract class HttpMatchInput { + public abstract Metadata metadata(); + + // TODO(sergiitk): [IMPL] consider + public abstract ServerCall serverCall(); + + public static HttpMatchInput create(Metadata metadata, ServerCall serverCall) { + return new AutoValue_HttpMatchInput(metadata, serverCall); + } + + public String getMethod() { + return "POST"; + } + + public String getHost() { + return Strings.nullToEmpty(serverCall().getAuthority()); + } + + public String getPath() { + return "/" + serverCall().getMethodDescriptor().getFullMethodName(); + } + + @Nullable + public String getHeader(String headerName) { + return MetadataHelper.deserializeHeader(metadata(), headerName); + } + + @Memoized + public Map getHeadersWrapper() { + return new HeadersWrapper(this); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java b/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java new file mode 100644 index 00000000000..6740406d3c0 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import javax.annotation.Nullable; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher. */ +public abstract class Matcher { + // TODO(sergiitk): [IMPL] iterator? + // TODO(sergiitk): [IMPL] public boolean matches(EvaluateArgs args) ? + + // TODO(sergiitk): [IMPL] AutoOneOf MatcherList, MatcherTree + @Nullable + public abstract MatcherList matcherList(); + + public abstract OnMatch onNoMatch(); + + public abstract ResultT match(InputT input); +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java b/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java new file mode 100644 index 00000000000..76fb2d4b632 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher.MatcherList. */ +public class MatcherList { + +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java b/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java new file mode 100644 index 00000000000..5e3609ab72a --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + + +import com.google.auto.value.AutoOneOf; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher.OnMatch. */ +@AutoOneOf(OnMatch.Kind.class) +public abstract class OnMatch { + public enum Kind { MATCHER, ACTION } + + public abstract Kind getKind(); + + public abstract Matcher matcher(); + + public abstract ResultT action(); + + public static OnMatch ofMatcher( + Matcher matcher) { + return AutoOneOf_OnMatch.matcher(matcher); + } + + public static OnMatch ofAction(ResultT result) { + return AutoOneOf_OnMatch.action(result); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucket.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucket.java new file mode 100644 index 00000000000..37170477dd0 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucket.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import io.grpc.Deadline; +import io.grpc.xds.internal.datatype.RateLimitStrategy; +import io.grpc.xds.internal.rlqs.RlqsRateLimitResult.DenyResponse; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + + +public class RlqsBucket { + private final RlqsBucketId bucketId; + private final long reportingIntervalMillis; + + private final RateLimitStrategy noAssignmentStrategy; + private final RateLimitStrategy expiredAssignmentStrategy; + private final DenyResponse denyResponse; + + // TODO(sergiitk): [impl] consider AtomicLongFieldUpdater + private final AtomicLong lastSnapshotTimeNanos = new AtomicLong(-1); + + // TODO(sergiitk): [impl] consider java.util.concurrent.atomic.LongAdder for counters + private final AtomicLong numRequestsAllowed = new AtomicLong(); + private final AtomicLong numRequestsDenied = new AtomicLong(); + + // TODO(sergiitk): [impl] consider AtomicReferenceFieldUpdater + @Nullable + private volatile RateLimitStrategy assignmentStrategy = null; + private volatile long assignmentExpiresTimeNanos; + // TODO(sergiitk): needed for expired_assignment_behavior_timeout + private volatile long lastAssignmentTimeNanos; + + RlqsBucket(RlqsBucketId bucketId, RlqsBucketSettings bucketSettings) { + // TODO(sergiitk): [design] consider lock per bucket instance + this.bucketId = bucketId; + reportingIntervalMillis = bucketSettings.reportingIntervalMillis(); + expiredAssignmentStrategy = bucketSettings.expiredAssignmentStrategy(); + noAssignmentStrategy = bucketSettings.noAssignmentStrategy(); + denyResponse = bucketSettings.denyResponse(); + } + + public RlqsBucketId getBucketId() { + return bucketId; + } + + public long getReportingIntervalMillis() { + return reportingIntervalMillis; + } + + public RlqsRateLimitResult rateLimit() { + boolean rateLimited = resolveStrategy().rateLimit(); + if (!rateLimited) { + numRequestsAllowed.incrementAndGet(); + return RlqsRateLimitResult.allow(); + } + numRequestsDenied.incrementAndGet(); + return RlqsRateLimitResult.deny(denyResponse); + } + + private RateLimitStrategy resolveStrategy() { + if (assignmentStrategy == null) { + return noAssignmentStrategy; + } + if (assignmentExpiresTimeNanos > nanoTimeNow()) { + // TODO(sergiitk): handle expired behavior properly: it has own ttl, + // after the bucket is abandoned. + // Also, there's reuse last assignment option. + return expiredAssignmentStrategy; + } + return assignmentStrategy; + } + + public RlqsBucketUsage snapshotAndResetUsage() { + // TODO(sergiitk): [IMPL] ensure synchronized + long snapAllowed = numRequestsAllowed.get(); + long snapDenied = numRequestsDenied.get(); + long snapTime = nanoTimeNow(); + + // Reset stats. + numRequestsAllowed.addAndGet(-snapAllowed); + numRequestsDenied.addAndGet(-snapDenied); + + long lastSnapTime = lastSnapshotTimeNanos.getAndSet(snapTime); + // First snapshot. + if (lastSnapTime < 0) { + lastSnapTime = snapTime; + } + return RlqsBucketUsage.create(bucketId, snapAllowed, snapDenied, snapTime - lastSnapTime); + } + + public void updateAction(RateLimitStrategy strategy, long ttlMillis) { + // TODO(sergiitk): [IMPL] ensure synchronized + lastAssignmentTimeNanos = nanoTimeNow(); + assignmentExpiresTimeNanos = lastAssignmentTimeNanos + (ttlMillis * 1_000_000); + assignmentStrategy = strategy; + } + + private static long nanoTimeNow() { + return Deadline.getSystemTicker().nanoTime(); + } + + @AutoValue + public abstract static class RlqsBucketUsage { + + public abstract RlqsBucketId bucketId(); + + public abstract long numRequestsAllowed(); + + public abstract long numRequestsDenied(); + + public abstract long timeElapsedNanos(); + + public static RlqsBucketUsage create( + RlqsBucketId bucketId, long numRequestsAllowed, long numRequestsDenied, + long timeElapsedNanos) { + return new AutoValue_RlqsBucket_RlqsBucketUsage(bucketId, numRequestsAllowed, + numRequestsDenied, timeElapsedNanos); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketCache.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketCache.java new file mode 100644 index 00000000000..6faf41ca984 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketCache.java @@ -0,0 +1,76 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import io.grpc.xds.internal.datatype.RateLimitStrategy; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; + +final class RlqsBucketCache { + // TODO(sergiitk): consider volatile + synchronize instead + private final ConcurrentMap> bucketsPerInterval = new ConcurrentHashMap<>(); + private final ConcurrentMap buckets = new ConcurrentHashMap<>(); + + public RlqsBucket getOrCreate( + RlqsBucketId bucketId, RlqsBucketSettings bucketSettings, Consumer onCreate) { + // read synchronize trick + RlqsBucket bucket = buckets.get(bucketId); + if (bucket != null) { + return bucket; + } + synchronized (this) { + bucket = new RlqsBucket(bucketId, bucketSettings); + long interval = bucket.getReportingIntervalMillis(); + bucketsPerInterval.computeIfAbsent(interval, k -> Sets.newConcurrentHashSet()).add(bucket); + buckets.put(bucket.getBucketId(), bucket); + // TODO(sergiitk): [IMPL] call async + onCreate.accept(bucket); + return bucket; + } + } + + public void deleteBucket(RlqsBucketId bucketId) { + RlqsBucket bucket = buckets.get(bucketId); + if (bucket == null) { + return; + } + synchronized (this) { + buckets.remove(bucket.getBucketId()); + bucketsPerInterval.computeIfPresent(bucket.getReportingIntervalMillis(), (k, buckets) -> { + buckets.remove(bucket); + return buckets.isEmpty() ? null : buckets; + }); + } + } + + public void updateBucket( + RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) { + RlqsBucket bucket = buckets.get(bucketId); + bucket.updateAction(rateLimitStrategy, ttlMillis); + } + + public ImmutableList getBucketsToReport(long reportingIntervalMillis) { + return ImmutableList.copyOf( + bucketsPerInterval.getOrDefault(reportingIntervalMillis, Collections.emptySet())); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketId.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketId.java new file mode 100644 index 00000000000..e78b36ebbd3 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketId.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; +import com.google.common.collect.ImmutableMap; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.BucketId; +import java.util.Map; + +@AutoValue +public abstract class RlqsBucketId { + // No class loading deadlock, see + // https://github.com/google/error-prone/issues/2062#issuecomment-1566253739 + public static final RlqsBucketId EMPTY = create(ImmutableMap.of()); + + public abstract ImmutableMap bucketId(); + + public static RlqsBucketId create(Map bucketIdMap) { + if (bucketIdMap.isEmpty()) { + return EMPTY; + } + return new AutoValue_RlqsBucketId(ImmutableMap.copyOf(bucketIdMap)); + } + + public final boolean isEmpty() { + return bucketId().isEmpty(); + } + + public static RlqsBucketId fromEnvoyProto(BucketId envoyProto) { + return RlqsBucketId.create(ImmutableMap.copyOf(envoyProto.getBucketMap().entrySet())); + } + + @Memoized + public BucketId toEnvoyProto() { + // TODO(sergiitk): [impl] can be cached. + return BucketId.newBuilder().putAllBucket(bucketId()).build(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java new file mode 100644 index 00000000000..da202de011c --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; +import io.grpc.xds.internal.datatype.RateLimitStrategy; +import io.grpc.xds.internal.matchers.HttpMatchInput; +import io.grpc.xds.internal.rlqs.RlqsRateLimitResult.DenyResponse; +import java.util.function.Function; +import javax.annotation.Nullable; + +@AutoValue +public abstract class RlqsBucketSettings { + // TODO(sergiitk): [IMPL] this misses most of the parsing and implementation. + + @Nullable + public abstract ImmutableMap> bucketIdBuilder(); + + abstract RlqsBucketId staticBucketId(); + + public abstract long reportingIntervalMillis(); + + public final RlqsBucketId toBucketId(HttpMatchInput input) { + if (bucketIdBuilder() == null) { + return staticBucketId(); + } + return processBucketBuilder(bucketIdBuilder(), input); + } + + public RateLimitStrategy noAssignmentStrategy() { + return null; + } + + public DenyResponse denyResponse() { + return DenyResponse.DEFAULT; + } + + public RateLimitStrategy expiredAssignmentStrategy() { + return null; + } + + public static RlqsBucketSettings create( + ImmutableMap> bucketIdBuilder, + Duration reportingInterval) { + // TODO(sergiitk): instead of create, use Builder pattern. + RlqsBucketId staticBucketId = processBucketBuilder(bucketIdBuilder, null); + return new AutoValue_RlqsBucketSettings( + staticBucketId.isEmpty() ? bucketIdBuilder : null, + staticBucketId, + Durations.toMillis(reportingInterval)); + } + + private static RlqsBucketId processBucketBuilder( + ImmutableMap> bucketIdBuilder, + HttpMatchInput input) { + ImmutableMap.Builder bucketIdMapBuilder = ImmutableMap.builder(); + if (input == null) { + // TODO(sergiitk): [IMPL] calculate static map + return RlqsBucketId.EMPTY; + } + for (String key : bucketIdBuilder.keySet()) { + Function fn = bucketIdBuilder.get(key); + String value = null; + if (fn != null) { + value = fn.apply(input); + } + bucketIdMapBuilder.put(key, value != null ? value : ""); + } + ImmutableMap bucketIdMap = bucketIdMapBuilder.build(); + return bucketIdMap.isEmpty() ? RlqsBucketId.EMPTY : RlqsBucketId.create(bucketIdMap); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java new file mode 100644 index 00000000000..f2da5da6b61 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import io.grpc.ChannelCredentials; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InternalLogId; +import io.grpc.SynchronizationContext; +import io.grpc.xds.RlqsFilterConfig; +import io.grpc.xds.client.Bootstrapper.RemoteServerInfo; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; + +public final class RlqsCache { + // TODO(sergiitk): [QUESTION] always in sync context? + private volatile boolean shutdown = false; + + private final XdsLogger logger; + private final SynchronizationContext syncContext; + + private final ConcurrentMap filterStateCache = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + + + private RlqsCache(ScheduledExecutorService scheduler) { + this.scheduler = checkNotNull(scheduler, "scheduler"); + // TODO(sergiitk): should be filter name? + logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null)); + + syncContext = new SynchronizationContext((thread, error) -> { + String message = "Uncaught exception in RlqsCache SynchronizationContext. Panic!"; + logger.log(XdsLogLevel.DEBUG, + message + " {0} \nTrace:\n {1}", error, Throwables.getStackTraceAsString(error)); + throw new RlqsCacheSynchronizationException(message, error); + }); + } + + /** Creates an instance. */ + public static RlqsCache newInstance(ScheduledExecutorService scheduler) { + // TODO(sergiitk): [IMPL] scheduler - consider using GrpcUtil.TIMER_SERVICE. + // TODO(sergiitk): [IMPL] note that the scheduler has a finite lifetime. + return new RlqsCache(scheduler); + } + + public void shutdown() { + if (shutdown) { + return; + } + syncContext.execute(() -> { + shutdown = true; + logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsCache"); + for (long configHash : filterStateCache.keySet()) { + filterStateCache.get(configHash).shutdown(); + } + filterStateCache.clear(); + shutdown = false; + }); + } + + public void shutdownFilterState(RlqsFilterConfig oldConfig) { + // TODO(sergiitk): shutdown one + // make it async. + } + + public RlqsFilterState getOrCreateFilterState(final RlqsFilterConfig config) { + // TODO(sergiitk): handle being shut down. + long configHash = hashFilterConfig(config); + return filterStateCache.computeIfAbsent(configHash, k -> newFilterState(k, config)); + } + + private RlqsFilterState newFilterState(long configHash, RlqsFilterConfig config) { + // TODO(sergiitk): [IMPL] get channel creds from the bootstrap. + ChannelCredentials creds = InsecureChannelCredentials.create(); + return new RlqsFilterState( + RemoteServerInfo.create(config.rlqsService().targetUri(), creds), + config.domain(), + config.bucketMatchers(), + configHash, + scheduler); + } + + private long hashFilterConfig(RlqsFilterConfig config) { + // TODO(sergiitk): [QUESTION] better name? - ask Eric. + // TODO(sergiitk): [DESIGN] the key should be hashed (domain + buckets) merged config? + // TODO(sergiitk): [IMPL] Hash buckets + int k1 = Objects.hash(config.rlqsService().targetUri(), config.domain()); + int k2; + if (config.bucketMatchers() == null) { + k2 = 0x42c0ffee; + } else { + k2 = config.bucketMatchers().hashCode(); + } + return Long.rotateLeft(Integer.toUnsignedLong(k1), 32) + Integer.toUnsignedLong(k2); + } + + /** + * Throws when fail to bootstrap or initialize the XdsClient. + */ + public static final class RlqsCacheSynchronizationException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public RlqsCacheSynchronizationException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java new file mode 100644 index 00000000000..8fb50e81930 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java @@ -0,0 +1,162 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaResponse; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaResponse.BucketAction; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaServiceGrpc; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaServiceGrpc.RateLimitQuotaServiceStub; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaUsageReports; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaUsageReports.BucketQuotaUsage; +import io.grpc.Grpc; +import io.grpc.InternalLogId; +import io.grpc.ManagedChannel; +import io.grpc.internal.GrpcUtil; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.client.Bootstrapper.RemoteServerInfo; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import io.grpc.xds.internal.rlqs.RlqsBucket.RlqsBucketUsage; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import javax.annotation.Nullable; + +public final class RlqsClient { + // TODO(sergiitk): [IMPL] remove + // Do do not fail on parsing errors, only log requests. + static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false); + + private final XdsLogger logger; + + private final RemoteServerInfo serverInfo; + private final Consumer> bucketsUpdateCallback; + private final RlqsStream rlqsStream; + + RlqsClient( + RemoteServerInfo serverInfo, String domain, + Consumer> bucketsUpdateCallback, String prettyHash) { + // TODO(sergiitk): [post] check not null. + this.serverInfo = serverInfo; + this.bucketsUpdateCallback = bucketsUpdateCallback; + + logger = XdsLogger.withLogId( + InternalLogId.allocate(this.getClass(), "<" + prettyHash + "> " + serverInfo.target())); + + this.rlqsStream = new RlqsStream(serverInfo, domain); + } + + public void sendUsageReports(List bucketUsages) { + if (bucketUsages.isEmpty()) { + return; + } + // TODO(sergiitk): [impl] offload to serialized executor. + rlqsStream.reportUsage(bucketUsages); + } + + public void shutdown() { + logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsClient to {0}", serverInfo.target()); + // TODO(sergiitk): [IMPL] RlqsClient shutdown + } + + public void handleStreamClosed() { + // TODO(sergiitk): [IMPL] reconnect on stream down. + } + + private class RlqsStream { + private final AtomicBoolean isFirstReport = new AtomicBoolean(true); + private final String domain; + @Nullable + private final ClientCallStreamObserver clientCallStream; + + RlqsStream(RemoteServerInfo serverInfo, String domain) { + this.domain = domain; + + if (dryRun) { + clientCallStream = null; + logger.log(XdsLogLevel.DEBUG, "Dry run, not connecting to " + serverInfo.target()); + return; + } + + // TODO(sergiitk): [IMPL] Manage State changes? + ManagedChannel channel = + Grpc.newChannelBuilder(serverInfo.target(), serverInfo.channelCredentials()).build(); + // keepalive? + // .keepAliveTime(10, TimeUnit.SECONDS) + // .keepAliveWithoutCalls(true) + + RateLimitQuotaServiceStub stub = RateLimitQuotaServiceGrpc.newStub(channel); + clientCallStream = (ClientCallStreamObserver) + stub.streamRateLimitQuotas(new RlqsStreamObserver()); + // TODO(sergiitk): [IMPL] set on ready handler? + } + + private BucketQuotaUsage toUsageReport(RlqsBucket.RlqsBucketUsage usage) { + return BucketQuotaUsage.newBuilder() + .setBucketId(usage.bucketId().toEnvoyProto()) + .setNumRequestsAllowed(usage.numRequestsAllowed()) + .setNumRequestsDenied(usage.numRequestsDenied()) + .setTimeElapsed(Durations.fromNanos(usage.timeElapsedNanos())) + .build(); + } + + void reportUsage(List usageReports) { + RateLimitQuotaUsageReports.Builder report = RateLimitQuotaUsageReports.newBuilder(); + if (isFirstReport.compareAndSet(true, false)) { + report.setDomain(domain); + } + for (RlqsBucket.RlqsBucketUsage bucketUsage : usageReports) { + report.addBucketQuotaUsages(toUsageReport(bucketUsage)); + } + if (clientCallStream == null) { + logger.log(XdsLogLevel.DEBUG, "Dry run, skipping bucket usage report: " + report.build()); + return; + } + clientCallStream.onNext(report.build()); + } + + /** + * RLQS Stream observer. + * + *

See {@link io.grpc.alts.internal.AltsHandshakerStub.Reader} for examples. + * See {@link io.grpc.stub.ClientResponseObserver} for flow control examples. + */ + private class RlqsStreamObserver implements StreamObserver { + @Override + public void onNext(RateLimitQuotaResponse response) { + ImmutableList.Builder updateActions = ImmutableList.builder(); + for (BucketAction bucketAction : response.getBucketActionList()) { + updateActions.add(RlqsUpdateBucketAction.fromEnvoyProto(bucketAction)); + } + bucketsUpdateCallback.accept(updateActions.build()); + } + + @Override + public void onError(Throwable t) { + logger.log(XdsLogLevel.DEBUG, "Got error in RlqsStreamObserver: " + t.toString()); + } + + @Override + public void onCompleted() { + logger.log(XdsLogLevel.DEBUG, "RlqsStreamObserver completed"); + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsFilterState.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsFilterState.java new file mode 100644 index 00000000000..923153d2ec0 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsFilterState.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.common.collect.ImmutableList; +import io.grpc.InternalLogId; +import io.grpc.xds.client.Bootstrapper.RemoteServerInfo; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import io.grpc.xds.internal.datatype.RateLimitStrategy; +import io.grpc.xds.internal.matchers.HttpMatchInput; +import io.grpc.xds.internal.matchers.Matcher; +import io.grpc.xds.internal.rlqs.RlqsBucket.RlqsBucketUsage; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class RlqsFilterState { + private final XdsLogger logger; + + private final RlqsClient rlqsClient; + private final Matcher bucketMatchers; + private final RlqsBucketCache bucketCache; + private final long configHash; + private final ScheduledExecutorService scheduler; + private final ConcurrentMap> timers = new ConcurrentHashMap<>(); + + public RlqsFilterState( + RemoteServerInfo rlqsServer, String domain, + Matcher bucketMatchers, long configHash, + ScheduledExecutorService scheduler) { + this.bucketMatchers = bucketMatchers; + this.configHash = configHash; + this.scheduler = scheduler; + + String prettyHash = "0x" + Long.toHexString(configHash); + logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), prettyHash)); + logger.log(XdsLogLevel.DEBUG, + "Initialized RlqsFilterState for hash={0}, domain={1}", prettyHash, domain); + + bucketCache = new RlqsBucketCache(); + rlqsClient = new RlqsClient(rlqsServer, domain, this::onBucketsUpdate, prettyHash); + } + + public RlqsRateLimitResult rateLimit(HttpMatchInput input) { + RlqsBucketSettings bucketSettings = bucketMatchers.match(input); + RlqsBucketId bucketId = bucketSettings.toBucketId(input); + // Special case when bucket id builder not set, or has no values. + if (bucketId.isEmpty()) { + return rateLimitWithoutReports(bucketSettings); + } + RlqsBucket bucket = bucketCache.getOrCreate(bucketId, bucketSettings, newBucket -> { + // Called if a new bucket was created. + scheduleImmediateReport(newBucket); + registerReportTimer(newBucket.getReportingIntervalMillis()); + }); + return bucket.rateLimit(); + } + + private static RlqsRateLimitResult rateLimitWithoutReports(RlqsBucketSettings bucketSettings) { + if (bucketSettings.noAssignmentStrategy().rateLimit()) { + return RlqsRateLimitResult.deny(bucketSettings.denyResponse()); + } + return RlqsRateLimitResult.allow(); + } + + private void onBucketsUpdate(List bucketActions) { + // TODO(sergiitk): [impl] ensure no more than 1 update at a time. + for (RlqsUpdateBucketAction bucketAction : bucketActions) { + RlqsBucketId bucketId = bucketAction.bucketId(); + RateLimitStrategy rateLimitStrategy = bucketAction.rateLimitStrategy(); + if (rateLimitStrategy == null) { + bucketCache.deleteBucket(bucketId); + continue; + } + bucketCache.updateBucket(bucketId, rateLimitStrategy, bucketAction.ttlMillis()); + } + } + + private void scheduleImmediateReport(RlqsBucket newBucket) { + try { + ScheduledFuture unused = scheduler.schedule( + () -> rlqsClient.sendUsageReports(ImmutableList.of(newBucket.snapshotAndResetUsage())), + 1, TimeUnit.MICROSECONDS); + } catch (RejectedExecutionException e) { + // Shouldn't happen. + logger.log(XdsLogLevel.WARNING, + "Couldn't schedule immediate report for bucket " + newBucket.getBucketId()); + } + } + + private void registerReportTimer(final long intervalMillis) { + // TODO(sergiitk): [IMPL] cap the interval. + timers.computeIfAbsent(intervalMillis, k -> newTimer(intervalMillis)); + } + + private ScheduledFuture newTimer(final long intervalMillis) { + return scheduler.scheduleWithFixedDelay( + () -> reportBucketsWithInterval(intervalMillis), + intervalMillis, + intervalMillis, + TimeUnit.MILLISECONDS); + } + + private void reportBucketsWithInterval(long intervalMillis) { + ImmutableList.Builder reports = ImmutableList.builder(); + for (RlqsBucket bucket : bucketCache.getBucketsToReport(intervalMillis)) { + reports.add(bucket.snapshotAndResetUsage()); + } + rlqsClient.sendUsageReports(reports.build()); + } + + public void shutdown() { + // TODO(sergiitk): [IMPL] Timers shutdown + // TODO(sergiitk): [IMPL] RlqsFilterState shutdown + logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsFilterState with hash {0}", configHash); + rlqsClient.shutdown(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsRateLimitResult.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsRateLimitResult.java new file mode 100644 index 00000000000..803be9c9f8d --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsRateLimitResult.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import io.grpc.Metadata; +import io.grpc.Status; +import java.util.Optional; +import javax.annotation.Nullable; + +@AutoValue +public abstract class RlqsRateLimitResult { + // TODO(sergiitk): make RateLimitResult an interface, + // RlqsRateLimitResult extends it - which contains DenyResponse. + + public abstract Optional denyResponse(); + + public final boolean isAllowed() { + return !isDenied(); + } + + public final boolean isDenied() { + return denyResponse().isPresent(); + } + + public static RlqsRateLimitResult deny(@Nullable DenyResponse denyResponse) { + if (denyResponse == null) { + denyResponse = DenyResponse.DEFAULT; + } + return new AutoValue_RlqsRateLimitResult(Optional.of(denyResponse)); + } + + public static RlqsRateLimitResult allow() { + return new AutoValue_RlqsRateLimitResult(Optional.empty()); + } + + @AutoValue + public abstract static class DenyResponse { + public static final DenyResponse DEFAULT = + DenyResponse.create(Status.UNAVAILABLE.withDescription("")); + + public abstract Status status(); + + public abstract Metadata headersToAdd(); + + public static DenyResponse create(Status status, Metadata headersToAdd) { + return new AutoValue_RlqsRateLimitResult_DenyResponse(status, headersToAdd); + } + + public static DenyResponse create(Status status) { + return create(status, new Metadata()); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsUpdateBucketAction.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsUpdateBucketAction.java new file mode 100644 index 00000000000..e39a3b90f80 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsUpdateBucketAction.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaResponse; +import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaResponse.BucketAction.QuotaAssignmentAction; +import io.grpc.xds.internal.datatype.RateLimitStrategy; +import javax.annotation.Nullable; + +@AutoValue +public abstract class RlqsUpdateBucketAction { + + public abstract RlqsBucketId bucketId(); + + @Nullable public abstract RateLimitStrategy rateLimitStrategy(); + + public abstract long ttlMillis(); + + public static RlqsUpdateBucketAction ofQuotaAssignmentAction( + RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) { + Preconditions.checkNotNull(rateLimitStrategy, "rateLimitStrategy"); + return new AutoValue_RlqsUpdateBucketAction(bucketId, rateLimitStrategy, ttlMillis); + } + + public static RlqsUpdateBucketAction ofQuotaAbandonAction(RlqsBucketId bucketId) { + return new AutoValue_RlqsUpdateBucketAction(bucketId, null, 0); + } + + public static RlqsUpdateBucketAction fromEnvoyProto( + RateLimitQuotaResponse.BucketAction bucketAction) { + RlqsBucketId bucketId = RlqsBucketId.fromEnvoyProto(bucketAction.getBucketId()); + switch (bucketAction.getBucketActionCase()) { + case ABANDON_ACTION: + return RlqsUpdateBucketAction.ofQuotaAbandonAction(bucketId); + case QUOTA_ASSIGNMENT_ACTION: + QuotaAssignmentAction quotaAssignment = bucketAction.getQuotaAssignmentAction(); + RateLimitStrategy strategy = RateLimitStrategy.ALLOW_ALL; + if (quotaAssignment.hasRateLimitStrategy()) { + strategy = RateLimitStrategy.fromEnvoyProto(quotaAssignment.getRateLimitStrategy()); + } + return RlqsUpdateBucketAction.ofQuotaAssignmentAction(bucketId, strategy, + Durations.toMillis(quotaAssignment.getAssignmentTimeToLive())); + default: + // TODO(sergiitk): [impl] error + throw new UnsupportedOperationException("Wrong BlanketRule proto"); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/internal/matchers/CelMatcherTest.java b/xds/src/test/java/io/grpc/xds/internal/matchers/CelMatcherTest.java new file mode 100644 index 00000000000..52094da7e9c --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/internal/matchers/CelMatcherTest.java @@ -0,0 +1,200 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.common.base.Strings; +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelErrorCode; +import dev.cel.common.CelValidationException; +import dev.cel.common.types.MapType; +import dev.cel.common.types.SimpleType; +import dev.cel.compiler.CelCompiler; +import dev.cel.compiler.CelCompilerFactory; +import dev.cel.parser.CelStandardMacro; +import dev.cel.runtime.CelEvaluationException; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.NoopServerCall; +import io.grpc.ServerCall; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import io.grpc.StringMarshaller; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CelMatcherTest { + // Construct the compilation and runtime environments. + // These instances are immutable and thus trivially thread-safe and amenable to caching. + private static final CelCompiler CEL_COMPILER = + CelCompilerFactory.standardCelCompilerBuilder() + .addVar("request.path", SimpleType.STRING) + .addVar("request.host", SimpleType.STRING) + .addVar("request.method", SimpleType.STRING) + .addVar("request.headers", MapType.create(SimpleType.STRING, SimpleType.STRING)) + // request.protocol is a legal input, but we don't set it in java. + // TODO(sergiitk): add other fields not supported by gRPC + .addVar("request.protocol", SimpleType.STRING) + .setResultType(SimpleType.BOOL) + .setStandardMacros(CelStandardMacro.STANDARD_MACROS) + .build(); + + + private static final HttpMatchInput fakeInput = new HttpMatchInput() { + @Override + public Metadata metadata() { + return new Metadata(); + } + + @Override public ServerCall serverCall() { + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(new StringMarshaller()) + .setResponseMarshaller(new StringMarshaller()) + .build(); + return new NoopServerCall() { + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + }; + } + }; + + @Test + public void construct() throws Exception { + CelAbstractSyntaxTree ast = celAst("1 == 1"); + CelMatcher matcher = CelMatcher.create(ast); + assertThat(matcher.description()).isEqualTo(""); + + String description = "Optional description"; + matcher = CelMatcher.create(ast, description); + assertThat(matcher.description()).isEqualTo(description); + } + + @Test + public void progTrue() throws Exception { + assertThat(newMatcher("request.method == 'POST'").test(fakeInput)).isTrue(); + } + + @Test + public void unknownRequestProperty() throws Exception { + CelMatcher matcher = newMatcher("request.protocol == 'Whatever'"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.ATTRIBUTE_NOT_FOUND); + } + + @Test + public void unknownHeader() throws Exception { + CelMatcher matcher = newMatcher("request.headers['foo'] == 'bar'"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.ATTRIBUTE_NOT_FOUND); + } + + @Test + public void macros_comprehensionsDisabled() throws Exception { + CelMatcher matcherWithComprehensions = newMatcher( + "size(['foo', 'bar'].map(x, [request.headers[x], request.headers[x]])) == 1"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcherWithComprehensions.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.ITERATION_BUDGET_EXCEEDED); + } + + @Test + public void macros_hasEnabled() throws Exception { + boolean result = newMatcher("has(request.headers.foo)").test(fakeInput); + assertThat(result).isFalse(); + } + + @Test + public void env_listConcatenationDisabled() throws Exception { + CelMatcher matcher = newMatcher("size([1, 2] + [3, 4]) == 4"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.OVERLOAD_NOT_FOUND); + } + + @Test + public void env_stringConcatenationDisabled() throws Exception { + CelMatcher matcher = newMatcher("'ab' + 'cd' == 'abcd'"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.OVERLOAD_NOT_FOUND); + } + + @Test + public void env_stringConversionDisabled() throws Exception { + // TODO(sergiitk): [TEST] verify conversions to all types? + CelMatcher matcher = newMatcher("string(3.14) == '3.14'"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.OVERLOAD_NOT_FOUND); + } + + @Test + public void env_regexProgramSize() throws Exception { + String ten = "0123456780"; + + // Positive case, program size <= 100. + assertThat(newMatcher("matches('" + ten + "', '" + ten + "')").test(fakeInput)).isTrue(); + + // Negative case, program size > 100. + @SuppressWarnings("InlineMeInliner") // String.repeat() requires Java 11. + String patternOverLimit = Strings.repeat(ten, 11); // Program Size = 112 in re2j 1.8 / cel 1.9.1 + + CelMatcher matcher = newMatcher("matches('foo', '" + patternOverLimit + "')"); + Status status = assertThrows(StatusRuntimeException.class, + () -> matcher.test(fakeInput)).getStatus(); + + assertCelCauseErrorCode(status, CelErrorCode.INVALID_ARGUMENT); + CelEvaluationException cause = (CelEvaluationException) status.getCause(); + assertThat(cause.getMessage()).contains( + "Regex pattern exceeds allowed program size. Allowed: 100, Provided:"); + } + + private void assertCelCauseErrorCode(Status status, CelErrorCode expectedCelCode) { + assertThat(status.getCode()).isEqualTo(Code.UNKNOWN); + assertThat(status.getCause()).isInstanceOf(CelEvaluationException.class); + + CelEvaluationException cause = (CelEvaluationException) status.getCause(); + assertThat(cause.getErrorCode()).isEqualTo(expectedCelCode); + } + + private CelMatcher newMatcher(String expr) throws CelValidationException, CelEvaluationException { + return CelMatcher.create(celAst(expr)); + } + + private CelAbstractSyntaxTree celAst(String expr) throws CelValidationException { + return CEL_COMPILER.compile(expr).getAst(); + } +}