Skip to content

Commit b24bef9

Browse files
committed
RLQS PoC v1 (minus shared code changes)
1 parent 25199e9 commit b24bef9

27 files changed

+2232
-2
lines changed

gradle/libs.versions.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1"
2929
conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
3030
cronet-api = "org.chromium.net:cronet-api:119.6045.31"
3131
cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31"
32+
dev-cel-compiler = "dev.cel:compiler:0.9.1-proto3"
33+
dev-cel-protobuf = "dev.cel:protobuf:0.9.1-proto3"
34+
dev-cel-runtime = "dev.cel:runtime:0.9.1-proto3"
3235
# error-prone 2.31.0+ blocked on https://github.com/grpc/grpc-java/issues/10152
3336
# It breaks Bazel (ArrayIndexOutOfBoundsException in turbine) and Dexing ("D8:
3437
# java.lang.NullPointerException"). We can trivially upgrade the Bazel CI to
3538
# 6.3.0+ (https://github.com/bazelbuild/bazel/issues/18743).
36-
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.30.0"
39+
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.36.0"
3740
# error-prone 2.32.0+ require Java 17+
3841
errorprone-core = "com.google.errorprone:error_prone_core:2.31.0"
3942
google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.51.0"

xds/build.gradle

+4-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ dependencies {
5353
project(':grpc-services'),
5454
project(':grpc-auth'),
5555
project(path: ':grpc-alts', configuration: 'shadow'),
56+
libraries.dev.cel.runtime,
57+
libraries.dev.cel.protobuf,
5658
libraries.guava,
5759
libraries.gson,
5860
libraries.re2j,
@@ -72,7 +74,8 @@ dependencies {
7274
compileOnly libraries.netty.transport.epoll
7375

7476
testImplementation project(':grpc-testing'),
75-
project(':grpc-testing-proto')
77+
project(':grpc-testing-proto'),
78+
libraries.dev.cel.compiler
7679
testImplementation (libraries.netty.transport.epoll) {
7780
artifact {
7881
classifier = "linux-x86_64"

xds/src/main/java/io/grpc/xds/MessagePrinter.java

+9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.xds;
1818

19+
import com.github.xds.type.matcher.v3.CelMatcher;
20+
import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput;
1921
import com.google.protobuf.Descriptors.Descriptor;
2022
import com.google.protobuf.InvalidProtocolBufferException;
2123
import com.google.protobuf.Message;
@@ -28,6 +30,8 @@
2830
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
2931
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
3032
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
33+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
34+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
3135
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
3236
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
3337
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
@@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() {
5862
.add(RBAC.getDescriptor())
5963
.add(RBACPerRoute.getDescriptor())
6064
.add(Router.getDescriptor())
65+
// RLQS
66+
.add(RateLimitQuotaFilterConfig.getDescriptor())
67+
.add(RateLimitQuotaOverride.getDescriptor())
68+
.add(HttpAttributesCelMatchInput.getDescriptor())
69+
.add(CelMatcher.getDescriptor())
6170
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
6271
// by top-level resource types.
6372
.add(UpstreamTlsContext.getDescriptor())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
21+
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.collect.ImmutableMap;
24+
import com.google.protobuf.Any;
25+
import com.google.protobuf.InvalidProtocolBufferException;
26+
import com.google.protobuf.Message;
27+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaBucketSettings;
28+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
29+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
30+
import io.grpc.InternalLogId;
31+
import io.grpc.Metadata;
32+
import io.grpc.ServerCall;
33+
import io.grpc.ServerCall.Listener;
34+
import io.grpc.ServerCallHandler;
35+
import io.grpc.ServerInterceptor;
36+
import io.grpc.internal.GrpcUtil;
37+
import io.grpc.xds.Filter.ServerInterceptorBuilder;
38+
import io.grpc.xds.client.XdsLogger;
39+
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
40+
import io.grpc.xds.internal.datatype.GrpcService;
41+
import io.grpc.xds.internal.matchers.HttpMatchInput;
42+
import io.grpc.xds.internal.matchers.Matcher;
43+
import io.grpc.xds.internal.matchers.MatcherList;
44+
import io.grpc.xds.internal.matchers.OnMatch;
45+
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
46+
import io.grpc.xds.internal.rlqs.RlqsCache;
47+
import io.grpc.xds.internal.rlqs.RlqsFilterState;
48+
import io.grpc.xds.internal.rlqs.RlqsRateLimitResult;
49+
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.atomic.AtomicReference;
51+
import javax.annotation.Nullable;
52+
53+
/** RBAC Http filter implementation. */
54+
// TODO(sergiitk): introduce a layer between the filter and interceptor.
55+
// lds has filter names and the names are unique - even for server instances.
56+
final class RlqsFilter implements Filter, ServerInterceptorBuilder {
57+
private final XdsLogger logger;
58+
59+
static final boolean enabled = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS", false);
60+
61+
// TODO(sergiitk): [IMPL] remove
62+
// Do do not fail on parsing errors, only log requests.
63+
static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false);
64+
65+
static final RlqsFilter INSTANCE = new RlqsFilter();
66+
67+
static final String TYPE_URL = "type.googleapis.com/"
68+
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig";
69+
static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/"
70+
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride";
71+
72+
private final AtomicReference<RlqsCache> rlqsCache = new AtomicReference<>();
73+
74+
public RlqsFilter() {
75+
// TODO(sergiitk): one per new instance when filters are refactored.
76+
logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null));
77+
logger.log(XdsLogLevel.DEBUG,
78+
"Created RLQS Filter with enabled=" + enabled + ", dryRun=" + dryRun);
79+
}
80+
81+
@Override
82+
public String[] typeUrls() {
83+
return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG};
84+
}
85+
86+
@Override
87+
public boolean isEnabled() {
88+
return enabled;
89+
}
90+
91+
@Override
92+
public ConfigOrError<RlqsFilterConfig> parseFilterConfig(Message rawProtoMessage) {
93+
try {
94+
RlqsFilterConfig rlqsFilterConfig =
95+
parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class));
96+
return ConfigOrError.fromConfig(rlqsFilterConfig);
97+
} catch (InvalidProtocolBufferException e) {
98+
return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e);
99+
} catch (ResourceInvalidException e) {
100+
return ConfigOrError.fromError(e.getMessage());
101+
}
102+
}
103+
104+
@Override
105+
public ConfigOrError<RlqsFilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
106+
try {
107+
RlqsFilterConfig rlqsFilterConfig =
108+
parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class));
109+
return ConfigOrError.fromConfig(rlqsFilterConfig);
110+
} catch (InvalidProtocolBufferException e) {
111+
return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e);
112+
} catch (ResourceInvalidException e) {
113+
return ConfigOrError.fromError(e.getMessage());
114+
}
115+
}
116+
117+
@Nullable
118+
@Override
119+
public ServerInterceptor buildServerInterceptor(
120+
FilterConfig config, @Nullable FilterConfig overrideConfig) {
121+
throw new UnsupportedOperationException("ScheduledExecutorService scheduler required");
122+
}
123+
124+
@Override
125+
public ServerInterceptor buildServerInterceptor(
126+
FilterConfig config,
127+
@Nullable FilterConfig overrideConfig,
128+
ScheduledExecutorService scheduler) {
129+
// Called when we get an xds update - when the LRS or RLS changes.
130+
RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config");
131+
132+
// Per-route and per-host configuration overrides.
133+
if (overrideConfig != null) {
134+
RlqsFilterConfig rlqsFilterOverride = (RlqsFilterConfig) overrideConfig;
135+
// All fields are inherited from the main config, unless overridden.
136+
RlqsFilterConfig.Builder overrideBuilder = rlqsFilterConfig.toBuilder();
137+
if (!rlqsFilterOverride.domain().isEmpty()) {
138+
overrideBuilder.domain(rlqsFilterOverride.domain());
139+
}
140+
if (rlqsFilterOverride.bucketMatchers() != null) {
141+
overrideBuilder.bucketMatchers(rlqsFilterOverride.bucketMatchers());
142+
}
143+
// Override bucket matchers if not null.
144+
rlqsFilterConfig = overrideBuilder.build();
145+
}
146+
147+
rlqsCache.compareAndSet(null, RlqsCache.newInstance(scheduler));
148+
return generateRlqsInterceptor(rlqsFilterConfig);
149+
}
150+
151+
@Override
152+
public void shutdown() {
153+
// TODO(sergiitk): [DESIGN] besides shutting down everything, should there
154+
// be per-route interceptor destructors?
155+
RlqsCache oldCache = rlqsCache.getAndUpdate(unused -> null);
156+
if (oldCache != null) {
157+
oldCache.shutdown();
158+
}
159+
}
160+
161+
@Nullable
162+
private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) {
163+
checkNotNull(config, "config");
164+
checkNotNull(config.rlqsService(), "config.rlqsService");
165+
RlqsCache rlqsCache = this.rlqsCache.get();
166+
if (rlqsCache == null) {
167+
// Being shut down, return no interceptor.
168+
return null;
169+
}
170+
171+
final RlqsFilterState rlqsFilterState = rlqsCache.getOrCreateFilterState(config);
172+
173+
return new ServerInterceptor() {
174+
@Override
175+
public <ReqT, RespT> Listener<ReqT> interceptCall(
176+
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
177+
HttpMatchInput httpMatchInput = HttpMatchInput.create(headers, call);
178+
179+
// TODO(sergiitk): [IMPL] Remove
180+
if (dryRun) {
181+
// logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: request <<" + httpMatchInput + ">>");
182+
return next.startCall(call, headers);
183+
}
184+
185+
RlqsRateLimitResult result = rlqsFilterState.rateLimit(httpMatchInput);
186+
if (result.isAllowed()) {
187+
return next.startCall(call, headers);
188+
}
189+
RlqsRateLimitResult.DenyResponse denyResponse = result.denyResponse().get();
190+
call.close(denyResponse.status(), denyResponse.headersToAdd());
191+
return new ServerCall.Listener<ReqT>(){};
192+
}
193+
};
194+
}
195+
196+
@VisibleForTesting
197+
RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
198+
throws ResourceInvalidException, InvalidProtocolBufferException {
199+
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
200+
if (rlqsFilterProto.getDomain().isEmpty()) {
201+
throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required");
202+
}
203+
builder.domain(rlqsFilterProto.getDomain())
204+
.rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer()));
205+
206+
// TODO(sergiitk): [IMPL] Remove
207+
if (dryRun) {
208+
logger.log(XdsLogLevel.DEBUG, "Dry run: not parsing matchers in the filter filter");
209+
return builder.build();
210+
}
211+
212+
// TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
213+
RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny(
214+
rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(),
215+
RateLimitQuotaBucketSettings.class);
216+
RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create(
217+
ImmutableMap.of("bucket_id", headers -> "hello"),
218+
fallbackBucketSettingsProto.getReportingInterval());
219+
220+
// TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
221+
Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers = new RlqsMatcher(fallbackBucket);
222+
223+
return builder.bucketMatchers(bucketMatchers).build();
224+
}
225+
226+
static class RlqsMatcher extends Matcher<HttpMatchInput, RlqsBucketSettings> {
227+
private final RlqsBucketSettings fallbackBucket;
228+
229+
RlqsMatcher(RlqsBucketSettings fallbackBucket) {
230+
this.fallbackBucket = fallbackBucket;
231+
}
232+
233+
@Nullable
234+
@Override
235+
public MatcherList<HttpMatchInput, RlqsBucketSettings> matcherList() {
236+
return null;
237+
}
238+
239+
@Override
240+
public OnMatch<HttpMatchInput, RlqsBucketSettings> onNoMatch() {
241+
return OnMatch.ofAction(fallbackBucket);
242+
}
243+
244+
@Override
245+
public RlqsBucketSettings match(HttpMatchInput input) {
246+
return null;
247+
}
248+
}
249+
250+
@VisibleForTesting
251+
static RlqsFilterConfig parseRlqsFilterOverride(RateLimitQuotaOverride rlqsFilterProtoOverride)
252+
throws ResourceInvalidException {
253+
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
254+
// TODO(sergiitk): [IMPL] bucket_matchers.
255+
256+
return builder.domain(rlqsFilterProtoOverride.getDomain()).build();
257+
}
258+
259+
private static <T extends com.google.protobuf.Message> T unpackAny(
260+
Message message, Class<T> clazz) throws InvalidProtocolBufferException {
261+
if (!(message instanceof Any)) {
262+
throw new InvalidProtocolBufferException(
263+
"Invalid config type: " + message.getClass().getCanonicalName());
264+
}
265+
return ((Any) message).unpack(clazz);
266+
}
267+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import com.google.auto.value.AutoValue;
20+
import io.grpc.xds.Filter.FilterConfig;
21+
import io.grpc.xds.internal.datatype.GrpcService;
22+
import io.grpc.xds.internal.matchers.HttpMatchInput;
23+
import io.grpc.xds.internal.matchers.Matcher;
24+
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
25+
import javax.annotation.Nullable;
26+
27+
/** Parsed RateLimitQuotaFilterConfig. */
28+
@AutoValue
29+
public abstract class RlqsFilterConfig implements FilterConfig {
30+
31+
@Override
32+
public final String typeUrl() {
33+
return RlqsFilter.TYPE_URL;
34+
}
35+
36+
public abstract String domain();
37+
38+
// TODO(sergiitk): make not nullable, introduce RlqsFilterConfigOverride
39+
@Nullable
40+
public abstract GrpcService rlqsService();
41+
42+
// TODO(sergiitk): make not nullable, introduce RlqsFilterConfigOverride
43+
@Nullable
44+
public abstract Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers();
45+
46+
public static Builder builder() {
47+
return new AutoValue_RlqsFilterConfig.Builder();
48+
}
49+
50+
abstract Builder toBuilder();
51+
52+
@AutoValue.Builder
53+
abstract static class Builder {
54+
abstract Builder domain(String domain);
55+
56+
abstract Builder rlqsService(GrpcService rlqsService);
57+
58+
public abstract Builder bucketMatchers(Matcher<HttpMatchInput, RlqsBucketSettings> matcher);
59+
60+
abstract RlqsFilterConfig build();
61+
}
62+
63+
}

0 commit comments

Comments
 (0)